You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2020/06/19 11:22:36 UTC

[hive] branch master updated: HIVE-23503: ValidTxnManager doesn't consider txns opened and committed between snapshot generation and locking when evaluating ValidTxnListState (Denys Kuzmenko, reviewed by Peter Varga and Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e7116e9  HIVE-23503: ValidTxnManager doesn't consider txns opened and committed between snapshot generation and locking when evaluating ValidTxnListState (Denys Kuzmenko, reviewed by Peter Varga and Peter Vary)
e7116e9 is described below

commit e7116e95ea77f26c8a7f17ef22f18ec743f2ff4d
Author: Denys Kuzmenko <dk...@apache.org>
AuthorDate: Fri Jun 19 13:22:11 2020 +0200

    HIVE-23503: ValidTxnManager doesn't consider txns opened and committed between snapshot generation and locking when evaluating ValidTxnListState (Denys Kuzmenko, reviewed by Peter Varga and Peter Vary)
---
 ql/src/java/org/apache/hadoop/hive/ql/Driver.java  |  9 +++-
 .../org/apache/hadoop/hive/ql/DriverContext.java   |  9 ++++
 .../org/apache/hadoop/hive/ql/ValidTxnManager.java | 29 ++++++++++--
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java       | 10 ++++
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java    |  7 ++-
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java     |  3 ++
 .../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java  | 54 ++++++++++++++++++++++
 .../hadoop/hive/metastore/HiveMetaStoreClient.java |  5 ++
 .../hadoop/hive/metastore/IMetaStoreClient.java    |  2 +
 .../metastore/HiveMetaStoreClientPreCatalog.java   |  5 ++
 .../hadoop/hive/common/ValidReadTxnList.java       |  7 +++
 .../apache/hadoop/hive/common/ValidTxnList.java    |  2 +
 12 files changed, 136 insertions(+), 6 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index e70c92e..9fab7c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -682,12 +682,19 @@ public class Driver implements IDriver {
           // Currently, we acquire a snapshot, we compile the query wrt that snapshot,
           // and then, we acquire locks. If snapshot is still valid, we continue as usual.
           // But if snapshot is not valid, we recompile the query.
+          if (driverContext.isOutdatedTxn()) {
+            driverContext.getTxnManager().rollbackTxn();
+
+            String userFromUGI = DriverUtils.getUserFromUGI(driverContext);
+            driverContext.getTxnManager().openTxn(context, userFromUGI, driverContext.getTxnType());
+            lockAndRespond();
+          }
           driverContext.setRetrial(true);
           driverContext.getBackupContext().addSubContext(context);
           driverContext.getBackupContext().setHiveLocks(context.getHiveLocks());
           context = driverContext.getBackupContext();
           driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY,
-              driverContext.getTxnManager().getValidTxns().toString());
+            driverContext.getTxnManager().getValidTxns().toString());
           if (driverContext.getPlan().hasAcidResourcesInQuery()) {
             validTxnManager.recordValidWriteIds();
           }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index a8c83fc..0afa657 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -59,6 +59,7 @@ public class DriverContext {
   // either initTxnMgr or from the SessionState, in that order.
   private HiveTxnManager txnManager;
   private TxnType txnType = TxnType.DEFAULT;
+  private boolean outdatedTxn;
   private StatsSource statsSource;
 
   // Boolean to store information about whether valid txn list was generated
@@ -155,6 +156,14 @@ public class DriverContext {
     return txnType;
   }
 
+  public void setOutdatedTxn(boolean outdated) {
+    this.outdatedTxn = outdated;
+  }
+
+  public boolean isOutdatedTxn() {
+    return outdatedTxn;
+  }
+
   public void setTxnType(TxnType txnType) {
     this.txnType = txnType;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ValidTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/ValidTxnManager.java
index 7d49c57..e5f8ce0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ValidTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ValidTxnManager.java
@@ -32,9 +32,11 @@ import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -91,11 +93,22 @@ class ValidTxnManager {
       return true; // Nothing to check
     }
 
-    String currentTxnString = driverContext.getTxnManager().getValidTxns().toString();
+    GetOpenTxnsResponse openTxns = driverContext.getTxnManager().getOpenTxns();
+    ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(openTxns, 0);
+    long txnId = driverContext.getTxnManager().getCurrentTxnId();
+
+    String currentTxnString;
+    if (validTxnList.isTxnRangeValid(txnId + 1, openTxns.getTxn_high_water_mark()) != ValidTxnList.RangeResponse.NONE) {
+      // Reaching this branch means another txn was opened and committed between the current snapshot generation and locking.
+      validTxnList.removeException(txnId);
+      currentTxnString = validTxnList.toString();
+    } else {
+      currentTxnString = TxnCommonUtils.createValidReadTxnList(openTxns, txnId).toString();
+    }
+
     if (currentTxnString.equals(txnString)) {
       return true; // Still valid, nothing more to do
     }
-
     return checkWriteIds(currentTxnString, nonSharedLockedTables, txnWriteIdListString);
   }
 
@@ -142,9 +155,17 @@ class ValidTxnManager {
       if (nonSharedLockedTables.contains(fullQNameForLock)) {
         // Check if table is transactional
         if (AcidUtils.isTransactionalTable(tableInfo.getValue())) {
+          ValidWriteIdList writeIdList = txnWriteIdList.getTableValidWriteIdList(tableInfo.getKey());
+          ValidWriteIdList currentWriteIdList = currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getKey());
+          // Check if there was a conflicting write between current SNAPSHOT generation and locking.
+          // If yes, mark current transaction as outdated.
+          if (currentWriteIdList.isWriteIdRangeValid(writeIdList.getHighWatermark() + 1,
+              currentWriteIdList.getHighWatermark()) != ValidWriteIdList.RangeResponse.NONE) {
+            driverContext.setOutdatedTxn(true);
+            return false;
+          }
           // Check that write id is still valid
-          if (!TxnIdUtils.checkEquivalentWriteIds(txnWriteIdList.getTableValidWriteIdList(tableInfo.getKey()),
-              currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getKey()))) {
+          if (!TxnIdUtils.checkEquivalentWriteIds(writeIdList, currentWriteIdList)) {
             // Write id has changed, it is not valid anymore, we need to recompile
             return false;
           }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 71afcbd..6b163d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.ql.Context;
@@ -752,6 +753,15 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   }
 
   @Override
+  public GetOpenTxnsResponse getOpenTxns() throws LockException {
+    try {
+      return getMS().getOpenTxns();
+    } catch (TException e) {
+      throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+    }
+  }
+
+  @Override
   public ValidTxnList getValidTxns() throws LockException {
     assert isTxnOpen();
     init();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 0383881..29266db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -20,13 +20,13 @@ package org.apache.hadoop.hive.ql.lockmgr;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.Context;
@@ -266,6 +266,11 @@ class DummyTxnManager extends HiveTxnManagerImpl {
   }
 
   @Override
+  public GetOpenTxnsResponse getOpenTxns() throws LockException {
+    return new GetOpenTxnsResponse();
+  }
+
+  @Override
   public ValidTxnList getValidTxns() throws LockException {
     return new ValidReadTxnList();
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 600289f..5c75e63 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.lockmgr;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.metastore.api.TxnType;
@@ -172,6 +173,8 @@ public interface HiveTxnManager {
    */
   void heartbeat() throws LockException;
 
+  GetOpenTxnsResponse getOpenTxns() throws LockException;
+
   /**
    * Get the transactions that are currently valid.  The resulting
    * {@link ValidTxnList} object can be passed as string to the processing
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 8a15b7c..e7e6215 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -2227,6 +2227,60 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
   }
 
   @Test
+  public void testInsertMergeInsertConcurrentSnapshotInvalidateNoDuplicates() throws Exception {
+    testConcurrentMergeInsertSnapshotInvalidate("insert into target values (5, 6)", false);
+  }
+  @Test
+  public void testInsertMergeInsertConcurrentSharedWriteSnapshotInvalidateNoDuplicates() throws Exception {
+    testConcurrentMergeInsertSnapshotInvalidate("insert into target values (5, 6)", true);
+  }
+  @Test
+  public void test2MergeInsertsConcurrentSnapshotInvalidateNoDuplicates() throws Exception {
+    testConcurrentMergeInsertSnapshotInvalidate("merge into target t using source s on t.a = s.a " +
+      "when not matched then insert values (s.a, s.b)", false);
+  }
+  @Test
+  public void test2MergeInsertsConcurrentSharedWriteSnapshotInvalidateNoDuplicates() throws Exception {
+    testConcurrentMergeInsertSnapshotInvalidate("merge into target t using source s on t.a = s.a " +
+      "when not matched then insert values (s.a, s.b)", true);
+  }
+  @Test
+  public void testMergeInsertNoSnapshotInvalidateNoDuplicates() throws Exception {
+    testConcurrentMergeInsertSnapshotInvalidate("insert into source values (3, 4)", false);
+  }
+
+  private void testConcurrentMergeInsertSnapshotInvalidate(String query, boolean sharedWrite) throws Exception {
+    dropTable(new String[]{"target", "source"});
+    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);
+
+    driver.run("create table target (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
+    driver.run("insert into target values (1,2), (3,4)");
+    driver.run("create table source (a int, b int)");
+    driver.run("insert into source values (5,6), (7,8)");
+
+    driver.compileAndRespond("merge into target t using source s on t.a = s.a " +
+      "when not matched then insert values (s.a, s.b)");
+
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+    driver2.run(query);
+    driver2.run("select * from target");
+
+    swapTxnManager(txnMgr);
+    try {
+      driver.run();
+    } catch (Exception ex ){
+      Assert.assertTrue(ex.getCause().getMessage().contains("due to a write conflict"));
+    }
+
+    swapTxnManager(txnMgr2);
+    driver2.run("select * from target");
+    List res = new ArrayList();
+    driver2.getFetchTask().fetch(res);
+    Assert.assertEquals("Duplicate records found", 4, res.size());
+  }
+
+  @Test
   public void test2MergeInsertsConcurrentNoDuplicates() throws Exception {
     testConcurrentMergeInsertNoDuplicates("merge into target t using source s on t.a = s.a " +
         "when not matched then insert values (s.a, s.b)", false);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 54850ae..a658cfc 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -3218,6 +3218,11 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   }
 
   @Override
+  public GetOpenTxnsResponse getOpenTxns() throws TException {
+    return client.get_open_txns();
+  }
+
+  @Override
   public ValidTxnList getValidTxns() throws TException {
     return TxnCommonUtils.createValidReadTxnList(client.get_open_txns(), 0);
   }
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index a8b1023..08f04bf 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -2946,6 +2946,8 @@ public interface IMetaStoreClient {
    */
   GetAllFunctionsResponse getAllFunctions() throws MetaException, TException;
 
+  GetOpenTxnsResponse getOpenTxns() throws TException ;
+
   /**
    * Get a structure that details valid transactions.
    * @return list of valid transactions
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
index 218ea44..85ded54 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
@@ -2304,6 +2304,11 @@ public class HiveMetaStoreClientPreCatalog implements IMetaStoreClient, AutoClos
   }
 
   @Override
+  public GetOpenTxnsResponse getOpenTxns() throws TException {
+    return client.get_open_txns();
+  }
+
+  @Override
   public ValidTxnList getValidTxns() throws TException {
     return TxnCommonUtils.createValidReadTxnList(client.get_open_txns(), 0);
   }
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
index 9cfe60e..43a2fe5 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.common;
 
 import org.apache.hive.common.util.SuppressFBWarnings;
 
+import org.apache.commons.lang.ArrayUtils;
+
 import java.util.Arrays;
 import java.util.BitSet;
 
@@ -58,6 +60,11 @@ public class ValidReadTxnList implements ValidTxnList {
   }
 
   @Override
+  public void removeException(long txnId) {
+    exceptions = ArrayUtils.removeElement(exceptions, txnId); // no-op when txnId is not listed, instead of failing on a negative search index
+  }
+
+  @Override
   public boolean isTxnValid(long txnid) {
     if (txnid > highWatermark) {
       return false;
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java b/storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
index d4c3b09..65ce8b4 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
@@ -31,6 +31,8 @@ public interface ValidTxnList {
    */
   public static final String VALID_TXNS_KEY = "hive.txn.valid.txns";
 
+  void removeException(long txnId);
+
   /**
    * The response to a range query.  NONE means no values in this range match,
    * SOME mean that some do, and ALL means that every value does.