You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2023/01/04 17:24:44 UTC
[hive] branch master updated: HIVE-26875: Preserve query context on transaction conflict (John Sherman, reviewed by Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new ca8183fafc0 HIVE-26875: Preserve query context on transaction conflict (John Sherman, reviewed by Denys Kuzmenko)
ca8183fafc0 is described below
commit ca8183fafc0116d5a79ce81eb0b346d5f1f08d58
Author: John Sherman <jf...@cloudera.com>
AuthorDate: Wed Jan 4 09:24:31 2023 -0800
HIVE-26875: Preserve query context on transaction conflict (John Sherman, reviewed by Denys Kuzmenko)
Closes #3887
---
.../java/org/apache/hadoop/hive/ql/Compiler.java | 1 -
ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 13 ++-
.../org/apache/hadoop/hive/ql/DriverContext.java | 9 --
.../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java | 109 +++++++++++++++++++++
4 files changed, 117 insertions(+), 15 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java b/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
index 0a0f7df21ab..b6c955485fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
@@ -191,7 +191,6 @@ public class Compiler {
// because at that point we need access to the objects.
Hive.get().getMSC().flushCache();
- driverContext.setBackupContext(new Context(context));
boolean executeHooks = driverContext.getHookRunner().hasPreAnalyzeHooks();
HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index d494f409ad7..d52c661872c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -279,10 +279,6 @@ public class Driver implements IDriver {
driverContext.getTxnManager().clearCaches();
}
driverContext.setRetrial(true);
- driverContext.getBackupContext().addSubContext(context);
- driverContext.getBackupContext().setHiveLocks(context.getHiveLocks());
- context = driverContext.getBackupContext();
-
driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY,
driverContext.getTxnManager().getValidTxns().toString());
@@ -540,7 +536,14 @@ public class Driver implements IDriver {
String originalCboInfo = context != null ? context.cboInfo : null;
if (context != null && context.getExplainAnalyze() != AnalyzeState.RUNNING) {
// close the existing ctx etc before compiling a new query, but does not destroy driver
- closeInProcess(false);
+ if (!driverContext.isRetrial()) {
+ closeInProcess(false);
+ } else {
+ // On retrial we need to maintain information from the prior context, such
+ // as the currently held locks.
+ context = new Context(context);
+ releaseResources();
+ }
}
if (context == null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index 60f08632d87..f9b1e627a09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -72,7 +72,6 @@ public class DriverContext {
private long compactorTxnId = 0;
private long analyzeTableWriteId = 0;
- private Context backupContext = null;
private boolean retrial = false;
private DataInput resStream;
@@ -225,14 +224,6 @@ public class DriverContext {
this.analyzeTableWriteId = analyzeTableWriteId;
}
- public Context getBackupContext() {
- return backupContext;
- }
-
- public void setBackupContext(Context backupContext) {
- this.backupContext = backupContext;
- }
-
public boolean isRetrial() {
return retrial;
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index ff84c44de2d..82eef64125b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -61,6 +61,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE_PATTERN;
@@ -2465,6 +2466,114 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
Assert.assertTrue("Lost Update", isEqualCollection(res, asList("earl\t10", "amy\t10")));
}
+ // The intent of this test is to cause multiple conflicts with the same query to exercise the conflict retry functionality.
+ @Test
+ public void testConcurrentConflictRetry() throws Exception {
+ dropTable(new String[]{"target"});
+
+ driver2 = Mockito.spy(driver2);
+ driver2.getConf().setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, true);
+ driver.run("create table target(i int) stored as orc tblproperties ('transactional'='true')");
+ driver.run("insert into target values (1),(1)");
+
+ DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
+
+ // This partial mock allows us to execute a transaction that conflicts with the driver2 query in a controlled
+ // manner.
+ AtomicInteger lockAndRespondCount = new AtomicInteger();
+ Mockito.doAnswer((invocation) -> {
+ lockAndRespondCount.getAndIncrement();
+ // run the conflicting update only on the first two lockAndRespond calls, to exercise the conflict retry loop twice
+ if (lockAndRespondCount.get() <= 2) {
+ swapTxnManager(txnMgr);
+ try {
+ // this should cause a conflict with the current query being run by driver2
+ driver.run("update target set i = 1 where i = 1");
+ } catch (Exception e) {
+ // intentionally ignored
+ }
+ swapTxnManager(txnMgr2);
+ }
+ invocation.callRealMethod();
+ return null;
+ }).when(driver2).lockAndRespond();
+
+ driver2.run("update target set i = 1 where i = 1");
+
+ // we expect lockAndRespond to be called 3 times:
+ // 1 time after compilation, 2 more times due to the 2 conflicts
+ Assert.assertEquals(3, lockAndRespondCount.get());
+ swapTxnManager(txnMgr);
+ // we expect two rows
+ driver.run("select * from target");
+ List<String> res = new ArrayList<>();
+ driver.getFetchTask().fetch(res);
+ Assert.assertEquals(2, res.size());
+ }
+
+ @Test
+ public void testConcurrentConflictMaxRetryCount() throws Exception {
+ dropTable(new String[]{"target"});
+ driver2 = Mockito.spy(driver2);
+ driver2.getConf().setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, true);
+
+ final int maxRetries = 4;
+ driver2.getConf().setIntVar(HiveConf.ConfVars.HIVE_TXN_MAX_RETRYSNAPSHOT_COUNT, maxRetries);
+
+ driver.run("create table target(i int) stored as orc tblproperties ('transactional'='true')");
+ driver.run("insert into target values (1),(1)");
+
+ DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
+
+ // This run should conflict with the above query and cause the "conflict lambda" to be executed,
+ // which will then also conflict with the driver2 query and cause it to retry. The intent here is
+ // to cause driver2's query to exceed HIVE_TXN_MAX_RETRYSNAPSHOT_COUNT and throw an exception.
+ AtomicInteger lockAndRespondCount = new AtomicInteger();
+ Mockito.doAnswer((invocation) -> {
+ lockAndRespondCount.getAndIncrement();
+ // we want to make sure the transaction gets conflicted until it fails.
+ // +1 is for the initial lockAndRespond after compilation
+ if (lockAndRespondCount.get() <= 1 + maxRetries) {
+ swapTxnManager(txnMgr);
+ try {
+ // this should cause a conflict with the current query being run by driver2
+ driver.run("update target set i = 1 where i = 1");
+ } catch (Exception e) {
+ // intentionally ignored
+ }
+ swapTxnManager(txnMgr2);
+ }
+ invocation.callRealMethod();
+ return null;
+ }).when(driver2).lockAndRespond();
+
+ boolean exceptionThrown = false;
+ // Start a query on driver2, we expect this query to never execute because the nature of the test is to conflict
+ // until HIVE_TXN_MAX_RETRYSNAPSHOT_COUNT is exceeded.
+ // We verify that it is never executed by counting the number of rows returned that have i = 1.
+ try {
+ driver2.run("update target set i = 2 where i = 1");
+ } catch (CommandProcessorException cpe) {
+ exceptionThrown = true;
+ Assert.assertTrue(
+ cpe.getMessage().contains("Operation could not be executed, snapshot was outdated when locks were acquired.")
+ );
+ }
+ Assert.assertTrue(exceptionThrown);
+ // +1 for the initial lockAndRespond after compilation, another +1 for the lockAndRespond that caused us
+ // to exceed max retries.
+ Assert.assertEquals(maxRetries+2, lockAndRespondCount.get());
+ swapTxnManager(txnMgr);
+
+ // we expect two rows
+ driver.run("select * from target where i = 1");
+ List<String> res = new ArrayList<>();
+ driver.getFetchTask().fetch(res);
+ Assert.assertEquals(2, res.size());
+ }
+
@Test
public void testMergeMultipleBranchesOptimistic() throws Exception {
dropTable(new String[]{"target", "src1", "src2"});