You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2019/11/04 12:15:44 UTC
[hive] branch master updated: HIVE-21114: Create read-only transactions (Denys Kuzmenko, reviewed by Ashutosh Bapat and Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new cdad75a HIVE-21114: Create read-only transactions (Denys Kuzmenko, reviewed by Ashutosh Bapat and Peter Vary)
cdad75a is described below
commit cdad75aaf0e43d01c053e85817785f415884e608
Author: denys kuzmenko <dk...@cloudera.com>
AuthorDate: Mon Nov 4 12:59:01 2019 +0100
HIVE-21114: Create read-only transactions (Denys Kuzmenko, reviewed by Ashutosh Bapat and Peter Vary)
---
ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 122 +++++++++----------
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 22 +++-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 12 +-
.../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 7 ++
.../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 11 ++
.../hadoop/hive/ql/lockmgr/TestDbTxnManager.java | 12 +-
.../hadoop/hive/ql/parse/TestParseUtils.java | 96 +++++++++++++++
.../hadoop/hive/metastore/txn/TxnHandler.java | 135 ++++++++++-----------
.../hive/metastore/TestHiveMetaStoreTxns.java | 19 ++-
9 files changed, 283 insertions(+), 153 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 91910d1..bae0ffd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -22,10 +22,8 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.IOException;
import java.io.PrintStream;
-import java.io.Serializable;
import java.net.InetAddress;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -37,11 +35,10 @@ import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -53,11 +50,11 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.conf.HiveVariableSource;
import org.apache.hadoop.hive.conf.VariableSubstitution;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
-import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
@@ -82,7 +79,6 @@ import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.HookUtils;
import org.apache.hadoop.hive.ql.hooks.PrivateHookContext;
-import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lock.CompileLock;
@@ -185,6 +181,7 @@ public class Driver implements IDriver {
// Transaction manager used for the query. This will be set at compile time based on
// either initTxnMgr or from the SessionState, in that order.
private HiveTxnManager queryTxnMgr;
+ private TxnType queryTxnType = TxnType.DEFAULT;
private StatsSource statsSource;
// Boolean to store information about whether valid txn list was generated
@@ -475,7 +472,9 @@ public class Driver implements IDriver {
&& queryState.getHiveOperation().equals(HiveOperation.REPLDUMP)) {
setLastReplIdForDump(queryState.getConf());
}
- openTransaction();
+ queryTxnType = AcidUtils.getTxnType(tree);
+ openTransaction(queryTxnType);
+
generateValidTxnList();
}
@@ -676,7 +675,7 @@ public class Driver implements IDriver {
lckCmp.getType() == LockType.SHARED_WRITE) &&
lckCmp.getTablename() != null) {
nonSharedLocks.add(
- Warehouse.getQualifiedName(
+ TableName.getDbTable(
lckCmp.getDbname(), lckCmp.getTablename()));
}
}
@@ -687,7 +686,7 @@ public class Driver implements IDriver {
lock.getHiveLockMode() == HiveLockMode.SEMI_SHARED) &&
lock.getHiveLockObject().getPaths().length == 2) {
nonSharedLocks.add(
- Warehouse.getQualifiedName(
+ TableName.getDbTable(
lock.getHiveLockObject().getPaths()[0], lock.getHiveLockObject().getPaths()[1]));
}
}
@@ -699,25 +698,27 @@ public class Driver implements IDriver {
return true;
}
ValidTxnWriteIdList txnWriteIdList = new ValidTxnWriteIdList(txnWriteIdListStr);
- List<Pair<String, Table>> writtenTables = getWrittenTableList(plan);
+ Map<String, Table> writtenTables = getWrittenTables(plan);
+
ValidTxnWriteIdList currentTxnWriteIds =
queryTxnMgr.getValidWriteIds(
- writtenTables.stream()
- .filter(e -> AcidUtils.isTransactionalTable(e.getRight()))
- .map(e -> e.getLeft())
+ writtenTables.entrySet().stream()
+ .filter(e -> AcidUtils.isTransactionalTable(e.getValue()))
+ .map(Map.Entry::getKey)
.collect(Collectors.toList()),
currentTxnString);
- for (Pair<String, Table> tableInfo : writtenTables) {
- String fullQNameForLock = Warehouse.getQualifiedName(
- tableInfo.getRight().getDbName(),
- MetaStoreUtils.encodeTableName(tableInfo.getRight().getTableName()));
+
+ for (Map.Entry<String, Table> tableInfo : writtenTables.entrySet()) {
+ String fullQNameForLock = TableName.getDbTable(
+ tableInfo.getValue().getDbName(),
+ MetaStoreUtils.encodeTableName(tableInfo.getValue().getTableName()));
if (nonSharedLocks.contains(fullQNameForLock)) {
// Check if table is transactional
- if (AcidUtils.isTransactionalTable(tableInfo.getRight())) {
+ if (AcidUtils.isTransactionalTable(tableInfo.getValue())) {
// Check that write id is still valid
if (!TxnIdUtils.checkEquivalentWriteIds(
- txnWriteIdList.getTableValidWriteIdList(tableInfo.getLeft()),
- currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getLeft()))) {
+ txnWriteIdList.getTableValidWriteIdList(tableInfo.getKey()),
+ currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getKey()))) {
// Write id has changed, it is not valid anymore,
// we need to recompile
return false;
@@ -729,7 +730,7 @@ public class Driver implements IDriver {
if (!nonSharedLocks.isEmpty()) {
throw new LockException("Wrong state: non-shared locks contain information for tables that have not" +
" been visited when trying to validate the locks from query tables.\n" +
- "Tables: " + writtenTables.stream().map(e -> e.getLeft()).collect(Collectors.toList()) + "\n" +
+ "Tables: " + writtenTables.keySet() + "\n" +
"Remaining locks after check: " + nonSharedLocks);
}
// It passes the test, it is valid
@@ -766,10 +767,10 @@ public class Driver implements IDriver {
LOG.debug("Setting " + ReplUtils.LAST_REPL_ID_KEY + " = " + lastReplId);
}
- private void openTransaction() throws LockException, CommandProcessorException {
+ private void openTransaction(TxnType txnType) throws LockException, CommandProcessorException {
if (checkConcurrency() && startImplicitTxn(queryTxnMgr) && !queryTxnMgr.isTxnOpen()) {
String userFromUGI = getUserFromUGI();
- queryTxnMgr.openTxn(ctx, userFromUGI);
+ queryTxnMgr.openTxn(ctx, userFromUGI, txnType);
}
}
@@ -933,7 +934,7 @@ public class Driver implements IDriver {
throw new IllegalStateException("calling recordValidWritsIdss() without initializing ValidTxnList " +
JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
}
- List<String> txnTables = getTransactionalTableList(plan);
+ List<String> txnTables = getTransactionalTables(plan);
ValidTxnWriteIdList txnWriteIds = null;
if (compactionWriteIds != null) {
/**
@@ -946,12 +947,17 @@ public class Driver implements IDriver {
if (txnTables.size() != 1) {
throw new LockException("Unexpected tables in compaction: " + txnTables);
}
- String fullTableName = txnTables.get(0);
txnWriteIds = new ValidTxnWriteIdList(compactorTxnId);
txnWriteIds.addTableValidWriteIdList(compactionWriteIds);
} else {
txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString);
}
+ if (queryTxnType == TxnType.READ_ONLY && !getWrittenTables(plan).isEmpty()) {
+ throw new IllegalStateException(String.format(
+ "Inferred transaction type '%s' doesn't conform to the actual query string '%s'",
+ queryTxnType, queryState.getQueryString()));
+ }
+
String writeIdStr = txnWriteIds.toString();
conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, writeIdStr);
if (plan.getFetchTask() != null) {
@@ -980,20 +986,31 @@ public class Driver implements IDriver {
return txnWriteIds;
}
- // Make the list of transactional tables list which are getting read or written by current txn
- private List<String> getTransactionalTableList(QueryPlan plan) {
- Set<String> tableList = new HashSet<>();
+ // Make the list of transactional tables that are read or written by current txn
+ private List<String> getTransactionalTables(QueryPlan plan) {
+ Map<String, Table> tables = new HashMap<>();
+ plan.getInputs().forEach(
+ input -> addTableFromEntity(input, tables)
+ );
+ plan.getOutputs().forEach(
+ output -> addTableFromEntity(output, tables)
+ );
+ return tables.entrySet().stream()
+ .filter(entry -> AcidUtils.isTransactionalTable(entry.getValue()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ }
- for (ReadEntity input : plan.getInputs()) {
- addTableFromEntity(input, tableList);
- }
- for (WriteEntity output : plan.getOutputs()) {
- addTableFromEntity(output, tableList);
- }
- return new ArrayList<String>(tableList);
+ // Make the map of tables written by current txn
+ private Map<String, Table> getWrittenTables(QueryPlan plan) {
+ Map<String, Table> tables = new HashMap<>();
+ plan.getOutputs().forEach(
+ output -> addTableFromEntity(output, tables)
+ );
+ return tables;
}
- private void addTableFromEntity(Entity entity, Collection<String> tableList) {
+ private void addTableFromEntity(Entity entity, Map<String, Table> tables) {
Table tbl;
switch (entity.getType()) {
case TABLE: {
@@ -1009,39 +1026,8 @@ public class Driver implements IDriver {
return;
}
}
- if (!AcidUtils.isTransactionalTable(tbl)) {
- return;
- }
String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName());
- tableList.add(fullTableName);
- }
-
- // Make the list of transactional tables list which are getting written by current txn
- private List<Pair<String, Table>> getWrittenTableList(QueryPlan plan) {
- List<Pair<String, Table>> result = new ArrayList<>();
- Set<String> tableList = new HashSet<>();
- for (WriteEntity output : plan.getOutputs()) {
- Table tbl;
- switch (output.getType()) {
- case TABLE: {
- tbl = output.getTable();
- break;
- }
- case PARTITION:
- case DUMMYPARTITION: {
- tbl = output.getPartition().getTable();
- break;
- }
- default: {
- continue;
- }
- }
- String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName());
- if (tableList.add(fullTableName)) {
- result.add(new ImmutablePair(fullTableName, tbl));
- }
- }
- return result;
+ tables.put(fullTableName, tbl);
}
private String getUserFromUGI() throws CommandProcessorException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index fcf499d..67996c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.hive.ql.io;
import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
-import static org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator.UNION_SUDBIR_PREFIX;
+import static org.apache.hadoop.hive.ql.parse.CalcitePlanner.ASTSearcher;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.ddl.table.creation.CreateTableDesc;
@@ -72,6 +73,8 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -90,6 +93,7 @@ import com.google.common.annotations.VisibleForTesting;
import javax.annotation.concurrent.Immutable;
import java.nio.charset.Charset;
+import java.util.stream.Stream;
/**
* Utilities that are shared by all of the ACID input and output formats. They
@@ -2983,4 +2987,20 @@ public class AcidUtils {
throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ex.getMessage()), ex);
}
}
+
+ /**
+ * Determines transaction type based on query AST.
+ * @param tree AST
+ */
+ public static TxnType getTxnType(ASTNode tree) {
+ final ASTSearcher astSearcher = new ASTSearcher();
+
+ return (tree.getToken().getType() == HiveParser.TOK_QUERY &&
+ Stream.of(
+ new int[]{HiveParser.TOK_INSERT_INTO},
+ new int[]{HiveParser.TOK_INSERT, HiveParser.TOK_TAB})
+ .noneMatch(pattern ->
+ astSearcher.simpleBreadthFirstSearch(tree, pattern) != null)) ?
+ TxnType.READ_ONLY : TxnType.DEFAULT;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 943aa38..76934bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -220,11 +221,16 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
@Override
public long openTxn(Context ctx, String user) throws LockException {
- return openTxn(ctx, user, 0);
+ return openTxn(ctx, user, TxnType.DEFAULT, 0);
+ }
+
+ @Override
+ public long openTxn(Context ctx, String user, TxnType txnType) throws LockException {
+ return openTxn(ctx, user, txnType, 0);
}
@VisibleForTesting
- long openTxn(Context ctx, String user, long delay) throws LockException {
+ long openTxn(Context ctx, String user, TxnType txnType, long delay) throws LockException {
/*Q: why don't we lock the snapshot here??? Instead of having client make an explicit call
whenever it chooses
A: If we want to rely on locks for transaction scheduling we must get the snapshot after lock
@@ -236,7 +242,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
}
try {
- txnId = getMS().openTxn(user);
+ txnId = getMS().openTxn(user, txnType);
stmtId = 0;
numStatements = 0;
tableWriteIds.clear();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index ac813c8..7820013 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.lockmgr;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.TxnType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.ValidTxnList;
@@ -54,6 +55,12 @@ class DummyTxnManager extends HiveTxnManagerImpl {
private HiveLockManagerCtx lockManagerCtx;
@Override
+ public long openTxn(Context ctx, String user, TxnType txnType) throws LockException {
+ // No-op
+ return 0L;
+ }
+
+ @Override
public long openTxn(Context ctx, String user) throws LockException {
// No-op
return 0L;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 1c53426..600289f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverState;
import org.apache.hadoop.hive.ql.ddl.database.lock.LockDatabaseDesc;
@@ -50,6 +51,16 @@ public interface HiveTxnManager {
*/
long openTxn(Context ctx, String user) throws LockException;
+ /**
+ * Open a new transaction.
+ * @param ctx Context for this query
+ * @param user Hive user who is opening this transaction.
+ * @param txnType transaction type.
+ * @return The new transaction id
+ * @throws LockException if a transaction is already open.
+ */
+ long openTxn(Context ctx, String user, TxnType txnType) throws LockException;
+
/**
* Open a new transaction in target cluster.
* @param replPolicy Replication policy to uniquely identify the source cluster.
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index cc86afe..cd91948 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.TxnState;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -210,7 +211,8 @@ public class TestDbTxnManager {
public void testExceptions() throws Exception {
addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
- ((DbTxnManager) txnMgr).openTxn(ctx, "NicholasII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+ ((DbTxnManager) txnMgr).openTxn(ctx, "NicholasII", TxnType.DEFAULT,
+ HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
runReaper();
LockException exception = null;
@@ -224,7 +226,8 @@ public class TestDbTxnManager {
Assert.assertEquals("Wrong Exception1", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
exception = null;
- ((DbTxnManager) txnMgr).openTxn(ctx, "AlexanderIII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+ ((DbTxnManager) txnMgr).openTxn(ctx, "AlexanderIII", TxnType.DEFAULT,
+ HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
runReaper();//this will abort the txn
TxnStore txnHandler = TxnUtils.getTxnStore(conf);
@@ -441,7 +444,7 @@ public class TestDbTxnManager {
// Case 2: If there's delay for the heartbeat, but the delay is within the reaper's tolerance,
// then txt should be able to commit
// Start the heartbeat after a delay, which is shorter than the HIVE_TXN_TIMEOUT
- ((DbTxnManager) txnMgr).openTxn(ctx, "tom",
+ ((DbTxnManager) txnMgr).openTxn(ctx, "tom", TxnType.DEFAULT,
HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2);
txnMgr.acquireLocks(qp, ctx, "tom");
runReaper();
@@ -457,7 +460,8 @@ public class TestDbTxnManager {
// then the txn will time out and be aborted.
// Here we just don't send the heartbeat at all - an infinite delay.
// Start the heartbeat after a delay, which exceeds the HIVE_TXN_TIMEOUT
- ((DbTxnManager) txnMgr).openTxn(ctx, "jerry", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+ ((DbTxnManager) txnMgr).openTxn(ctx, "jerry", TxnType.DEFAULT,
+ HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
txnMgr.acquireLocks(qp, ctx, "jerry");
Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
runReaper();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
new file mode 100644
index 0000000..bf8a028
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Transaction type derived from the original query test.
+ */
+@RunWith(value = Parameterized.class)
+public class TestParseUtils {
+
+ private String query;
+ private TxnType txnType;
+
+ public TestParseUtils(String query, TxnType txnType) {
+ this.query = query;
+ this.txnType = txnType;
+ }
+
+ @Parameters
+ public static Collection<Object[]> data() {
+ return Arrays.asList(
+ new Object[][]{
+ {"SELECT current_timestamp()", TxnType.READ_ONLY},
+ {"SELECT count(*) FROM a", TxnType.READ_ONLY},
+ {"SELECT count(*) FROM a JOIN b ON a.id = b.id", TxnType.READ_ONLY},
+
+ {"WITH a AS (SELECT current_timestamp()) " +
+ " SELECT * FROM a", TxnType.READ_ONLY},
+
+ {"INSERT INTO a VALUES (1, 2)", TxnType.DEFAULT},
+ {"INSERT INTO a SELECT * FROM b", TxnType.DEFAULT},
+ {"INSERT OVERWRITE TABLE a SELECT * FROM b", TxnType.DEFAULT},
+
+ {"FROM b INSERT OVERWRITE TABLE a SELECT *", TxnType.DEFAULT},
+
+ {"WITH a AS (SELECT current_timestamp()) " +
+ " INSERT INTO b SELECT * FROM a", TxnType.DEFAULT},
+
+ {"UPDATE a SET col_b = 1", TxnType.DEFAULT},
+ {"DELETE FROM a WHERE col_b = 1", TxnType.DEFAULT},
+
+ {"CREATE TABLE a (col_b int)", TxnType.DEFAULT},
+ {"CREATE TABLE a AS SELECT * FROM b", TxnType.DEFAULT},
+ {"DROP TABLE a", TxnType.DEFAULT},
+
+ {"LOAD DATA LOCAL INPATH './examples/files/kv.txt' " +
+ " OVERWRITE INTO TABLE a", TxnType.DEFAULT},
+
+ {"REPL LOAD a from './examples/files/kv.txt'", TxnType.DEFAULT},
+ {"REPL DUMP a", TxnType.DEFAULT},
+ {"REPL STATUS a", TxnType.DEFAULT},
+
+ {"MERGE INTO a trg using b src " +
+ " ON src.col_a = trg.col_a " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET col_b = src.col_b " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT VALUES (src.col_a, src.col_b)",
+ TxnType.DEFAULT},
+ });
+ }
+
+ @Test
+ public void testTxnType() throws ParseException {
+ Assert.assertEquals(
+ AcidUtils.getTxnType(ParseUtils.parse(query)), txnType);
+ }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 355c4f5..6281208 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -57,6 +57,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
@@ -1700,10 +1701,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
Collections.sort(txnIds); //easier to read logs and for assumption done in replication flow
}
- // Check if all the input txns are in open state. Write ID should be allocated only for open transactions.
- if (!isTxnsInOpenState(txnIds, stmt)) {
- ensureAllTxnsValid(dbName, tblName, txnIds, stmt);
- throw new RuntimeException("This should never happen for txnIds: " + txnIds);
+ // Check if all the input txns are in valid state.
+ // Write IDs should be allocated only for open and not read-only transactions.
+ if (!isTxnsOpenAndNotReadOnly(txnIds, stmt)) {
+ String errorMsg = "Write ID allocation on " + TableName.getDbTable(dbName, tblName)
+ + " failed for input txns: "
+ + getAbortedAndReadOnlyTxns(txnIds, stmt)
+ + getCommittedTxns(txnIds, stmt);
+ LOG.error(errorMsg);
+
+ throw new IllegalStateException("Write ID allocation failed on " + TableName.getDbTable(dbName, tblName)
+ + " as not all input txns in open state or read-only");
}
List<String> queries = new ArrayList<>();
@@ -1716,7 +1724,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// first write on a table will allocate write id and rest of the writes should re-use it.
prefix.append("select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where"
+ " t2w_database = ? and t2w_table = ?" + " and ");
- suffix.append("");
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
txnIds, "t2w_txnid", false, false);
@@ -4669,112 +4676,94 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
/**
- * Checks if all the txns in the list are in open state.
- * @param txnIds list of txns to be evaluated for open state
+ * Checks if all the txns in the list are in open state and not read-only.
+ * @param txnIds list of txns to be evaluated for open state/read-only status
* @param stmt db statement
- * @return If all txns in open state, then return true else false
+ * @return If all the txns in open state and not read-only, then return true else false
*/
- private boolean isTxnsInOpenState(List<Long> txnIds, Statement stmt) throws SQLException {
+ private boolean isTxnsOpenAndNotReadOnly(List<Long> txnIds, Statement stmt) throws SQLException {
List<String> queries = new ArrayList<>();
StringBuilder prefix = new StringBuilder();
- StringBuilder suffix = new StringBuilder();
- // Get the count of txns from the given list are in open state. If the returned count is same as
- // the input number of txns, then it means, all are in open state.
- prefix.append("select count(*) from TXNS where txn_state = '" + TXN_OPEN + "' and ");
- suffix.append("");
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
- txnIds, "txn_id", false, false);
+ // Get the count of txns from the given list that are in open state and not read-only.
+ // If the returned count is same as the input number of txns, then all txns are in open state and not read-only.
+ prefix.append("select count(*) from TXNS where txn_state = '" + TXN_OPEN
+ + "' and txn_type != " + TxnType.READ_ONLY.getValue() + " and ");
+
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(),
+ txnIds, "txn_id", false, false);
long count = 0;
for (String query : queries) {
LOG.debug("Going to execute query <" + query + ">");
- ResultSet rs = stmt.executeQuery(query);
- if (rs.next()) {
- count += rs.getLong(1);
+ try (ResultSet rs = stmt.executeQuery(query)) {
+ if (rs.next()) {
+ count += rs.getLong(1);
+ }
}
}
return count == txnIds.size();
}
/**
- * Checks if all the txns in the list are in open state.
- * @param dbName Database name
- * @param tblName Table on which we try to allocate write id
- * @param txnIds list of txns to be evaluated for open state
+ * Get txns from the list that are either aborted or read-only.
+ * @param txnIds list of txns to be evaluated for aborted state/read-only status
* @param stmt db statement
*/
- private void ensureAllTxnsValid(String dbName, String tblName, List<Long> txnIds, Statement stmt)
- throws SQLException {
+ private String getAbortedAndReadOnlyTxns(List<Long> txnIds, Statement stmt) throws SQLException {
List<String> queries = new ArrayList<>();
StringBuilder prefix = new StringBuilder();
- StringBuilder suffix = new StringBuilder();
-
- // Check if any of the txns in the list is aborted.
- prefix.append("select txn_id, txn_state from TXNS where ");
- suffix.append("");
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
- txnIds, "txn_id", false, false);
- Long txnId;
- char txnState;
- boolean isAborted = false;
- StringBuilder errorMsg = new StringBuilder();
- errorMsg.append("Write ID allocation on ")
- .append(Warehouse.getQualifiedName(dbName, tblName))
- .append(" failed for input txns: ");
+
+ // Check if any of the txns in the list are either aborted or read-only.
+ prefix.append("select txn_id, txn_state, txn_type from TXNS where ");
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(),
+ txnIds, "txn_id", false, false);
+ StringBuilder txnInfo = new StringBuilder();
+
for (String query : queries) {
LOG.debug("Going to execute query <" + query + ">");
- ResultSet rs = stmt.executeQuery(query);
- while (rs.next()) {
- txnId = rs.getLong(1);
- txnState = rs.getString(2).charAt(0);
- if (txnState != TXN_OPEN) {
- isAborted = true;
- errorMsg.append("{").append(txnId).append(",").append(txnState).append("}");
+ try (ResultSet rs = stmt.executeQuery(query)) {
+ while (rs.next()) {
+ long txnId = rs.getLong(1);
+ char txnState = rs.getString(2).charAt(0);
+ TxnType txnType = TxnType.findByValue(rs.getInt(3));
+
+ if (txnState != TXN_OPEN) {
+ txnInfo.append("{").append(txnId).append(",").append(txnState).append("}");
+ } else if (txnType == TxnType.READ_ONLY) {
+ txnInfo.append("{").append(txnId).append(",read-only}");
+ }
}
}
}
- // Check if any of the txns in the list is committed.
- boolean isCommitted = checkIfTxnsCommitted(txnIds, stmt, errorMsg);
- if (isAborted || isCommitted) {
- LOG.error(errorMsg.toString());
- throw new IllegalStateException("Write ID allocation failed on "
- + Warehouse.getQualifiedName(dbName, tblName)
- + " as not all input txns in open state");
- }
+ return txnInfo.toString();
}
/**
- * Checks if all the txns in the list are in committed. If yes, throw eception.
- * @param txnIds list of txns to be evaluated for committed
+ * Get txns from the list that are committed.
+ * @param txnIds list of txns to be evaluated for committed state
* @param stmt db statement
- * @return true if any input txn is committed, else false
*/
- private boolean checkIfTxnsCommitted(List<Long> txnIds, Statement stmt, StringBuilder errorMsg)
- throws SQLException {
+ private String getCommittedTxns(List<Long> txnIds, Statement stmt) throws SQLException {
List<String> queries = new ArrayList<>();
StringBuilder prefix = new StringBuilder();
- StringBuilder suffix = new StringBuilder();
- // Check if any of the txns in the list is committed. If yes, throw exception.
+ // Check if any of the txns in the list are committed.
prefix.append("select ctc_txnid from COMPLETED_TXN_COMPONENTS where ");
- suffix.append("");
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
- txnIds, "ctc_txnid", false, false);
- Long txnId;
- boolean isCommitted = false;
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(),
+ txnIds, "ctc_txnid", false, false);
+ StringBuilder txnInfo = new StringBuilder();
+
for (String query : queries) {
LOG.debug("Going to execute query <" + query + ">");
- ResultSet rs = stmt.executeQuery(query);
- while (rs.next()) {
- isCommitted = true;
- txnId = rs.getLong(1);
- if (errorMsg != null) {
- errorMsg.append("{").append(txnId).append(",c}");
+ try (ResultSet rs = stmt.executeQuery(query)) {
+ while (rs.next()) {
+ long txnId = rs.getLong(1);
+ txnInfo.append("{").append(txnId).append(",c}");
}
}
}
- return isCommitted;
+ return txnInfo.toString();
}
/**
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
index b5f2209..fc08dbc 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
@@ -23,10 +23,8 @@ import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
@@ -39,12 +37,13 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.junit.Rule;
import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
-import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
@@ -67,6 +66,9 @@ public class TestHiveMetaStoreTxns {
private IMetaStoreClient client;
private Connection conn;
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
@Test
public void testTxns() throws Exception {
List<Long> tids = client.openTxns("me", 3).getTxn_ids();
@@ -312,11 +314,20 @@ public class TestHiveMetaStoreTxns {
public void testTxnTypePersisted() throws Exception {
long txnId = client.openTxn("me", TxnType.READ_ONLY);
Statement stm = conn.createStatement();
- ResultSet rs = stm.executeQuery("SELECT txn_type FROM TXNS WHERE txn_id = " + txnId);
+ ResultSet rs = stm.executeQuery("SELECT txn_type FROM txns WHERE txn_id = " + txnId);
Assert.assertTrue(rs.next());
Assert.assertEquals(TxnType.findByValue(rs.getInt(1)), TxnType.READ_ONLY);
}
+ @Test
+ public void testAllocateTableWriteIdForReadOnlyTxn() throws Exception {
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("Write ID allocation failed on db.tbl as not all input txns in open state or read-only");
+
+ long txnId = client.openTxn("me", TxnType.READ_ONLY);
+ client.allocateTableWriteId(txnId, "db", "tbl");
+ }
+
@Before
public void setUp() throws Exception {
conf.setBoolean(ConfVars.HIVE_IN_TEST.getVarname(), true);