You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2020/03/23 15:07:17 UTC
[hive] branch master updated: HIVE-23042: Merge queries to a single
one for updating MIN_OPEN_TXNS table (Peter Vary reviewed by Denys
Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new d7b6b93 HIVE-23042: Merge queries to a single one for updating MIN_OPEN_TXNS table (Peter Vary reviewed by Denys Kuzmenko)
d7b6b93 is described below
commit d7b6b934bd811ae3e3c6222238844a8dc467b5df
Author: Peter Vary <pv...@cloudera.com>
AuthorDate: Mon Mar 23 16:06:40 2020 +0100
HIVE-23042: Merge queries to a single one for updating MIN_OPEN_TXNS table (Peter Vary reviewed by Denys Kuzmenko)
---
.../hadoop/hive/metastore/txn/TxnHandler.java | 73 +++++++++++++---------
.../hadoop/hive/metastore/tools/BenchmarkTool.java | 10 ++-
.../hadoop/hive/metastore/tools/HMSBenchmarks.java | 10 +++
.../hadoop/hive/metastore/tools/HMSClient.java | 41 ++++++++++++
4 files changed, 102 insertions(+), 32 deletions(-)
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 42cd89f..347cb6e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -625,31 +625,46 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
// Need to register minimum open txnid for current transactions into MIN_HISTORY table.
- s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN);
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
- if (!rs.next()) {
- throw new IllegalStateException("Scalar query returned no rows?!?!!");
- }
+ // For a single txn we can do it in a single insert. With multiple txns calculating the
+ // minOpenTxnId for every insert is not cost effective, so caching the value
+ if (txnIds.size() == 1) {
+ s = "INSERT INTO \"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\",\"MHL_MIN_OPEN_TXNID\") " +
+ "SELECT ?, MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN);
+ LOG.debug("Going to execute query <" + s + ">");
+ try (PreparedStatement pstmt = dbConn.prepareStatement(s)) {
+ pstmt.setLong(1, txnIds.get(0));
+ pstmt.execute();
+ }
+ LOG.info("Added entries to MIN_HISTORY_LEVEL with a single query for current txn: " + txnIds);
+ } else {
+ s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN);
+ LOG.debug("Going to execute query <" + s + ">");
+ long minOpenTxnId = -1L;
+ try(ResultSet minOpenTxnIdRs = stmt.executeQuery(s)) {
+ if (!minOpenTxnIdRs.next()) {
+ throw new IllegalStateException("Scalar query returned no rows?!?!!");
+ }
+ // TXNS table should have at least one entry because we just inserted the newly opened txns.
+ // So, min(txn_id) would be a non-zero txnid.
+ minOpenTxnId = minOpenTxnIdRs.getLong(1);
+ }
- // TXNS table should have atleast one entry because we just inserted the newly opened txns.
- // So, min(txn_id) would be a non-zero txnid.
- long minOpenTxnId = rs.getLong(1);
- assert (minOpenTxnId > 0);
- rows.clear();
- for (long txnId = first; txnId < first + numTxns; txnId++) {
- rows.add(txnId + ", " + minOpenTxnId);
- }
+ assert (minOpenTxnId > 0);
+ rows.clear();
+ for (long txnId = first; txnId < first + numTxns; txnId++) {
+ rows.add(txnId + ", " + minOpenTxnId);
+ }
- // Insert transaction entries into MIN_HISTORY_LEVEL.
- List<String> inserts = sqlGenerator.createInsertValuesStmt(
- "\"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\", \"MHL_MIN_OPEN_TXNID\")", rows);
- for (String insert : inserts) {
- LOG.debug("Going to execute insert <" + insert + ">");
- stmt.execute(insert);
+ // Insert transaction entries into MIN_HISTORY_LEVEL.
+ List<String> inserts = sqlGenerator.createInsertValuesStmt(
+ "\"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\", \"MHL_MIN_OPEN_TXNID\")", rows);
+ for (String insert : inserts) {
+ LOG.debug("Going to execute insert <" + insert + ">");
+ stmt.execute(insert);
+ }
+ LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds
+ + ") with min_open_txn: " + minOpenTxnId);
}
- LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds
- + ") with min_open_txn: " + minOpenTxnId);
if (rqst.isSetReplPolicy()) {
List<String> rowsRepl = new ArrayList<>();
@@ -1054,25 +1069,25 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
/**
* Concurrency/isolation notes:
* This is mutexed with {@link #openTxns(OpenTxnRequest)} and other {@link #commitTxn(CommitTxnRequest)}
- * operations using select4update on NEXT_TXN_ID. Also, mutexes on TXNX table for specific txnid:X
+ * operations using select4update on NEXT_TXN_ID. Also, mutexes on TXNS table for specific txnid:X
* see more notes below.
* In order to prevent lost updates, we need to determine if any 2 transactions overlap. Each txn
* is viewed as an interval [M,N]. M is the txnid and N is taken from the same NEXT_TXN_ID sequence
* so that we can compare commit time of txn T with start time of txn S. This sequence can be thought of
- * as a logical time counter. If S.commitTime < T.startTime, T and S do NOT overlap.
+ * as a logical time counter. If S.commitTime < T.startTime, T and S do NOT overlap.
*
* Motivating example:
* Suppose we have multi-statement transactions T and S both of which are attempting x = x + 1
- * In order to prevent lost update problem, the the non-overlapping txns must lock in the snapshot
- * that they read appropriately. In particular, if txns do not overlap, then one follows the other
- * (assumig they write the same entity), and thus the 2nd must see changes of the 1st. We ensure
+ * In order to prevent lost update problem, then the non-overlapping txns must lock in the snapshot
+ * that they read appropriately. In particular, if txns do not overlap, then one follows the other
+ * (assuming they write the same entity), and thus the 2nd must see changes of the 1st. We ensure
* this by locking in snapshot after
* {@link #openTxns(OpenTxnRequest)} call is made (see org.apache.hadoop.hive.ql.Driver.acquireLocksAndOpenTxn)
- * and mutexing openTxn() with commit(). In other words, once a S.commit() starts we must ensure
+ * and mutexing openTxn() with commit(). In other words, once a S.commit() starts we must ensure
* that txn T which will be considered a later txn, locks in a snapshot that includes the result
* of S's commit (assuming no other txns).
* As a counter example, suppose we have S[3,3] and T[4,4] (commitId=txnid means no other transactions
- * were running in parallel). If T and S both locked in the same snapshot (for example commit of
+ * were running in parallel). If T and S both locked in the same snapshot (for example commit of
* txnid:2, which is possible if commitTxn() and openTxn() is not mutexed)
* 'x' would be updated to the same value by both, i.e. lost update.
*/
diff --git a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
index 041cd76..2ab9388 100644
--- a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
+++ b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/BenchmarkTool.java
@@ -69,7 +69,7 @@ public class BenchmarkTool implements Runnable {
private String tableName = TEST_TABLE + "_" + System.getProperty("user.name");
- @Option(names = {"-N", "--number"}, description = "umber of object instances")
+ @Option(names = {"-N", "--number"}, description = "number of object instances")
private int[] instances = {100};
@Option(names = {"-L", "--spin"}, description = "spin count")
@@ -176,7 +176,9 @@ public class BenchmarkTool implements Runnable {
.add("renameTable",
() -> benchmarkRenameTable(bench, bData, 1))
.add("dropDatabase",
- () -> benchmarkDropDatabase(bench, bData, 1));
+ () -> benchmarkDropDatabase(bench, bData, 1))
+ .add("openTxn",
+ () -> benchmarkOpenTxns(bench, bData, 1));
for (int howMany: instances) {
suite.add("listTables" + '.' + howMany,
@@ -198,7 +200,9 @@ public class BenchmarkTool implements Runnable {
.add("renameTable" + '.' + howMany,
() -> benchmarkRenameTable(bench, bData, howMany))
.add("dropDatabase" + '.' + howMany,
- () -> benchmarkDropDatabase(bench, bData, howMany));
+ () -> benchmarkDropDatabase(bench, bData, howMany))
+ .add("openTxns" + '.' + howMany,
+ () -> benchmarkOpenTxns(bench, bData, howMany));
}
if (doList) {
diff --git a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
index f53f2ef..d80c290 100644
--- a/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
+++ b/standalone-metastore/metastore-tools/metastore-benchmarks/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSBenchmarks.java
@@ -408,6 +408,15 @@ final class HMSBenchmarks {
}
}
+ static DescriptiveStatistics benchmarkOpenTxns(@NotNull MicroBenchmark bench,
+ @NotNull BenchData data,
+ int howMany) {
+ final HMSClient client = data.getClient();
+ return bench.measure(null,
+ () -> throwingSupplierWrapper(() -> client.openTxn(howMany)),
+ () -> throwingSupplierWrapper(() -> client.abortTxns(client.getOpenTxns())));
+ }
+
private static void createManyTables(HMSClient client, int howMany, String dbName, String format) {
List<FieldSchema> columns = createSchema(new ArrayList<>(Arrays.asList("name", "string")));
List<FieldSchema> partitions = createSchema(new ArrayList<>(Arrays.asList("date", "string")));
@@ -444,4 +453,5 @@ final class HMSBenchmarks {
throwingSupplierWrapper(client::getCurrentNotificationId));
}
+
}
diff --git a/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java b/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java
index 7cc1e42..4e75ede 100644
--- a/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java
+++ b/standalone-metastore/metastore-tools/tools-common/src/main/java/org/apache/hadoop/hive/metastore/tools/HMSClient.java
@@ -20,10 +20,15 @@ package org.apache.hadoop.hive.metastore.tools;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest;
import org.apache.hadoop.hive.metastore.api.DropPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.RequestPartsSpec;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -52,7 +57,9 @@ import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -319,6 +326,40 @@ final class HMSClient implements AutoCloseable {
client.append_partition_with_environment_context(dbName, tableName, partitionValues, null);
}
+ boolean openTxn(int numTxns) throws TException {
+ client.open_txns(new OpenTxnRequest(numTxns, "Test", "Host"));
+ return true;
+ }
+
+ List<Long> getOpenTxns() throws TException {
+ GetOpenTxnsResponse txns = client.get_open_txns();
+ List<Long> openTxns = new ArrayList<>();
+ BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits());
+ int i = 0;
+ for(long txnId : txns.getOpen_txns()) {
+ if(!abortedBits.get(i)) {
+ openTxns.add(txnId);
+ }
+ ++i;
+ }
+ return openTxns;
+ }
+
+ boolean commitTxn(long txnId) throws TException {
+ client.commit_txn(new CommitTxnRequest(txnId));
+ return true;
+ }
+
+ boolean abortTxn(long txnId) throws TException {
+ client.abort_txn(new AbortTxnRequest(txnId));
+ return true;
+ }
+
+ boolean abortTxns(List<Long> txnIds) throws TException {
+ client.abort_txns(new AbortTxnsRequest(txnIds));
+ return true;
+ }
+
private TTransport open(Configuration conf, @NotNull URI uri) throws
TException, IOException, LoginException {
boolean useSSL = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.USE_SSL);