You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2020/03/23 15:07:17 UTC

[hive] branch master updated: HIVE-23042: Merge queries to a single one for updating MIN_OPEN_TXNS table (Peter Vary reviewed by Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new d7b6b93  HIVE-23042: Merge queries to a single one for updating MIN_OPEN_TXNS table (Peter Vary reviewed by Denys Kuzmenko)
d7b6b93 is described below

commit d7b6b934bd811ae3e3c6222238844a8dc467b5df
Author: Peter Vary <pv...@cloudera.com>
AuthorDate: Mon Mar 23 16:06:40 2020 +0100

    HIVE-23042: Merge queries to a single one for updating MIN_OPEN_TXNS table (Peter Vary reviewed by Denys Kuzmenko)
---
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 73 +++++++++++++---------
 .../hadoop/hive/metastore/tools/BenchmarkTool.java | 10 ++-
 .../hadoop/hive/metastore/tools/HMSBenchmarks.java | 10 +++
 .../hadoop/hive/metastore/tools/HMSClient.java     | 41 ++++++++++++
 4 files changed, 102 insertions(+), 32 deletions(-)

diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 42cd89f..347cb6e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -625,31 +625,46 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       }
 
       // Need to register minimum open txnid for current transactions into MIN_HISTORY table.
-      s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN);
-      LOG.debug("Going to execute query <" + s + ">");
-      rs = stmt.executeQuery(s);
-      if (!rs.next()) {
-        throw new IllegalStateException("Scalar query returned no rows?!?!!");
-      }
+      // For a single txn we can do it in a single insert. With multiple txns calculating the
+      // minOpenTxnId for every insert is not cost effective, so caching the value
+      if (txnIds.size() == 1) {
+        s = "INSERT INTO \"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\",\"MHL_MIN_OPEN_TXNID\") " +
+                "SELECT ?, MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN);
+        LOG.debug("Going to execute query <" + s + ">");
+        try (PreparedStatement pstmt = dbConn.prepareStatement(s)) {
+          pstmt.setLong(1, txnIds.get(0));
+          pstmt.execute();
+        }
+        LOG.info("Added entries to MIN_HISTORY_LEVEL with a single query for current txn: " + txnIds);
+      } else {
+        s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN);
+        LOG.debug("Going to execute query <" + s + ">");
+        long minOpenTxnId = -1L;
+        try(ResultSet minOpenTxnIdRs = stmt.executeQuery(s)) {
+          if (!minOpenTxnIdRs.next()) {
+            throw new IllegalStateException("Scalar query returned no rows?!?!!");
+          }
+          // TXNS table should have at least one entry because we just inserted the newly opened txns.
+          // So, min(txn_id) would be a non-zero txnid.
+          minOpenTxnId = minOpenTxnIdRs.getLong(1);
+        }
 
-      // TXNS table should have atleast one entry because we just inserted the newly opened txns.
-      // So, min(txn_id) would be a non-zero txnid.
-      long minOpenTxnId = rs.getLong(1);
-      assert (minOpenTxnId > 0);
-      rows.clear();
-      for (long txnId = first; txnId < first + numTxns; txnId++) {
-        rows.add(txnId + ", " + minOpenTxnId);
-      }
+        assert (minOpenTxnId > 0);
+        rows.clear();
+        for (long txnId = first; txnId < first + numTxns; txnId++) {
+          rows.add(txnId + ", " + minOpenTxnId);
+        }
 
-      // Insert transaction entries into MIN_HISTORY_LEVEL.
-      List<String> inserts = sqlGenerator.createInsertValuesStmt(
-              "\"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\", \"MHL_MIN_OPEN_TXNID\")", rows);
-      for (String insert : inserts) {
-        LOG.debug("Going to execute insert <" + insert + ">");
-        stmt.execute(insert);
+        // Insert transaction entries into MIN_HISTORY_LEVEL.
+        List<String> inserts = sqlGenerator.createInsertValuesStmt(
+            "\"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\", \"MHL_MIN_OPEN_TXNID\")", rows);
+        for (String insert : inserts) {
+          LOG.debug("Going to execute insert <" + insert + ">");
+          stmt.execute(insert);
+        }
+        LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds
+                     + ") with min_open_txn: " + minOpenTxnId);
       }
