Posted to commits@hive.apache.org by jc...@apache.org on 2018/06/12 00:15:25 UTC

hive git commit: Revert "HIVE-19382 : Acquire locks before generating valid transaction list for some operations (Jesus Camacho Rodriguez via Ashutosh Chauhan)"

Repository: hive
Updated Branches:
  refs/heads/branch-3 81a4bdd75 -> c085aaa58


Revert "HIVE-19382 : Acquire locks before generating valid transaction list for some operations (Jesus Camacho Rodriguez via Ashutosh Chauhan)"

This reverts commit 86aeb53645d12a78a4ddcbe0df2205115e6bf4f4.


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c085aaa5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c085aaa5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c085aaa5

Branch: refs/heads/branch-3
Commit: c085aaa58b565873d40e547936aae6618c89ba91
Parents: 81a4bdd
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Mon Jun 11 17:15:10 2018 -0700
Committer: Jesus Camacho Rodriguez <jc...@apache.org>
Committed: Mon Jun 11 17:15:10 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Context.java |  43 ----
 .../java/org/apache/hadoop/hive/ql/Driver.java  | 201 +++----------------
 .../apache/hadoop/hive/ql/exec/FetchTask.java   |   1 -
 3 files changed, 23 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
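
For readers skimming the diffstat: Context.java loses its partial copy constructor, Driver.java loses the snapshot re-validation and recompile path, and FetchTask.java loses only a blank line. A hypothetical sketch of the compile/lock/re-validate flow being reverted follows; all names in it are illustrative assumptions, not Hive's actual Driver API.

    // Hypothetical sketch (not Hive's Driver) of the flow this revert removes:
    // compile against a snapshot, acquire locks, then recompile if the
    // snapshot went stale in between.
    public class SnapshotRetrySketch {
      interface TxnManager {
        String currentSnapshot();
      }

      private String recordedSnapshot;

      void compile(String query, TxnManager txnMgr) {
        // Record the transaction snapshot the plan is compiled against.
        recordedSnapshot = txnMgr.currentSnapshot();
        // ... semantic analysis and plan generation would happen here ...
      }

      void run(String query, TxnManager txnMgr) {
        compile(query, txnMgr);
        // ... locks are acquired here, after compilation ...
        if (!recordedSnapshot.equals(txnMgr.currentSnapshot())) {
          // A transaction committed between compile and lock acquisition,
          // so the plan may be stale: recompile against a fresh snapshot.
          compile(query, txnMgr);
        }
        // ... execute the (possibly recompiled) plan ...
      }
    }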


http://git-wip-us.apache.org/repos/asf/hive/blob/c085aaa5/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 9eda4ed..1921ea7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -291,7 +291,6 @@ public class Context {
   public DestClausePrefix addDestNamePrefix(int pos, DestClausePrefix prefix) {
     return insertBranchToNamePrefix.put(pos, prefix);
   }
-
   public Context(Configuration conf) throws IOException {
     this(conf, generateExecutionId());
   }
@@ -316,48 +315,6 @@ public class Context {
     viewsTokenRewriteStreams = new HashMap<>();
   }
 
-  protected Context(Context ctx) {
-    // This method creates a deep copy of context, but the copy is partial,
-    // hence it needs to be used carefully. In particular, following objects
-    // are ignored:
-    // opContext, pathToCS, cboInfo, cboSucceeded, tokenRewriteStream, viewsTokenRewriteStreams,
-    // rewrittenStatementContexts, cteTables, loadTableOutputMap, planMapper, insertBranchToNamePrefix
-    this.isHDFSCleanup = ctx.isHDFSCleanup;
-    this.resFile = ctx.resFile;
-    this.resDir = ctx.resDir;
-    this.resFs = ctx.resFs;
-    this.resDirPaths = ctx.resDirPaths;
-    this.resDirFilesNum = ctx.resDirFilesNum;
-    this.initialized = ctx.initialized;
-    this.originalTracker = ctx.originalTracker;
-    this.nonLocalScratchPath = ctx.nonLocalScratchPath;
-    this.localScratchDir = ctx.localScratchDir;
-    this.scratchDirPermission = ctx.scratchDirPermission;
-    this.fsScratchDirs.putAll(ctx.fsScratchDirs);
-    this.conf = ctx.conf;
-    this.pathid = ctx.pathid;
-    this.explainConfig = ctx.explainConfig;
-    this.cmd = ctx.cmd;
-    this.executionId = ctx.executionId;
-    this.hiveLocks = ctx.hiveLocks;
-    this.hiveTxnManager = ctx.hiveTxnManager;
-    this.needLockMgr = ctx.needLockMgr;
-    this.sequencer = ctx.sequencer;
-    this.outputLockObjects.putAll(ctx.outputLockObjects);
-    this.stagingDir = ctx.stagingDir;
-    this.heartbeater = ctx.heartbeater;
-    this.skipTableMasking = ctx.skipTableMasking;
-    this.isUpdateDeleteMerge = ctx.isUpdateDeleteMerge;
-    this.isLoadingMaterializedView = ctx.isLoadingMaterializedView;
-    this.operation = ctx.operation;
-    this.wmContext = ctx.wmContext;
-    this.isExplainPlan = ctx.isExplainPlan;
-    this.statsSource = ctx.statsSource;
-    this.executionIndex = ctx.executionIndex;
-    this.viewsTokenRewriteStreams = new HashMap<>();
-    this.rewrittenStatementContexts = new HashSet<>();
-  }
-
   public Map<String, Path> getFsScratchDirs() {
     return fsScratchDirs;
   }
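
For context, the Context(Context) constructor deleted above implements a partial deep copy: scalar and shared fields are carried over, while per-compilation structures (token rewrite streams, rewritten statement contexts) are re-created, as its own comment warns. A minimal sketch of that pattern, reusing a few of the same field names but not Hive's actual class:

    // Minimal illustration, not Hive's Context: copy scalar/shared state,
    // re-create per-compilation collections.
    import java.util.HashMap;
    import java.util.Map;

    public class PartialCopySketch {
      private final Map<String, String> fsScratchDirs = new HashMap<>();
      private final String executionId;
      private Map<String, Object> viewsTokenRewriteStreams; // deliberately not copied

      public PartialCopySketch(String executionId) {
        this.executionId = executionId;
        this.viewsTokenRewriteStreams = new HashMap<>();
      }

      protected PartialCopySketch(PartialCopySketch ctx) {
        this.executionId = ctx.executionId;              // shared identity
        this.fsScratchDirs.putAll(ctx.fsScratchDirs);    // contents copied
        this.viewsTokenRewriteStreams = new HashMap<>(); // fresh per compilation
      }
    }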

http://git-wip-us.apache.org/repos/asf/hive/blob/c085aaa5/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 8b5262a..08f9a67 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -39,16 +39,15 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -61,11 +60,11 @@ import org.apache.hadoop.hive.conf.HiveVariableSource;
 import org.apache.hadoop.hive.conf.VariableSubstitution;
 import org.apache.hadoop.hive.metastore.ColumnType;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
-import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
@@ -93,7 +92,6 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
-import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -143,7 +141,6 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hive.common.util.ShutdownHookManager;
-import org.apache.hive.common.util.TxnIdUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -218,9 +215,6 @@ public class Driver implements IDriver {
   private CacheEntry usedCacheEntry;
   private ValidWriteIdList compactionWriteIds = null;
 
-  private Context backupContext = null;
-  private boolean retrial = false;
-
   private enum DriverState {
     INITIALIZED,
     COMPILING,
@@ -631,11 +625,10 @@ public class Driver implements IDriver {
       // because at that point we need access to the objects.
       Hive.get().getMSC().flushCache();
 
-      backupContext = new Context(ctx);
-      boolean executeHooks = hookRunner.hasPreAnalyzeHooks();
-
-      HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
-      if (executeHooks) {
+      BaseSemanticAnalyzer sem;
+      // Do semantic analysis and plan generation
+      if (hookRunner.hasPreAnalyzeHooks()) {
+        HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
         hookCtx.setConf(conf);
         hookCtx.setUserName(userName);
         hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
@@ -643,24 +636,24 @@ public class Driver implements IDriver {
         hookCtx.setHiveOperation(queryState.getHiveOperation());
 
         tree =  hookRunner.runPreAnalyzeHooks(hookCtx, tree);
-      }
-
-      // Do semantic analysis and plan generation
-      BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
-
-      if (!retrial) {
+        sem = SemanticAnalyzerFactory.get(queryState, tree);
         openTransaction();
+        // TODO: Lock acquisition should be moved before this method call
+        // when we want to implement lock-based concurrency control
         generateValidTxnList();
-      }
-
-      sem.analyze(tree, ctx);
-
-      if (executeHooks) {
+        sem.analyze(tree, ctx);
         hookCtx.update(sem);
+
         hookRunner.runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks());
+      } else {
+        sem = SemanticAnalyzerFactory.get(queryState, tree);
+        openTransaction();
+        // TODO: Lock acquisition should be moved before this method call
+        // when we want to implement lock-based concurrency control
+        generateValidTxnList();
+        sem.analyze(tree, ctx);
       }
-
-      LOG.info("Semantic Analysis Completed (retrial = {})", retrial);
+      LOG.info("Semantic Analysis Completed");
 
       // Retrieve information about cache usage for the query.
       if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
@@ -678,6 +671,7 @@ public class Driver implements IDriver {
       plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
         queryState.getHiveOperation(), schema);
 
+
       conf.set("mapreduce.workflow.id", "hive_" + queryId);
       conf.set("mapreduce.workflow.name", queryStr);
 
@@ -780,86 +774,6 @@ public class Driver implements IDriver {
     }
   }
 
-  // Checks whether txn list has been invalidated while planning the query.
-  // This would happen if query requires exclusive/semi-shared lock, and there
-  // has been a committed transaction on the table over which the lock is
-  // required.
-  private boolean isValidTxnListState() throws LockException {
-    // 1) Get valid txn list.
-    String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
-    if (txnString == null) {
-      // Not a transactional op, nothing more to do
-      return true;
-    }
-    ValidTxnList currentTxnList = queryTxnMgr.getValidTxns();
-    String currentTxnString = currentTxnList.toString();
-    if (currentTxnString.equals(txnString)) {
-      // Still valid, nothing more to do
-      return true;
-    }
-    // 2) Get locks that are relevant:
-    // - Exclusive for INSERT OVERWRITE.
-    // - Semi-shared for UPDATE/DELETE.
-    if (ctx.getHiveLocks() == null || ctx.getHiveLocks().isEmpty()) {
-      // Nothing to check
-      return true;
-    }
-    Set<String> nonSharedLocks = new HashSet<>();
-    for (HiveLock lock : ctx.getHiveLocks()) {
-      if (lock.getHiveLockMode() == HiveLockMode.EXCLUSIVE ||
-          lock.getHiveLockMode() == HiveLockMode.SEMI_SHARED) {
-        if (lock.getHiveLockObject().getPaths().length == 2) {
-          // Pos 0 of lock paths array contains dbname, pos 1 contains tblname
-          nonSharedLocks.add(
-              Warehouse.getQualifiedName(
-                  lock.getHiveLockObject().getPaths()[0], lock.getHiveLockObject().getPaths()[1]));
-        }
-      }
-    }
-    // 3) Get txn tables that are being written
-    ValidTxnWriteIdList txnWriteIdList =
-        new ValidTxnWriteIdList(conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
-    if (txnWriteIdList == null) {
-      // Nothing to check
-      return true;
-    }
-    List<Pair<String, Table>> writtenTables = getWrittenTableList(plan);
-    ValidTxnWriteIdList currentTxnWriteIds =
-        queryTxnMgr.getValidWriteIds(
-            writtenTables.stream()
-                .filter(e -> AcidUtils.isTransactionalTable(e.getRight()))
-                .map(e -> e.getLeft())
-                .collect(Collectors.toList()),
-            currentTxnString);
-    for (Pair<String, Table> tableInfo : writtenTables) {
-      String fullQNameForLock = Warehouse.getQualifiedName(
-          tableInfo.getRight().getDbName(),
-          MetaStoreUtils.encodeTableName(tableInfo.getRight().getTableName()));
-      if (nonSharedLocks.contains(fullQNameForLock)) {
-        // Check if table is transactional
-        if (AcidUtils.isTransactionalTable(tableInfo.getRight())) {
-          // Check that write id is still valid
-          if (!TxnIdUtils.checkEquivalentWriteIds(
-              txnWriteIdList.getTableValidWriteIdList(tableInfo.getLeft()),
-              currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getLeft()))) {
-            // Write id has changed, it is not valid anymore,
-            // we need to recompile
-            return false;
-          }
-        }
-        nonSharedLocks.remove(fullQNameForLock);
-      }
-    }
-    if (!nonSharedLocks.isEmpty()) {
-      throw new LockException("Wrong state: non-shared locks contain information for tables that have not" +
-          " been visited when trying to validate the locks from query tables.\n" +
-          "Tables: " + writtenTables.stream().map(e -> e.getLeft()).collect(Collectors.toList()) + "\n" +
-          "Remaining locks after check: " + nonSharedLocks);
-    }
-    // It passes the test, it is valid
-    return true;
-  }
-
   private void setTriggerContext(final String queryId) {
     final long queryStartTime;
     // query info is created by SQLOperation which will have start time of the operation. When JDBC Statement is not
@@ -1478,34 +1392,6 @@ public class Driver implements IDriver {
     tableList.add(fullTableName);
   }
 
-  // Make the list of transactional tables list which are getting written by current txn
-  private List<Pair<String, Table>> getWrittenTableList(QueryPlan plan) {
-    List<Pair<String, Table>> result = new ArrayList<>();
-    Set<String> tableList = new HashSet<>();
-    for (WriteEntity output : plan.getOutputs()) {
-      Table tbl;
-      switch (output.getType()) {
-        case TABLE: {
-          tbl = output.getTable();
-          break;
-        }
-        case PARTITION:
-        case DUMMYPARTITION: {
-          tbl = output.getPartition().getTable();
-          break;
-        }
-        default: {
-          continue;
-        }
-      }
-      String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName());
-      if (tableList.add(fullTableName)) {
-        result.add(new ImmutablePair(fullTableName, tbl));
-      }
-    }
-    return result;
-  }
-
   private String getUserFromUGI() {
     // Don't use the userName member, as it may or may not have been set.  Get the value from
     // conf, which calls into getUGI to figure out who the process is running as.
@@ -1576,6 +1462,7 @@ public class Driver implements IDriver {
         acidDdlDesc.setWriteId(writeId);
       }
 
+
       /*It's imperative that {@code acquireLocks()} is called for all commands so that
       HiveTxnManager can transition its state machine correctly*/
       queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
@@ -1936,48 +1823,6 @@ public class Driver implements IDriver {
       lockAndRespond();
 
       try {
-        if (!isValidTxnListState()) {
-          // Snapshot was outdated when locks were acquired, hence regenerate context,
-          // txn list and retry
-          // TODO: Lock acquisition should be moved before analyze, this is a bit hackish.
-          // Currently, we acquire a snapshot, we compile the query wrt that snapshot,
-          // and then, we acquire locks. If snapshot is still valid, we continue as usual.
-          // But if snapshot is not valid, we recompile the query.
-          retrial = true;
-          backupContext.addRewrittenStatementContext(ctx);
-          backupContext.setHiveLocks(ctx.getHiveLocks());
-          ctx = backupContext;
-          conf.set(ValidTxnList.VALID_TXNS_KEY, queryTxnMgr.getValidTxns().toString());
-          if (plan.hasAcidResourcesInQuery()) {
-            recordValidWriteIds(queryTxnMgr);
-          }
-
-          if (!alreadyCompiled) {
-            // compile internal will automatically reset the perf logger
-            compileInternal(command, true);
-          } else {
-            // Since we're reusing the compiled plan, we need to update its start time for current run
-            plan.setQueryStartTime(queryDisplay.getQueryStartTime());
-          }
-
-          if (!isValidTxnListState()) {
-            // Throw exception
-            throw handleHiveException(new HiveException("Operation could not be executed"), 14);
-          }
-
-          //Reset the PerfLogger
-          perfLogger = SessionState.getPerfLogger(true);
-
-          // the reason that we set the txn manager for the cxt here is because each
-          // query has its own ctx object. The txn mgr is shared across the
-          // same instance of Driver, which can run multiple queries.
-          ctx.setHiveTxnManager(queryTxnMgr);
-        }
-      } catch (LockException e) {
-        throw handleHiveException(e, 13);
-      }
-
-      try {
         execute();
       } catch (CommandProcessorResponse cpr) {
         rollback(cpr);
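
The largest removal above is isValidTxnListState(), whose first step compares the ValidTxnList string recorded at compile time against the transaction manager's current one before doing any per-table write-id checks. A hedged sketch of that quick check; the string contents below are opaque placeholders, not the real ValidTxnList serialization format:

    // Hedged sketch of the fast-path check at the top of the removed
    // isValidTxnListState(): identical snapshot strings mean no relevant
    // transaction committed in between, so the compiled plan is still usable.
    public class TxnListCheckSketch {
      static boolean snapshotUnchanged(String recordedTxns, String currentTxns) {
        if (recordedTxns == null) {
          return true; // not a transactional operation, nothing to verify
        }
        return recordedTxns.equals(currentTxns);
      }

      public static void main(String[] args) {
        System.out.println(snapshotUnchanged("snapshotA", "snapshotA")); // true
        System.out.println(snapshotUnchanged("snapshotA", "snapshotB")); // false -> recompile
      }
    }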

http://git-wip-us.apache.org/repos/asf/hive/blob/c085aaa5/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index caa9d83..e555aec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -59,7 +59,6 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
   public void setValidWriteIdList(String writeIdStr) {
     fetch.setValidWriteIdList(writeIdStr);
   }
-
   @Override
   public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext ctx,
       CompilationOpContext opContext) {
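
Returning to the Driver.java portion above: the removed getWrittenTableList() built the list of tables written by the plan, de-duplicated by fully qualified name, skipping outputs that are not table-level. An illustrative sketch under assumed names, not Hive's WriteEntity/Table API:

    // Illustrative sketch (assumed types) of the removed getWrittenTableList():
    // walk the plan's write entities, keep table-level outputs, de-duplicate.
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class WrittenTablesSketch {
      enum Type { TABLE, PARTITION, DUMMYPARTITION, OTHER }

      static final class Output {
        final Type type;
        final String dbName;
        final String tableName;
        Output(Type type, String dbName, String tableName) {
          this.type = type;
          this.dbName = dbName;
          this.tableName = tableName;
        }
      }

      static List<String> writtenTables(List<Output> outputs) {
        List<String> result = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        for (Output o : outputs) {
          if (o.type == Type.OTHER) {
            continue; // only writes targeting a table (or its partitions) matter
          }
          String fullName = o.dbName + "." + o.tableName;
          if (seen.add(fullName)) { // add() returns false on duplicates
            result.add(fullName);
          }
        }
        return result;
      }
    }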