Posted to commits@hive.apache.org by ha...@apache.org on 2018/06/10 06:23:33 UTC
hive git commit: HIVE-19382 : Acquire locks before generating valid transaction list for some operations (Jesus Camacho Rodriguez via Ashutosh Chauhan)
Repository: hive
Updated Branches:
refs/heads/master 95ea9f535 -> 2a55f99e5
HIVE-19382 : Acquire locks before generating valid transaction list for some operations (Jesus Camacho Rodriguez via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2a55f99e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2a55f99e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2a55f99e
Branch: refs/heads/master
Commit: 2a55f99e5d17d9a4ea35f6f3b3372c224321f6b7
Parents: 95ea9f5
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Wed May 2 08:52:00 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Sat Jun 9 23:22:44 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/hive/ql/Context.java | 43 ++++
.../java/org/apache/hadoop/hive/ql/Driver.java | 202 ++++++++++++++++---
.../apache/hadoop/hive/ql/exec/FetchTask.java | 1 +
3 files changed, 223 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2a55f99e/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 1921ea7..9eda4ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -291,6 +291,7 @@ public class Context {
public DestClausePrefix addDestNamePrefix(int pos, DestClausePrefix prefix) {
return insertBranchToNamePrefix.put(pos, prefix);
}
+
public Context(Configuration conf) throws IOException {
this(conf, generateExecutionId());
}
@@ -315,6 +316,48 @@ public class Context {
viewsTokenRewriteStreams = new HashMap<>();
}
+ protected Context(Context ctx) {
+ // This constructor creates a deep copy of the given context, but the copy is
+ // partial, hence it needs to be used carefully. In particular, the following
+ // objects are ignored:
+ // opContext, pathToCS, cboInfo, cboSucceeded, tokenRewriteStream, viewsTokenRewriteStreams,
+ // rewrittenStatementContexts, cteTables, loadTableOutputMap, planMapper, insertBranchToNamePrefix
+ this.isHDFSCleanup = ctx.isHDFSCleanup;
+ this.resFile = ctx.resFile;
+ this.resDir = ctx.resDir;
+ this.resFs = ctx.resFs;
+ this.resDirPaths = ctx.resDirPaths;
+ this.resDirFilesNum = ctx.resDirFilesNum;
+ this.initialized = ctx.initialized;
+ this.originalTracker = ctx.originalTracker;
+ this.nonLocalScratchPath = ctx.nonLocalScratchPath;
+ this.localScratchDir = ctx.localScratchDir;
+ this.scratchDirPermission = ctx.scratchDirPermission;
+ this.fsScratchDirs.putAll(ctx.fsScratchDirs);
+ this.conf = ctx.conf;
+ this.pathid = ctx.pathid;
+ this.explainConfig = ctx.explainConfig;
+ this.cmd = ctx.cmd;
+ this.executionId = ctx.executionId;
+ this.hiveLocks = ctx.hiveLocks;
+ this.hiveTxnManager = ctx.hiveTxnManager;
+ this.needLockMgr = ctx.needLockMgr;
+ this.sequencer = ctx.sequencer;
+ this.outputLockObjects.putAll(ctx.outputLockObjects);
+ this.stagingDir = ctx.stagingDir;
+ this.heartbeater = ctx.heartbeater;
+ this.skipTableMasking = ctx.skipTableMasking;
+ this.isUpdateDeleteMerge = ctx.isUpdateDeleteMerge;
+ this.isLoadingMaterializedView = ctx.isLoadingMaterializedView;
+ this.operation = ctx.operation;
+ this.wmContext = ctx.wmContext;
+ this.isExplainPlan = ctx.isExplainPlan;
+ this.statsSource = ctx.statsSource;
+ this.executionIndex = ctx.executionIndex;
+ this.viewsTokenRewriteStreams = new HashMap<>();
+ this.rewrittenStatementContexts = new HashSet<>();
+ }
+
public Map<String, Path> getFsScratchDirs() {
return fsScratchDirs;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2a55f99e/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index a3dcc3b..c9d18a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -39,15 +39,16 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
-import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -60,11 +61,11 @@ import org.apache.hadoop.hive.conf.HiveVariableSource;
import org.apache.hadoop.hive.conf.VariableSubstitution;
import org.apache.hadoop.hive.metastore.ColumnType;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
@@ -92,6 +93,7 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -141,6 +143,7 @@ import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.hive.common.util.TxnIdUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -215,6 +218,9 @@ public class Driver implements IDriver {
private CacheEntry usedCacheEntry;
private ValidWriteIdList compactionWriteIds = null;
+ private Context backupContext = null;
+ private boolean retrial = false;
+
private enum DriverState {
INITIALIZED,
COMPILING,
@@ -624,10 +630,11 @@ public class Driver implements IDriver {
// because at that point we need access to the objects.
Hive.get().getMSC().flushCache();
- BaseSemanticAnalyzer sem;
- // Do semantic analysis and plan generation
- if (hookRunner.hasPreAnalyzeHooks()) {
- HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
+ backupContext = new Context(ctx);
+ boolean executeHooks = hookRunner.hasPreAnalyzeHooks();
+
+ HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
+ if (executeHooks) {
hookCtx.setConf(conf);
hookCtx.setUserName(userName);
hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
@@ -635,24 +642,24 @@ public class Driver implements IDriver {
hookCtx.setHiveOperation(queryState.getHiveOperation());
tree = hookRunner.runPreAnalyzeHooks(hookCtx, tree);
- sem = SemanticAnalyzerFactory.get(queryState, tree);
+ }
+
+ // Do semantic analysis and plan generation
+ BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
+
+ if (!retrial) {
openTransaction();
- // TODO: Lock acquisition should be moved before this method call
- // when we want to implement lock-based concurrency control
generateValidTxnList();
- sem.analyze(tree, ctx);
- hookCtx.update(sem);
+ }
+
+ sem.analyze(tree, ctx);
+ if (executeHooks) {
+ hookCtx.update(sem);
hookRunner.runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks());
- } else {
- sem = SemanticAnalyzerFactory.get(queryState, tree);
- openTransaction();
- // TODO: Lock acquisition should be moved before this method call
- // when we want to implement lock-based concurrency control
- generateValidTxnList();
- sem.analyze(tree, ctx);
}
- LOG.info("Semantic Analysis Completed");
+
+ LOG.info("Semantic Analysis Completed (retrial = {})", retrial);
// Retrieve information about cache usage for the query.
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
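To make the reordering above easier to follow: the pre/post-analyze hooks now run exactly once regardless of branching, the transaction is opened and the valid txn list generated only on the first attempt (retrial == false), and semantic analysis itself runs on every attempt. A hypothetical condensation of that ordering, with stand-in stubs instead of the real Driver and hook APIs:

// Hypothetical, self-contained condensation of the reordered compile path.
// All names below are stand-ins for the real Driver/analyzer methods.
public class CompileFlowSketch {
  boolean retrial = false;               // mirrors the new Driver.retrial field

  void compile(String tree) {
    if (hasPreAnalyzeHooks()) {
      tree = runPreAnalyzeHooks(tree);   // hooks now run exactly once, up front
    }
    Object sem = getAnalyzer(tree);      // analyzer created on every attempt
    if (!retrial) {                      // first attempt only:
      openTransaction();                 //   open txn and capture the snapshot
      generateValidTxnList();
    }
    analyze(sem, tree);                  // analysis runs on every attempt
    if (hasPreAnalyzeHooks()) {
      runPostAnalyzeHooks(sem);
    }
  }

  // Stubs so the sketch compiles; the real logic lives in Driver and the hooks.
  boolean hasPreAnalyzeHooks() { return false; }
  String runPreAnalyzeHooks(String t) { return t; }
  Object getAnalyzer(String t) { return new Object(); }
  void openTransaction() {}
  void generateValidTxnList() {}
  void analyze(Object sem, String t) {}
  void runPostAnalyzeHooks(Object sem) {}

  public static void main(String[] args) {
    new CompileFlowSketch().compile("SELECT ...");
  }
}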
@@ -668,7 +675,7 @@ public class Driver implements IDriver {
// get the output schema
schema = getSchema(sem, conf);
plan = new QueryPlan(queryStr, sem, queryDisplay.getQueryStartTime(), queryId,
- queryState.getHiveOperation(), schema);
+ queryState.getHiveOperation(), schema);
conf.set("mapreduce.workflow.id", "hive_" + queryId);
@@ -776,6 +783,86 @@ public class Driver implements IDriver {
}
}
+ // Checks whether the txn list has been invalidated while planning the query.
+ // This would happen if the query requires an exclusive/semi-shared lock, and
+ // a transaction has been committed on the table over which the lock is
+ // required.
+ private boolean isValidTxnListState() throws LockException {
+ // 1) Get valid txn list.
+ String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+ if (txnString == null) {
+ // Not a transactional op, nothing more to do
+ return true;
+ }
+ ValidTxnList currentTxnList = queryTxnMgr.getValidTxns();
+ String currentTxnString = currentTxnList.toString();
+ if (currentTxnString.equals(txnString)) {
+ // Still valid, nothing more to do
+ return true;
+ }
+ // 2) Get locks that are relevant:
+ // - Exclusive for INSERT OVERWRITE.
+ // - Semi-shared for UPDATE/DELETE.
+ if (ctx.getHiveLocks() == null || ctx.getHiveLocks().isEmpty()) {
+ // Nothing to check
+ return true;
+ }
+ Set<String> nonSharedLocks = new HashSet<>();
+ for (HiveLock lock : ctx.getHiveLocks()) {
+ if (lock.getHiveLockMode() == HiveLockMode.EXCLUSIVE ||
+ lock.getHiveLockMode() == HiveLockMode.SEMI_SHARED) {
+ if (lock.getHiveLockObject().getPaths().length == 2) {
+ // Pos 0 of lock paths array contains dbname, pos 1 contains tblname
+ nonSharedLocks.add(
+ Warehouse.getQualifiedName(
+ lock.getHiveLockObject().getPaths()[0], lock.getHiveLockObject().getPaths()[1]));
+ }
+ }
+ }
+ // 3) Get txn tables that are being written
+ String writeIdsString = conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
+ if (writeIdsString == null) {
+ // Nothing to check
+ return true;
+ }
+ ValidTxnWriteIdList txnWriteIdList = new ValidTxnWriteIdList(writeIdsString);
+ List<Pair<String, Table>> writtenTables = getWrittenTableList(plan);
+ ValidTxnWriteIdList currentTxnWriteIds =
+ queryTxnMgr.getValidWriteIds(
+ writtenTables.stream()
+ .filter(e -> AcidUtils.isTransactionalTable(e.getRight()))
+ .map(e -> e.getLeft())
+ .collect(Collectors.toList()),
+ currentTxnString);
+ for (Pair<String, Table> tableInfo : writtenTables) {
+ String fullQNameForLock = Warehouse.getQualifiedName(
+ tableInfo.getRight().getDbName(),
+ MetaStoreUtils.encodeTableName(tableInfo.getRight().getTableName()));
+ if (nonSharedLocks.contains(fullQNameForLock)) {
+ // Check if table is transactional
+ if (AcidUtils.isTransactionalTable(tableInfo.getRight())) {
+ // Check that write id is still valid
+ if (!TxnIdUtils.checkEquivalentWriteIds(
+ txnWriteIdList.getTableValidWriteIdList(tableInfo.getLeft()),
+ currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getLeft()))) {
+ // Write id has changed, it is not valid anymore,
+ // we need to recompile
+ return false;
+ }
+ }
+ nonSharedLocks.remove(fullQNameForLock);
+ }
+ }
+ if (!nonSharedLocks.isEmpty()) {
+ throw new LockException("Wrong state: non-shared locks contain information for tables that have not" +
+ " been visited when trying to validate the locks from query tables.\n" +
+ "Tables: " + writtenTables.stream().map(e -> e.getLeft()).collect(Collectors.toList()) + "\n" +
+ "Remaining locks after check: " + nonSharedLocks);
+ }
+ // It passes the test, it is valid
+ return true;
+ }
+
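The core of isValidTxnListState() is a comparison between the write-id state captured at compile time and the state observed after lock acquisition: if they differ for any table held under an exclusive or semi-shared lock, the snapshot is stale and the query must be recompiled. A self-contained sketch of that comparison, using plain strings in place of ValidWriteIdList and TxnIdUtils.checkEquivalentWriteIds, and hypothetical table names:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SnapshotCheckSketch {
  // Returns true if no locked table saw a commit between compile time and now.
  static boolean isStillValid(Set<String> nonSharedLocks,
                              Map<String, String> compiledWriteIds,
                              Map<String, String> currentWriteIds) {
    for (String table : nonSharedLocks) {
      // A committed txn on a locked table shows up as a changed write-id list.
      if (!compiledWriteIds.get(table).equals(currentWriteIds.get(table))) {
        return false;   // snapshot outdated: recompile
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Set<String> locks = new HashSet<>();
    locks.add("default.t1");                          // e.g. INSERT OVERWRITE target
    Map<String, String> atCompile = new HashMap<>();
    atCompile.put("default.t1", "default.t1:5");      // write ids at compile time
    Map<String, String> afterLocks = new HashMap<>();
    afterLocks.put("default.t1", "default.t1:6");     // a txn committed in between
    System.out.println(isStillValid(locks, atCompile, afterLocks)); // false
  }
}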
private void setTriggerContext(final String queryId) {
final long queryStartTime;
// query info is created by SQLOperation which will have start time of the operation. When JDBC Statement is not
@@ -1394,6 +1481,34 @@ public class Driver implements IDriver {
tableList.add(fullTableName);
}
+ // Build the list of transactional tables that are being written to by the current txn
+ private List<Pair<String, Table>> getWrittenTableList(QueryPlan plan) {
+ List<Pair<String, Table>> result = new ArrayList<>();
+ Set<String> tableList = new HashSet<>();
+ for (WriteEntity output : plan.getOutputs()) {
+ Table tbl;
+ switch (output.getType()) {
+ case TABLE: {
+ tbl = output.getTable();
+ break;
+ }
+ case PARTITION:
+ case DUMMYPARTITION: {
+ tbl = output.getPartition().getTable();
+ break;
+ }
+ default: {
+ continue;
+ }
+ }
+ String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName());
+ if (tableList.add(fullTableName)) {
+ result.add(new ImmutablePair<>(fullTableName, tbl));
+ }
+ }
+ return result;
+ }
+
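One small idiom worth noting in getWrittenTableList() above: it relies on Set.add() returning false for duplicates, so several PARTITION outputs belonging to the same table collapse into a single entry. A stand-alone illustration with hypothetical table names:

import java.util.*;

public class DedupSketch {
  public static void main(String[] args) {
    // Two partitions of the same table should yield one table entry,
    // just as getWrittenTableList collapses PARTITION outputs.
    List<String> outputs = Arrays.asList("db.t1/p=1", "db.t1/p=2", "db.t2/p=1");
    Set<String> seen = new HashSet<>();
    List<String> tables = new ArrayList<>();
    for (String out : outputs) {
      String table = out.substring(0, out.indexOf('/'));
      if (seen.add(table)) {     // add() returns false on duplicates
        tables.add(table);
      }
    }
    System.out.println(tables);  // [db.t1, db.t2]
  }
}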
private String getUserFromUGI() {
// Don't use the userName member, as it may or may not have been set. Get the value from
// conf, which calls into getUGI to figure out who the process is running as.
@@ -1464,7 +1579,6 @@ public class Driver implements IDriver {
acidDdlDesc.setWriteId(writeId);
}
-
/*It's imperative that {@code acquireLocks()} is called for all commands so that
HiveTxnManager can transition its state machine correctly*/
queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
@@ -1823,6 +1937,48 @@ public class Driver implements IDriver {
lockAndRespond();
try {
+ if (!isValidTxnListState()) {
+ // Snapshot was outdated when locks were acquired, hence regenerate context,
+ // txn list and retry
+ // TODO: Lock acquisition should be moved before analyze, this is a bit hackish.
+ // Currently, we acquire a snapshot, we compile the query wrt that snapshot,
+ // and then, we acquire locks. If snapshot is still valid, we continue as usual.
+ // But if snapshot is not valid, we recompile the query.
+ retrial = true;
+ backupContext.addRewrittenStatementContext(ctx);
+ backupContext.setHiveLocks(ctx.getHiveLocks());
+ ctx = backupContext;
+ conf.set(ValidTxnList.VALID_TXNS_KEY, queryTxnMgr.getValidTxns().toString());
+ if (plan.hasAcidResourcesInQuery()) {
+ recordValidWriteIds(queryTxnMgr);
+ }
+
+ if (!alreadyCompiled) {
+ // compile internal will automatically reset the perf logger
+ compileInternal(command, true);
+ } else {
+ // Since we're reusing the compiled plan, we need to update its start time for the current run
+ plan.setQueryStartTime(queryDisplay.getQueryStartTime());
+ }
+
+ if (!isValidTxnListState()) {
+ // Snapshot is still outdated after recompilation; give up
+ throw handleHiveException(new HiveException("Operation could not be executed"), 14);
+ }
+
+ //Reset the PerfLogger
+ perfLogger = SessionState.getPerfLogger(true);
+
+ // We set the txn manager for the ctx here because each query has its own
+ // ctx object, while the txn mgr is shared across the same instance of
+ // Driver, which can run multiple queries.
+ ctx.setHiveTxnManager(queryTxnMgr);
+ }
+ } catch (LockException e) {
+ throw handleHiveException(e, 13);
+ }
+
+ try {
execute();
} catch (CommandProcessorResponse cpr) {
rollback(cpr);
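Taken together, Driver now follows a compile / acquire-locks / validate / maybe-recompile-once sequence before executing. A hypothetical, self-contained condensation of that control flow (method names are stand-ins for the real Driver internals shown above):

// Hypothetical condensation of the new lock/validate/retry flow.
public class RetryFlowSketch {
  boolean snapshotValid = false;    // flips to true on the recompile below

  void run() throws Exception {
    compile(false);                 // 1) compile against the initial snapshot
    acquireLocks();                 // 2) then acquire locks
    if (!isValidTxnListState()) {   // 3) a txn committed in between?
      compile(true);                //    recompile once against a fresh snapshot
      if (!isValidTxnListState()) { //    still racing: fail rather than loop
        throw new Exception("Operation could not be executed");
      }
    }
    execute();                      // 4) run with a snapshot known to be current
  }

  // Stubs so the sketch compiles; the real logic is in Driver.runInternal.
  void compile(boolean retrial) { snapshotValid = retrial; }
  void acquireLocks() {}
  boolean isValidTxnListState() { return snapshotValid; }
  void execute() {}

  public static void main(String[] args) throws Exception {
    new RetryFlowSketch().run();
  }
}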
http://git-wip-us.apache.org/repos/asf/hive/blob/2a55f99e/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index e555aec..caa9d83 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -59,6 +59,7 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
public void setValidWriteIdList(String writeIdStr) {
fetch.setValidWriteIdList(writeIdStr);
}
+
@Override
public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext ctx,
CompilationOpContext opContext) {