-      LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds
-              + ") with min_open_txn: " + minOpenTxnId);
 
       if (rqst.isSetReplPolicy()) {
         List<String> rowsRepl = new ArrayList<>();
@@ -1054,25 +1069,25 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   /**
    * Concurrency/isolation notes:
    * This is mutexed with {@link #openTxns(OpenTxnRequest)} and other {@link #commitTxn(CommitTxnRequest)}
-   * operations using select4update on NEXT_TXN_ID.  Also, mutexes on TXNX table for specific txnid:X
+   * operations using select4update on NEXT_TXN_ID. Also, mutexes on TXNS table for specific txnid:X
    * see more notes below.
    * In order to prevent lost updates, we need to determine if any 2 transactions overlap.  Each txn
    * is viewed as an interval [M,N]. M is the txnid and N is taken from the same NEXT_TXN_ID sequence
    * so that we can compare commit time of txn T with start time of txn S.  This sequence can be thought of
-   * as a logical time counter.  If S.commitTime < T.startTime, T and S do NOT overlap.
+   * as a logical time counter. If S.commitTime < T.startTime, T and S do NOT overlap.
    *
    * Motivating example:
    * Suppose we have multi-statment transactions T and S both of which are attempting x = x + 1
-   * In order to prevent lost update problem, the the non-overlapping txns must lock in the snapshot
-   * that they read appropriately.  In particular, if txns do not overlap, then one follows the other
-   * (assumig they write the same entity), and thus the 2nd must see changes of the 1st.  We ensure
+   * In order to prevent the lost update problem, the non-overlapping txns must lock in the snapshot
+   * that they read appropriately. In particular, if txns do not overlap, then one follows the other
+   * (assuming they write the same entity), and thus the 2nd must see changes of the 1st. We ensure
    * this by locking in snapshot after
    * {@link #openTxns(OpenTxnRequest)} call is made (see org.apache.hadoop.hive.ql.Driver.acquireLocksAndOpenTxn)
-   * and mutexing openTxn() with commit().  In other words, once a S.commit() starts we must ensure
+   * and mutexing openTxn() with commit(). In other words, once a S.commit() starts we must ensure
    * that txn T which will be considered a later txn, locks in a snapshot that includes the result
    * of S's commit (assuming no other txns).
    * As a counter example, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions
-   * were running in parallel).  If T and S both locked in the same snapshot (for example commit of
+   * were running in parallel). If T and S both locked in the same snapshot (for example commit of
    * txnid:2, which is possible if commitTxn() and openTxnx() is not mutexed)
    * 'x' would be updated to the same value by both, i.e. lost update.
    */
diff --git a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
index 041cd76..2ab9388 100644
--- a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
+++ b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
@@ -69,7 +69,7 @@ public class BenchmarkTool implements Runnable {
   private String tableName = TEST_TABLE + "_" + System.getProperty("user.name");
 
 
-  @Option(names = {"-N", "--number"}, description = "umber of object instances")
+  @Option(names = {"-N", "--number"}, description = "number of object instances")
   private int[] instances = {100};
 
   @Option(names = {"-L", "--spin"}, description = "spin count")
@@ -176,7 +176,9 @@ public class BenchmarkTool implements Runnable {
         .add("renameTable",
             () -> benchmarkRenameTable(bench, bData, 1))
         .add("dropDatabase",
-            () -> benchmarkDropDatabase(bench, bData, 1));
+            () -> benchmarkDropDatabase(bench, bData, 1))
+        .add("openTxn",
+            () -> benchmarkOpenTxns(bench, bData, 1));
 
     for (int howMany: instances) {
       suite.add("listTables" + '.' + howMany,
@@ -198,7 +200,9 @@ public class BenchmarkTool implements Runnable {
           .add("renameTable" + '.' + howMany,
               () -> benchmarkRenameTable(bench, bData, howMany))
           .add("dropDatabase" + '.' + howMany,
-              () -> benchmarkDropDatabase(bench, bData, howMany));
+              () -> benchmarkDropDatabase(bench, bData, howMany))
+          .add("openTxns" + '.' + howMany,
+              () -> benchmarkOpenTxns(bench, bData, howMany));
     }
 
     if (doList) {
diff --git a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
index f53f2ef..d80c290 100644
--- a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
+++ b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
@@ -408,6 +408,15 @@ final class HMSBenchmarks {
     }
   }
 
+  static DescriptiveStatistics benchmarkOpenTxns(@NotNull MicroBenchmark bench, // measures opening howMany txns in one openTxn() call
+                                                 @NotNull BenchData data,
+                                                 int howMany) {
+    final HMSClient client = data.getClient();
+    return bench.measure(null, // NOTE(review): null presumably means "no pre-measurement hook" — confirm against MicroBenchmark.measure
+        () -> throwingSupplierWrapper(() -> client.openTxn(howMany)),
+        () -> throwingSupplierWrapper(() -> client.abortTxns(client.getOpenTxns()))); // cleanup aborts EVERY non-aborted open txn reported by the server, not only the ones opened here
+  }
+
   private static void createManyTables(HMSClient client, int howMany, String dbName, String format) {
     List<FieldSchema> columns = createSchema(new ArrayList<>(Arrays.asList("name", "string")));
     List<FieldSchema> partitions = createSchema(new ArrayList<>(Arrays.asList("date", "string")));
@@ -444,4 +453,5 @@ final class HMSBenchmarks {
         throwingSupplierWrapper(client::getCurrentNotificationId));
   }
 
+
 }
diff --git a/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java b/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java
index 7cc1e42..4e75ede 100644
--- a/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java
+++ b/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java
@@ -20,10 +20,15 @@ package org.apache.hadoop.hive.metastore.tools;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest;
 import org.apache.hadoop.hive.metastore.api.DropPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -52,7 +57,9 @@ import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -319,6 +326,40 @@ final class HMSClient implements AutoCloseable {
     client.append_partition_with_environment_context(dbName, tableName, partitionValues, null);
   }
 
+  boolean openTxn(int numTxns) throws TException { // opens numTxns new txns with a single open_txns call
+    client.open_txns(new OpenTxnRequest(numTxns, "Test", "Host")); // "Test"/"Host" are presumably the user/hostname request fields — confirm
+    return true; // constant result; presumably so the call can be wrapped as a value-returning supplier lambda
+  }
+
+  List<Long> getOpenTxns() throws TException { // returns ids from get_open_txns() whose aborted bit is not set
+    GetOpenTxnsResponse txns = client.get_open_txns();
+    List<Long> openTxns = new ArrayList<>();
+    BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits()); // bit i is read as the aborted flag of the i-th entry of getOpen_txns()
+    int i = 0;
+    for(long txnId : txns.getOpen_txns()) {
+      if(!abortedBits.get(i)) { // keep only entries not flagged as aborted
+        openTxns.add(txnId);
+      }
+      ++i;
+    }
+    return openTxns;
+  }
+
+  boolean commitTxn(long txnId) throws TException { // commits the given txn via a single commit_txn call
+    client.commit_txn(new CommitTxnRequest(txnId));
+    return true; // constant result; presumably so the call can be wrapped as a value-returning supplier lambda
+  }
+
+  boolean abortTxn(long txnId) throws TException { // aborts the given txn via a single abort_txn call
+    client.abort_txn(new AbortTxnRequest(txnId));
+    return true; // constant result; presumably so the call can be wrapped as a value-returning supplier lambda
+  }
+
+  boolean abortTxns(List<Long> txnIds) throws TException { // aborts all given txns with one batched abort_txns call
+    client.abort_txns(new AbortTxnsRequest(txnIds));
+    return true; // constant result; used as the supplier value in benchmarkOpenTxns' cleanup lambda
+  }
+
   private TTransport open(Configuration conf, @NotNull URI uri) throws
       TException, IOException, LoginException {
     boolean useSSL = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.USE_SSL);