You are viewing a plain text version of this content. The canonical link to the original page was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by jc...@apache.org on 2018/06/12 00:15:25 UTC
hive git commit: Revert "HIVE-19382 : Acquire locks before generating valid transaction list for some operations (Jesus Camacho Rodriguez via Ashutosh Chauhan)"
Repository: hive
Updated Branches:
refs/heads/branch-3 81a4bdd75 -> c085aaa58
Revert "HIVE-19382 : Acquire locks before generating valid transaction list for some operations (Jesus Camacho Rodriguez via Ashutosh Chauhan)"
This reverts commit 86aeb53645d12a78a4ddcbe0df2205115e6bf4f4.
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c085aaa5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c085aaa5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c085aaa5
Branch: refs/heads/branch-3
Commit: c085aaa58b565873d40e547936aae6618c89ba91
Parents: 81a4bdd
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Mon Jun 11 17:15:10 2018 -0700
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Mon Jun 11 17:15:10 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/hive/ql/Context.java | 43 ----
.../java/org/apache/hadoop/hive/ql/Driver.java | 201 +++----------------
.../apache/hadoop/hive/ql/exec/FetchTask.java | 1 -
3 files changed, 23 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c085aaa5/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 9eda4ed..1921ea7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -291,7 +291,6 @@ public class Context {
public DestClausePrefix addDestNamePrefix(int pos, DestClausePrefix prefix) {
return insertBranchToNamePrefix.put(pos, prefix);
}
-
public Context(Configuration conf) throws IOException {
this(conf, generateExecutionId());
}
@@ -316,48 +315,6 @@ public class Context {
viewsTokenRewriteStreams = new HashMap<>();
}
- protected Context(Context ctx) {
- // This method creates a deep copy of context, but the copy is partial,
- // hence it needs to be used carefully. In particular, following objects
- // are ignored:
- // opContext, pathToCS, cboInfo, cboSucceeded, tokenRewriteStream, viewsTokenRewriteStreams,
- // rewrittenStatementContexts, cteTables, loadTableOutputMap, planMapper, insertBranchToNamePrefix
- this.isHDFSCleanup = ctx.isHDFSCleanup;
- this.resFile = ctx.resFile;
- this.resDir = ctx.resDir;
- this.resFs = ctx.resFs;
- this.resDirPaths = ctx.resDirPaths;
- this.resDirFilesNum = ctx.resDirFilesNum;
- this.initialized = ctx.initialized;
- this.originalTracker = ctx.originalTracker;
- this.nonLocalScratchPath = ctx.nonLocalScratchPath;
- this.localScratchDir = ctx.localScratchDir;
- this.scratchDirPermission = ctx.scratchDirPermission;
- this.fsScratchDirs.putAll(ctx.fsScratchDirs);
- this.conf = ctx.conf;
- this.pathid = ctx.pathid;
- this.explainConfig = ctx.explainConfig;
- this.cmd = ctx.cmd;
- this.executionId = ctx.executionId;
- this.hiveLocks = ctx.hiveLocks;
- this.hiveTxnManager = ctx.hiveTxnManager;
- this.needLockMgr = ctx.needLockMgr;
- this.sequencer = ctx.sequencer;
- this.outputLockObjects.putAll(ctx.outputLockObjects);
- this.stagingDir = ctx.stagingDir;
- this.heartbeater = ctx.heartbeater;
- this.skipTableMasking = ctx.skipTableMasking;
- this.isUpdateDeleteMerge = ctx.isUpdateDeleteMerge;
- this.isLoadingMaterializedView = ctx.isLoadingMaterializedView;
- this.operation = ctx.operation;
- this.wmContext = ctx.wmContext;
- this.isExplainPlan = ctx.isExplainPlan;
- this.statsSource = ctx.statsSource;
- this.executionIndex = ctx.executionIndex;
- this.viewsTokenRewriteStreams = new HashMap<>();
- this.rewrittenStatementContexts = new HashSet<>();
- }
-
public Map<String, Path> getFsScratchDirs() {
return fsScratchDirs;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c085aaa5/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 8b5262a..08f9a67 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -39,16 +39,15 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -61,11 +60,11 @@ import org.apache.hadoop.hive.conf.HiveVariableSource;
import org.apache.hadoop.hive.conf.VariableSubstitution;
import org.apache.hadoop.hive.metastore.ColumnType;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
-import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
@@ -93,7 +92,6 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
-import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -143,7 +141,6 @@ import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hive.common.util.ShutdownHookManager;
-import org.apache.hive.common.util.TxnIdUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -218,9 +215,6 @@ public class Driver implements IDriver {
private CacheEntry usedCacheEntry;
private ValidWriteIdList compactionWriteIds = null;
- private Context backupContext = null;
- private boolean retrial = false;
-
private enum DriverState {
INITIALIZED,
COMPILING,
@@ -631,11 +625,10 @@ public class Driver implements IDriver {
// because at that point we need access to the objects.
Hive.get().getMSC().flushCache();
- backupContext = new Context(ctx);
- boolean executeHooks = hookRunner.hasPreAnalyzeHooks();
-
- HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
- if (executeHooks) {
+ BaseSemanticAnalyzer sem;
+ // Do semantic analysis and plan generation
+ if (hookRunner.hasPreAnalyzeHooks()) {
+ HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
hookCtx.setConf(conf);
hookCtx.setUserName(userName);
hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
@@ -643,24 +636,24 @@ public class Driver implements IDriver {
hookCtx.setHiveOperation(queryState.getHiveOperation());
tree = hookRunner.runPreAnalyzeHooks(hookCtx, tree);
- }
-
- // Do semantic analysis and plan generation
- BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
-
- if (!retrial) {
+ sem = SemanticAnalyzerFactory.get(queryState, tree);
openTransaction();
+ // TODO: Lock acquisition should be moved before this method call
+ // when we want to implement lock-based concurrency control
generateValidTxnList();
- }
-
- sem.analyze(tree, ctx);
-
- if (executeHooks) {
+ sem.analyze(tree, ctx);
hookCtx.update(sem);
+
hookRunner.runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks());
+ } else {
+ sem = SemanticAnalyzerFactory.get(queryState, tree);
+ openTransaction();
+ // TODO: Lock acquisition should be moved before this method call
+ // when we want to implement lock-based concurrency control
+ generateValidTxnList();
+ sem.analyze(tree, ctx);
}
-
- LOG.info("Semantic Analysis Completed (retrial = {})", retrial);
+ LOG.info("Semantic Analysis Completed");
// Retrieve information about cache usage for the query.
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
@@ -678,6 +671,7 @@ public class Driver implements IDriver {
plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
queryState.getHiveOperation(), schema);
+
conf.set("mapreduce.workflow.id", "hive_" + queryId);
conf.set("mapreduce.workflow.name", queryStr);
@@ -780,86 +774,6 @@ public class Driver implements IDriver {
}
}
- // Checks whether txn list has been invalidated while planning the query.
- // This would happen if query requires exclusive/semi-shared lock, and there
- // has been a committed transaction on the table over which the lock is
- // required.
- private boolean isValidTxnListState() throws LockException {
- // 1) Get valid txn list.
- String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
- if (txnString == null) {
- // Not a transactional op, nothing more to do
- return true;
- }
- ValidTxnList currentTxnList = queryTxnMgr.getValidTxns();
- String currentTxnString = currentTxnList.toString();
- if (currentTxnString.equals(txnString)) {
- // Still valid, nothing more to do
- return true;
- }
- // 2) Get locks that are relevant:
- // - Exclusive for INSERT OVERWRITE.
- // - Semi-shared for UPDATE/DELETE.
- if (ctx.getHiveLocks() == null || ctx.getHiveLocks().isEmpty()) {
- // Nothing to check
- return true;
- }
- Set<String> nonSharedLocks = new HashSet<>();
- for (HiveLock lock : ctx.getHiveLocks()) {
- if (lock.getHiveLockMode() == HiveLockMode.EXCLUSIVE ||
- lock.getHiveLockMode() == HiveLockMode.SEMI_SHARED) {
- if (lock.getHiveLockObject().getPaths().length == 2) {
- // Pos 0 of lock paths array contains dbname, pos 1 contains tblname
- nonSharedLocks.add(
- Warehouse.getQualifiedName(
- lock.getHiveLockObject().getPaths()[0], lock.getHiveLockObject().getPaths()[1]));
- }
- }
- }
- // 3) Get txn tables that are being written
- ValidTxnWriteIdList txnWriteIdList =
- new ValidTxnWriteIdList(conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
- if (txnWriteIdList == null) {
- // Nothing to check
- return true;
- }
- List<Pair<String, Table>> writtenTables = getWrittenTableList(plan);
- ValidTxnWriteIdList currentTxnWriteIds =
- queryTxnMgr.getValidWriteIds(
- writtenTables.stream()
- .filter(e -> AcidUtils.isTransactionalTable(e.getRight()))
- .map(e -> e.getLeft())
- .collect(Collectors.toList()),
- currentTxnString);
- for (Pair<String, Table> tableInfo : writtenTables) {
- String fullQNameForLock = Warehouse.getQualifiedName(
- tableInfo.getRight().getDbName(),
- MetaStoreUtils.encodeTableName(tableInfo.getRight().getTableName()));
- if (nonSharedLocks.contains(fullQNameForLock)) {
- // Check if table is transactional
- if (AcidUtils.isTransactionalTable(tableInfo.getRight())) {
- // Check that write id is still valid
- if (!TxnIdUtils.checkEquivalentWriteIds(
- txnWriteIdList.getTableValidWriteIdList(tableInfo.getLeft()),
- currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getLeft()))) {
- // Write id has changed, it is not valid anymore,
- // we need to recompile
- return false;
- }
- }
- nonSharedLocks.remove(fullQNameForLock);
- }
- }
- if (!nonSharedLocks.isEmpty()) {
- throw new LockException("Wrong state: non-shared locks contain information for tables that have not" +
- " been visited when trying to validate the locks from query tables.\n" +
- "Tables: " + writtenTables.stream().map(e -> e.getLeft()).collect(Collectors.toList()) + "\n" +
- "Remaining locks after check: " + nonSharedLocks);
- }
- // It passes the test, it is valid
- return true;
- }
-
private void setTriggerContext(final String queryId) {
final long queryStartTime;
// query info is created by SQLOperation which will have start time of the operation. When JDBC Statement is not
@@ -1478,34 +1392,6 @@ public class Driver implements IDriver {
tableList.add(fullTableName);
}
- // Make the list of transactional tables list which are getting written by current txn
- private List<Pair<String, Table>> getWrittenTableList(QueryPlan plan) {
- List<Pair<String, Table>> result = new ArrayList<>();
- Set<String> tableList = new HashSet<>();
- for (WriteEntity output : plan.getOutputs()) {
- Table tbl;
- switch (output.getType()) {
- case TABLE: {
- tbl = output.getTable();
- break;
- }
- case PARTITION:
- case DUMMYPARTITION: {
- tbl = output.getPartition().getTable();
- break;
- }
- default: {
- continue;
- }
- }
- String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName());
- if (tableList.add(fullTableName)) {
- result.add(new ImmutablePair(fullTableName, tbl));
- }
- }
- return result;
- }
-
private String getUserFromUGI() {
// Don't use the userName member, as it may or may not have been set. Get the value from
// conf, which calls into getUGI to figure out who the process is running as.
@@ -1576,6 +1462,7 @@ public class Driver implements IDriver {
acidDdlDesc.setWriteId(writeId);
}
+
/*It's imperative that {@code acquireLocks()} is called for all commands so that
HiveTxnManager can transition its state machine correctly*/
queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
@@ -1936,48 +1823,6 @@ public class Driver implements IDriver {
lockAndRespond();
try {
- if (!isValidTxnListState()) {
- // Snapshot was outdated when locks were acquired, hence regenerate context,
- // txn list and retry
- // TODO: Lock acquisition should be moved before analyze, this is a bit hackish.
- // Currently, we acquire a snapshot, we compile the query wrt that snapshot,
- // and then, we acquire locks. If snapshot is still valid, we continue as usual.
- // But if snapshot is not valid, we recompile the query.
- retrial = true;
- backupContext.addRewrittenStatementContext(ctx);
- backupContext.setHiveLocks(ctx.getHiveLocks());
- ctx = backupContext;
- conf.set(ValidTxnList.VALID_TXNS_KEY, queryTxnMgr.getValidTxns().toString());
- if (plan.hasAcidResourcesInQuery()) {
- recordValidWriteIds(queryTxnMgr);
- }
-
- if (!alreadyCompiled) {
- // compile internal will automatically reset the perf logger
- compileInternal(command, true);
- } else {
- // Since we're reusing the compiled plan, we need to update its start time for current run
- plan.setQueryStartTime(queryDisplay.getQueryStartTime());
- }
-
- if (!isValidTxnListState()) {
- // Throw exception
- throw handleHiveException(new HiveException("Operation could not be executed"), 14);
- }
-
- //Reset the PerfLogger
- perfLogger = SessionState.getPerfLogger(true);
-
- // the reason that we set the txn manager for the cxt here is because each
- // query has its own ctx object. The txn mgr is shared across the
- // same instance of Driver, which can run multiple queries.
- ctx.setHiveTxnManager(queryTxnMgr);
- }
- } catch (LockException e) {
- throw handleHiveException(e, 13);
- }
-
- try {
execute();
} catch (CommandProcessorResponse cpr) {
rollback(cpr);
http://git-wip-us.apache.org/repos/asf/hive/blob/c085aaa5/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index caa9d83..e555aec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -59,7 +59,6 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
public void setValidWriteIdList(String writeIdStr) {
fetch.setValidWriteIdList(writeIdStr);
}
-
@Override
public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext ctx,
CompilationOpContext opContext) {