Posted to commits@hive.apache.org by ha...@apache.org on 2018/06/10 06:23:33 UTC

hive git commit: HIVE-19382 : Acquire locks before generating valid transaction list for some operations (Jesus Camacho Rodriguez via Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 95ea9f535 -> 2a55f99e5


HIVE-19382 : Acquire locks before generating valid transaction list for some operations (Jesus Camacho Rodriguez via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2a55f99e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2a55f99e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2a55f99e

Branch: refs/heads/master
Commit: 2a55f99e5d17d9a4ea35f6f3b3372c224321f6b7
Parents: 95ea9f5
Author: Jesus Camacho Rodriguez <jc...@apache.org>
Authored: Wed May 2 08:52:00 2018 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Sat Jun 9 23:22:44 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hive/ql/Context.java |  43 ++++
 .../java/org/apache/hadoop/hive/ql/Driver.java  | 202 ++++++++++++++++---
 .../apache/hadoop/hive/ql/exec/FetchTask.java   |   1 +
 3 files changed, 223 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
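
In short (an assumed, simplified reading of the patch): the driver compiles against a
snapshot of the valid transaction list, acquires locks afterwards, and re-validates the
snapshot while holding the locks; if a concurrent commit invalidated the snapshot for a
table protected by an exclusive or semi-shared lock, the query is recompiled once against
a fresh snapshot. A minimal sketch of that control flow, with illustrative stub names
(these are not real Driver methods):

  // Illustrative stand-ins for compileInternal()/isValidTxnListState()/etc.
  class SnapshotRetrySketch {
    boolean snapshotStillValid() { return true; } // plays the role of isValidTxnListState()
    void compileAgainstSnapshot() { }             // compile the query wrt the current valid txn list
    void acquireLocks() { }                       // queryTxnMgr.acquireLocks(plan, ctx, user, state)
    void refreshSnapshot() { }                    // regenerate the valid txn / write-id lists
    void execute() { }

    void run() throws Exception {
      compileAgainstSnapshot();      // 1) compile using a snapshot of valid txns
      acquireLocks();                // 2) acquire exclusive / semi-shared / shared locks
      if (!snapshotStillValid()) {   // 3) some txn committed to a locked table meanwhile
        refreshSnapshot();
        compileAgainstSnapshot();    //    recompile once against the fresh snapshot
        if (!snapshotStillValid()) {
          throw new Exception("Operation could not be executed"); // Driver surfaces this as error 14
        }
      }
      execute();                     // 4) run with a consistent snapshot and the locks still held
    }
  }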


http://git-wip-us.apache.org/repos/asf/hive/blob/2a55f99e/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 1921ea7..9eda4ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -291,6 +291,7 @@ public class Context {
   public DestClausePrefix addDestNamePrefix(int pos, DestClausePrefix prefix) {
     return insertBranchToNamePrefix.put(pos, prefix);
   }
+
   public Context(Configuration conf) throws IOException {
     this(conf, generateExecutionId());
   }
@@ -315,6 +316,48 @@ public class Context {
     viewsTokenRewriteStreams = new HashMap<>();
   }
 
+  protected Context(Context ctx) {
+    // This constructor creates a deep copy of the context, but the copy is
+    // partial, hence it needs to be used carefully. In particular, the
+    // following objects are ignored:
+    // opContext, pathToCS, cboInfo, cboSucceeded, tokenRewriteStream, viewsTokenRewriteStreams,
+    // rewrittenStatementContexts, cteTables, loadTableOutputMap, planMapper, insertBranchToNamePrefix
+    this.isHDFSCleanup = ctx.isHDFSCleanup;
+    this.resFile = ctx.resFile;
+    this.resDir = ctx.resDir;
+    this.resFs = ctx.resFs;
+    this.resDirPaths = ctx.resDirPaths;
+    this.resDirFilesNum = ctx.resDirFilesNum;
+    this.initialized = ctx.initialized;
+    this.originalTracker = ctx.originalTracker;
+    this.nonLocalScratchPath = ctx.nonLocalScratchPath;
+    this.localScratchDir = ctx.localScratchDir;
+    this.scratchDirPermission = ctx.scratchDirPermission;
+    this.fsScratchDirs.putAll(ctx.fsScratchDirs);
+    this.conf = ctx.conf;
+    this.pathid = ctx.pathid;
+    this.explainConfig = ctx.explainConfig;
+    this.cmd = ctx.cmd;
+    this.executionId = ctx.executionId;
+    this.hiveLocks = ctx.hiveLocks;
+    this.hiveTxnManager = ctx.hiveTxnManager;
+    this.needLockMgr = ctx.needLockMgr;
+    this.sequencer = ctx.sequencer;
+    this.outputLockObjects.putAll(ctx.outputLockObjects);
+    this.stagingDir = ctx.stagingDir;
+    this.heartbeater = ctx.heartbeater;
+    this.skipTableMasking = ctx.skipTableMasking;
+    this.isUpdateDeleteMerge = ctx.isUpdateDeleteMerge;
+    this.isLoadingMaterializedView = ctx.isLoadingMaterializedView;
+    this.operation = ctx.operation;
+    this.wmContext = ctx.wmContext;
+    this.isExplainPlan = ctx.isExplainPlan;
+    this.statsSource = ctx.statsSource;
+    this.executionIndex = ctx.executionIndex;
+    this.viewsTokenRewriteStreams = new HashMap<>();
+    this.rewrittenStatementContexts = new HashSet<>();
+  }
+
   public Map<String, Path> getFsScratchDirs() {
     return fsScratchDirs;
   }
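
The copy constructor above shares the long-lived state (conf, scratch dirs, txn manager,
acquired locks) and skips per-compilation state such as the token rewrite streams, so the
backup can later drive a fresh compilation. A toy illustration of the same partial-copy
idiom (not Hive code, just the pattern in miniature with made-up field names):

  import java.util.HashMap;
  import java.util.Map;

  class MiniContext {
    final Map<String, String> scratchDirs = new HashMap<>();
    String cmd;                 // long-lived: survives a recompilation
    Object tokenRewriteStream;  // per-compilation: deliberately NOT copied

    MiniContext(String cmd) { this.cmd = cmd; }

    MiniContext(MiniContext other) {  // partial copy, like Context(Context ctx) above
      this.cmd = other.cmd;
      this.scratchDirs.putAll(other.scratchDirs);
      // tokenRewriteStream is intentionally left null; it is rebuilt on the retry
    }
  }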

http://git-wip-us.apache.org/repos/asf/hive/blob/2a55f99e/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index a3dcc3b..c9d18a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -39,15 +39,16 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
-import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -60,11 +61,11 @@ import org.apache.hadoop.hive.conf.HiveVariableSource;
 import org.apache.hadoop.hive.conf.VariableSubstitution;
 import org.apache.hadoop.hive.metastore.ColumnType;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
@@ -92,6 +93,7 @@ import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
+import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -141,6 +143,7 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hive.common.util.ShutdownHookManager;
+import org.apache.hive.common.util.TxnIdUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -215,6 +218,9 @@ public class Driver implements IDriver {
   private CacheEntry usedCacheEntry;
   private ValidWriteIdList compactionWriteIds = null;
 
+  private Context backupContext = null;
+  private boolean retrial = false;
+
   private enum DriverState {
     INITIALIZED,
     COMPILING,
@@ -624,10 +630,11 @@ public class Driver implements IDriver {
       // because at that point we need access to the objects.
       Hive.get().getMSC().flushCache();
 
-      BaseSemanticAnalyzer sem;
-      // Do semantic analysis and plan generation
-      if (hookRunner.hasPreAnalyzeHooks()) {
-        HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
+      backupContext = new Context(ctx);
+      boolean executeHooks = hookRunner.hasPreAnalyzeHooks();
+
+      HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
+      if (executeHooks) {
         hookCtx.setConf(conf);
         hookCtx.setUserName(userName);
         hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
@@ -635,24 +642,24 @@ public class Driver implements IDriver {
         hookCtx.setHiveOperation(queryState.getHiveOperation());
 
         tree =  hookRunner.runPreAnalyzeHooks(hookCtx, tree);
-        sem = SemanticAnalyzerFactory.get(queryState, tree);
+      }
+
+      // Do semantic analysis and plan generation
+      BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
+
+      if (!retrial) {
         openTransaction();
-        // TODO: Lock acquisition should be moved before this method call
-        // when we want to implement lock-based concurrency control
         generateValidTxnList();
-        sem.analyze(tree, ctx);
-        hookCtx.update(sem);
+      }
+
+      sem.analyze(tree, ctx);
 
+      if (executeHooks) {
+        hookCtx.update(sem);
         hookRunner.runPostAnalyzeHooks(hookCtx, sem.getAllRootTasks());
-      } else {
-        sem = SemanticAnalyzerFactory.get(queryState, tree);
-        openTransaction();
-        // TODO: Lock acquisition should be moved before this method call
-        // when we want to implement lock-based concurrency control
-        generateValidTxnList();
-        sem.analyze(tree, ctx);
       }
-      LOG.info("Semantic Analysis Completed");
+
+      LOG.info("Semantic Analysis Completed (retrial = {})", retrial);
 
       // Retrieve information about cache usage for the query.
       if (conf.getBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED)) {
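
The snapshot that later gets re-checked is simply the serialized valid-txn list stored in
the configuration: the first pass records it via openTransaction()/generateValidTxnList(),
and the retry path at the bottom of this diff refreshes it the same way. A hedged sketch of
that bookkeeping (the body of generateValidTxnList() is not part of this diff, and
recordTxnSnapshot() below is a hypothetical helper name):

  // Assumed shape of the snapshot bookkeeping, mirroring the retry path below:
  // the compile-time snapshot is the stringified ValidTxnList kept in the conf
  // under ValidTxnList.VALID_TXNS_KEY, which isValidTxnListState() later compares
  // against a freshly fetched list from the transaction manager.
  static void recordTxnSnapshot(HiveConf conf, HiveTxnManager txnMgr) throws LockException {
    conf.set(ValidTxnList.VALID_TXNS_KEY, txnMgr.getValidTxns().toString());
  }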
@@ -668,7 +675,7 @@ public class Driver implements IDriver {
       // get the output schema
       schema = getSchema(sem, conf);
       plan = new QueryPlan(queryStr, sem, queryDisplay.getQueryStartTime(), queryId,
-        queryState.getHiveOperation(), schema);
+          queryState.getHiveOperation(), schema);
 
 
       conf.set("mapreduce.workflow.id", "hive_" + queryId);
@@ -776,6 +783,86 @@ public class Driver implements IDriver {
     }
   }
 
+  // Checks whether the txn list has been invalidated while planning the query.
+  // This would happen if the query requires an exclusive/semi-shared lock and
+  // there has been a committed transaction on the table over which the lock is
+  // required.
+  private boolean isValidTxnListState() throws LockException {
+    // 1) Get valid txn list.
+    String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY);
+    if (txnString == null) {
+      // Not a transactional op, nothing more to do
+      return true;
+    }
+    ValidTxnList currentTxnList = queryTxnMgr.getValidTxns();
+    String currentTxnString = currentTxnList.toString();
+    if (currentTxnString.equals(txnString)) {
+      // Still valid, nothing more to do
+      return true;
+    }
+    // 2) Get locks that are relevant:
+    // - Exclusive for INSERT OVERWRITE.
+    // - Semi-shared for UPDATE/DELETE.
+    if (ctx.getHiveLocks() == null || ctx.getHiveLocks().isEmpty()) {
+      // Nothing to check
+      return true;
+    }
+    Set<String> nonSharedLocks = new HashSet<>();
+    for (HiveLock lock : ctx.getHiveLocks()) {
+      if (lock.getHiveLockMode() == HiveLockMode.EXCLUSIVE ||
+          lock.getHiveLockMode() == HiveLockMode.SEMI_SHARED) {
+        if (lock.getHiveLockObject().getPaths().length == 2) {
+          // Pos 0 of lock paths array contains dbname, pos 1 contains tblname
+          nonSharedLocks.add(
+              Warehouse.getQualifiedName(
+                  lock.getHiveLockObject().getPaths()[0], lock.getHiveLockObject().getPaths()[1]));
+        }
+      }
+    }
+    // 3) Get txn tables that are being written
+    ValidTxnWriteIdList txnWriteIdList =
+        new ValidTxnWriteIdList(conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
+    if (txnWriteIdList == null) {
+      // Nothing to check
+      return true;
+    }
+    List<Pair<String, Table>> writtenTables = getWrittenTableList(plan);
+    ValidTxnWriteIdList currentTxnWriteIds =
+        queryTxnMgr.getValidWriteIds(
+            writtenTables.stream()
+                .filter(e -> AcidUtils.isTransactionalTable(e.getRight()))
+                .map(e -> e.getLeft())
+                .collect(Collectors.toList()),
+            currentTxnString);
+    for (Pair<String, Table> tableInfo : writtenTables) {
+      String fullQNameForLock = Warehouse.getQualifiedName(
+          tableInfo.getRight().getDbName(),
+          MetaStoreUtils.encodeTableName(tableInfo.getRight().getTableName()));
+      if (nonSharedLocks.contains(fullQNameForLock)) {
+        // Check if table is transactional
+        if (AcidUtils.isTransactionalTable(tableInfo.getRight())) {
+          // Check that write id is still valid
+          if (!TxnIdUtils.checkEquivalentWriteIds(
+              txnWriteIdList.getTableValidWriteIdList(tableInfo.getLeft()),
+              currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getLeft()))) {
+            // Write id has changed, it is not valid anymore,
+            // we need to recompile
+            return false;
+          }
+        }
+        nonSharedLocks.remove(fullQNameForLock);
+      }
+    }
+    if (!nonSharedLocks.isEmpty()) {
+      throw new LockException("Wrong state: non-shared locks contain information for tables that have not" +
+          " been visited when trying to validate the locks from query tables.\n" +
+          "Tables: " + writtenTables.stream().map(e -> e.getLeft()).collect(Collectors.toList()) + "\n" +
+          "Remaining locks after check: " + nonSharedLocks);
+    }
+    // It passes the test, it is valid
+    return true;
+  }
+
   private void setTriggerContext(final String queryId) {
     final long queryStartTime;
     // query info is created by SQLOperation which will have start time of the operation. When JDBC Statement is not
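
For a single table, the staleness test in isValidTxnListState() above boils down to
comparing the table's write-id list recorded at compile time with the one visible after
the locks were granted. A hedged, single-table walk-through ("default.t" is a made-up
example name; txnWriteIdList and currentTxnWriteIds are the two snapshots built in that
method):

  boolean stale = !TxnIdUtils.checkEquivalentWriteIds(
      txnWriteIdList.getTableValidWriteIdList("default.t"),       // recorded at compile time
      currentTxnWriteIds.getTableValidWriteIdList("default.t"));  // fetched after locks were granted
  // stale == true means another txn committed a write to default.t in between,
  // so the compiled plan no longer matches the snapshot and the query is recompiled.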
@@ -1394,6 +1481,34 @@ public class Driver implements IDriver {
     tableList.add(fullTableName);
   }
 
+  // Make the list of transactional tables which are being written to by the current txn
+  private List<Pair<String, Table>> getWrittenTableList(QueryPlan plan) {
+    List<Pair<String, Table>> result = new ArrayList<>();
+    Set<String> tableList = new HashSet<>();
+    for (WriteEntity output : plan.getOutputs()) {
+      Table tbl;
+      switch (output.getType()) {
+        case TABLE: {
+          tbl = output.getTable();
+          break;
+        }
+        case PARTITION:
+        case DUMMYPARTITION: {
+          tbl = output.getPartition().getTable();
+          break;
+        }
+        default: {
+          continue;
+        }
+      }
+      String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName());
+      if (tableList.add(fullTableName)) {
+        result.add(new ImmutablePair<>(fullTableName, tbl));
+      }
+    }
+    return result;
+  }
+
   private String getUserFromUGI() {
     // Don't use the userName member, as it may or may not have been set.  Get the value from
     // conf, which calls into getUGI to figure out who the process is running as.
@@ -1464,7 +1579,6 @@ public class Driver implements IDriver {
         acidDdlDesc.setWriteId(writeId);
       }
 
-
       /*It's imperative that {@code acquireLocks()} is called for all commands so that
       HiveTxnManager can transition its state machine correctly*/
       queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState);
@@ -1823,6 +1937,48 @@ public class Driver implements IDriver {
       lockAndRespond();
 
       try {
+        if (!isValidTxnListState()) {
+          // Snapshot was outdated when locks were acquired, hence regenerate context,
+          // txn list and retry
+          // TODO: Lock acquisition should be moved before analyze, this is a bit hackish.
+          // Currently, we acquire a snapshot, we compile the query wrt that snapshot,
+          // and then, we acquire locks. If snapshot is still valid, we continue as usual.
+          // But if snapshot is not valid, we recompile the query.
+          retrial = true;
+          backupContext.addRewrittenStatementContext(ctx);
+          backupContext.setHiveLocks(ctx.getHiveLocks());
+          ctx = backupContext;
+          conf.set(ValidTxnList.VALID_TXNS_KEY, queryTxnMgr.getValidTxns().toString());
+          if (plan.hasAcidResourcesInQuery()) {
+            recordValidWriteIds(queryTxnMgr);
+          }
+
+          if (!alreadyCompiled) {
+            // compileInternal will automatically reset the perf logger
+            compileInternal(command, true);
+          } else {
+            // Since we're reusing the compiled plan, we need to update its start time for the current run
+            plan.setQueryStartTime(queryDisplay.getQueryStartTime());
+          }
+
+          if (!isValidTxnListState()) {
+            // Throw exception
+            throw handleHiveException(new HiveException("Operation could not be executed"), 14);
+          }
+
+          //Reset the PerfLogger
+          perfLogger = SessionState.getPerfLogger(true);
+
+          // The reason we set the txn manager on the ctx here is that each
+          // query has its own ctx object. The txn mgr is shared across the
+          // same instance of Driver, which can run multiple queries.
+          ctx.setHiveTxnManager(queryTxnMgr);
+        }
+      } catch (LockException e) {
+        throw handleHiveException(e, 13);
+      }
+
+      try {
         execute();
       } catch (CommandProcessorResponse cpr) {
         rollback(cpr);

http://git-wip-us.apache.org/repos/asf/hive/blob/2a55f99e/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index e555aec..caa9d83 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -59,6 +59,7 @@ public class FetchTask extends Task<FetchWork> implements Serializable {
   public void setValidWriteIdList(String writeIdStr) {
     fetch.setValidWriteIdList(writeIdStr);
   }
+
   @Override
   public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext ctx,
       CompilationOpContext opContext) {