You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@hive.apache.org by dk...@apache.org on 2023/01/04 17:24:44 UTC

[hive] branch master updated: HIVE-26875: Preserve query context on transaction conflict (John Sherman, reviewed by Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new ca8183fafc0 HIVE-26875: Preserve query context on transaction conflict (John Sherman, reviewed by Denys Kuzmenko)
ca8183fafc0 is described below

commit ca8183fafc0116d5a79ce81eb0b346d5f1f08d58
Author: John Sherman <jf...@cloudera.com>
AuthorDate: Wed Jan 4 09:24:31 2023 -0800

    HIVE-26875: Preserve query context on transaction conflict (John Sherman, reviewed by Denys Kuzmenko)
    
    Closes #3887
---
 .../java/org/apache/hadoop/hive/ql/Compiler.java   |   1 -
 ql/src/java/org/apache/hadoop/hive/ql/Driver.java  |  13 ++-
 .../org/apache/hadoop/hive/ql/DriverContext.java   |   9 --
 .../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java  | 109 +++++++++++++++++++++
 4 files changed, 117 insertions(+), 15 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java b/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
index 0a0f7df21ab..b6c955485fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
@@ -191,7 +191,6 @@ public class Compiler {
     // because at that point we need access to the objects.
     Hive.get().getMSC().flushCache();
 
-    driverContext.setBackupContext(new Context(context));
     boolean executeHooks = driverContext.getHookRunner().hasPreAnalyzeHooks();
 
     HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index d494f409ad7..d52c661872c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -279,10 +279,6 @@ public class Driver implements IDriver {
             driverContext.getTxnManager().clearCaches();
           }
           driverContext.setRetrial(true);
-          driverContext.getBackupContext().addSubContext(context);
-          driverContext.getBackupContext().setHiveLocks(context.getHiveLocks());
-          context = driverContext.getBackupContext();
-
           driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY,
               driverContext.getTxnManager().getValidTxns().toString());
 
@@ -540,7 +536,14 @@ public class Driver implements IDriver {
     String originalCboInfo = context != null ? context.cboInfo : null;
     if (context != null && context.getExplainAnalyze() != AnalyzeState.RUNNING) {
       // close the existing ctx etc before compiling a new query, but does not destroy driver
-      closeInProcess(false);
+      if (!driverContext.isRetrial()) {
+        closeInProcess(false);
+      } else {
+        // On retrial we need to maintain information from the prior context. Such
+        // as the currently held locks.
+        context = new Context(context);
+        releaseResources();
+      }
     }
 
     if (context == null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index 60f08632d87..f9b1e627a09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -72,7 +72,6 @@ public class DriverContext {
   private long compactorTxnId = 0;
   private long analyzeTableWriteId = 0;
 
-  private Context backupContext = null;
   private boolean retrial = false;
 
   private DataInput resStream;
@@ -225,14 +224,6 @@ public class DriverContext {
     this.analyzeTableWriteId = analyzeTableWriteId;
   }
 
-  public Context getBackupContext() {
-    return backupContext;
-  }
-
-  public void setBackupContext(Context backupContext) {
-    this.backupContext = backupContext;
-  }
-
   public boolean isRetrial() {
     return retrial;
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index ff84c44de2d..82eef64125b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -61,6 +61,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE_PATTERN;
 
@@ -2465,6 +2466,114 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
     Assert.assertTrue("Lost Update", isEqualCollection(res, asList("earl\t10", "amy\t10")));
   }
 
+  // The intent of this test is to cause multiple conflicts for the same query to exercise the conflict retry functionality.
+  @Test
+  public void testConcurrentConflictRetry() throws Exception {
+    dropTable(new String[]{"target"});
+
+    driver2 = Mockito.spy(driver2);
+    driver2.getConf().setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, true);
+    driver.run("create table target(i int) stored as orc tblproperties ('transactional'='true')");
+    driver.run("insert into target values (1),(1)");
+
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+
+    // This partial mock allows us to execute a transaction that conflicts with the driver2 query in a controlled
+    // manner.
+    AtomicInteger lockAndRespondCount = new AtomicInteger();
+    Mockito.doAnswer((invocation) -> {
+      lockAndRespondCount.getAndIncrement();
+      // conflict driver2's transaction on the first two calls only (exactly two conflicts), to exercise the retry loop
+      if (lockAndRespondCount.get() <= 2) {
+        swapTxnManager(txnMgr);
+        try {
+          // this should cause a conflict with the query currently being run by driver2
+          driver.run("update target set i = 1 where i = 1");
+        } catch (Exception e) {
+          // intentionally ignored: this run only exists to conflict with driver2's query
+        }
+        swapTxnManager(txnMgr2);
+      }
+      invocation.callRealMethod();
+      return null;
+    }).when(driver2).lockAndRespond();
+
+    driver2.run("update target set i = 1 where i = 1");
+
+    // we expect lockAndRespond to be called 3 times:
+    // 1 time after compilation, 2 more times due to the 2 conflicts
+    Assert.assertEquals(3, lockAndRespondCount.get());
+    swapTxnManager(txnMgr);
+    // we expect two rows (every update sets i = 1 where i = 1, so the row count is unchanged)
+    driver.run("select * from target");
+    List<String> res = new ArrayList<>();
+    driver.getFetchTask().fetch(res);
+    Assert.assertEquals(2, res.size());
+  }
+
+  @Test
+  public void testConcurrentConflictMaxRetryCount() throws Exception {
+    dropTable(new String[]{"target"});
+    driver2 = Mockito.spy(driver2);
+    driver2.getConf().setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, true);
+
+    final int maxRetries = 4;
+    driver2.getConf().setIntVar(HiveConf.ConfVars.HIVE_TXN_MAX_RETRYSNAPSHOT_COUNT, maxRetries);
+
+    driver.run("create table target(i int) stored as orc tblproperties ('transactional'='true')");
+    driver.run("insert into target values (1),(1)");
+
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+
+    // For the first 1 + maxRetries calls, the mocked lockAndRespond below first runs a conflicting update through
+    // driver, which should invalidate driver2's snapshot and force it to retry. The intent here is to make
+    // driver2's query exceed HIVE_TXN_MAX_RETRYSNAPSHOT_COUNT and throw an exception.
+    AtomicInteger lockAndRespondCount = new AtomicInteger();
+    Mockito.doAnswer((invocation) -> {
+      lockAndRespondCount.getAndIncrement();
+      // conflict driver2's transaction on every call until it exceeds the retry limit and fails.
+      // +1 is for the initial lockAndRespond after compilation
+      if (lockAndRespondCount.get() <= 1 + maxRetries) {
+        swapTxnManager(txnMgr);
+        try {
+          // this should cause a conflict with the query currently being run by driver2
+          driver.run("update target set i = 1 where i = 1");
+        } catch (Exception e) {
+          // intentionally ignored: this run only exists to conflict with driver2's query
+        }
+        swapTxnManager(txnMgr2);
+      }
+      invocation.callRealMethod();
+      return null;
+    }).when(driver2).lockAndRespond();
+
+    boolean exceptionThrown = false;
+    // Start a query on driver2. We expect this query to never execute, because the nature of the test is to conflict
+    // until HIVE_TXN_MAX_RETRYSNAPSHOT_COUNT is exceeded.
+    // We verify that it is never executed by counting the number of rows returned that have i = 1.
+    try {
+      driver2.run("update target set i = 2 where i = 1");
+    } catch (CommandProcessorException cpe) {
+      exceptionThrown = true;
+      Assert.assertTrue(
+          cpe.getMessage().contains("Operation could not be executed, snapshot was outdated when locks were acquired.")
+      );
+    }
+    Assert.assertTrue(exceptionThrown);
+    // +1 for the initial lockAndRespond after compilation, another +1 for the lockAndRespond that caused us
+    // to exceed max retries.
+    Assert.assertEquals(maxRetries+2, lockAndRespondCount.get());
+    swapTxnManager(txnMgr);
+
+    // driver2's "set i = 2" update never ran, so we expect both rows to still have i = 1
+    driver.run("select * from target where i = 1");
+    List<String> res = new ArrayList<>();
+    driver.getFetchTask().fetch(res);
+    Assert.assertEquals(2, res.size());
+  }
+
   @Test
   public void testMergeMultipleBranchesOptimistic() throws Exception {
     dropTable(new String[]{"target", "src1", "src2"});