Posted to commits@hive.apache.org by pv...@apache.org on 2019/11/04 12:15:44 UTC

[hive] branch master updated: HIVE-21114: Create read-only transactions (Denys Kuzmenko, reviewed by Ashutosh Bapat and Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new cdad75a  HIVE-21114: Create read-only transactions (Denys Kuzmenko, reviewed by Ashutosh Bapat and Peter Vary)
cdad75a is described below

commit cdad75aaf0e43d01c053e85817785f415884e608
Author: denys kuzmenko <dk...@cloudera.com>
AuthorDate: Mon Nov 4 12:59:01 2019 +0100

    HIVE-21114: Create read-only transactions (Denys Kuzmenko, reviewed by Ashutosh Bapat and Peter Vary)
---
 ql/src/java/org/apache/hadoop/hive/ql/Driver.java  | 122 +++++++++----------
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    |  22 +++-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java       |  12 +-
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java    |   7 ++
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java     |  11 ++
 .../hadoop/hive/ql/lockmgr/TestDbTxnManager.java   |  12 +-
 .../hadoop/hive/ql/parse/TestParseUtils.java       |  96 +++++++++++++++
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 135 ++++++++++-----------
 .../hive/metastore/TestHiveMetaStoreTxns.java      |  19 ++-
 9 files changed, 283 insertions(+), 153 deletions(-)
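
Note: the change infers a transaction type from the query AST at compile time and
threads it from the Driver through HiveTxnManager down to the metastore, which
persists it in the TXNS table and rejects write-id allocation for read-only txns.
A minimal sketch of the client-visible behaviour, using the IMetaStoreClient
overloads exercised by the new tests (user/db/table names are illustrative):

    // Open a read-only transaction; its type is persisted in the TXNS table.
    long txnId = client.openTxn("me", TxnType.READ_ONLY);
    // Allocating a write id for it is now rejected by the metastore:
    client.allocateTableWriteId(txnId, "db", "tbl");  // throws IllegalStateException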

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 91910d1..bae0ffd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -22,10 +22,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.IOException;
 import java.io.PrintStream;
-import java.io.Serializable;
 import java.net.InetAddress;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -37,11 +35,10 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -53,11 +50,11 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.conf.HiveVariableSource;
 import org.apache.hadoop.hive.conf.VariableSubstitution;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
-import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
@@ -82,7 +79,6 @@ import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.hooks.HookContext;
 import org.apache.hadoop.hive.ql.hooks.HookUtils;
 import org.apache.hadoop.hive.ql.hooks.PrivateHookContext;
-import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lock.CompileLock;
@@ -185,6 +181,7 @@ public class Driver implements IDriver {
   // Transaction manager used for the query. This will be set at compile time based on
   // either initTxnMgr or from the SessionState, in that order.
   private HiveTxnManager queryTxnMgr;
+  private TxnType queryTxnType = TxnType.DEFAULT;
   private StatsSource statsSource;
 
   // Boolean to store information about whether valid txn list was generated
@@ -475,7 +472,9 @@ public class Driver implements IDriver {
                 && queryState.getHiveOperation().equals(HiveOperation.REPLDUMP)) {
           setLastReplIdForDump(queryState.getConf());
         }
-        openTransaction();
+        queryTxnType = AcidUtils.getTxnType(tree);
+        openTransaction(queryTxnType);
+
         generateValidTxnList();
       }
 
@@ -676,7 +675,7 @@ public class Driver implements IDriver {
               lckCmp.getType() == LockType.SHARED_WRITE) &&
               lckCmp.getTablename() != null) {
             nonSharedLocks.add(
-                Warehouse.getQualifiedName(
+                TableName.getDbTable(
                     lckCmp.getDbname(), lckCmp.getTablename()));
           }
         }
@@ -687,7 +686,7 @@ public class Driver implements IDriver {
             lock.getHiveLockMode() == HiveLockMode.SEMI_SHARED) &&
             lock.getHiveLockObject().getPaths().length == 2) {
           nonSharedLocks.add(
-              Warehouse.getQualifiedName(
+              TableName.getDbTable(
                   lock.getHiveLockObject().getPaths()[0], lock.getHiveLockObject().getPaths()[1]));
         }
       }
@@ -699,25 +698,27 @@ public class Driver implements IDriver {
       return true;
     }
     ValidTxnWriteIdList txnWriteIdList = new ValidTxnWriteIdList(txnWriteIdListStr);
-    List<Pair<String, Table>> writtenTables = getWrittenTableList(plan);
+    Map<String, Table> writtenTables = getWrittenTables(plan);
+
     ValidTxnWriteIdList currentTxnWriteIds =
         queryTxnMgr.getValidWriteIds(
-            writtenTables.stream()
-                .filter(e -> AcidUtils.isTransactionalTable(e.getRight()))
-                .map(e -> e.getLeft())
+            writtenTables.entrySet().stream()
+                .filter(e -> AcidUtils.isTransactionalTable(e.getValue()))
+                .map(Map.Entry::getKey)
                 .collect(Collectors.toList()),
             currentTxnString);
-    for (Pair<String, Table> tableInfo : writtenTables) {
-      String fullQNameForLock = Warehouse.getQualifiedName(
-          tableInfo.getRight().getDbName(),
-          MetaStoreUtils.encodeTableName(tableInfo.getRight().getTableName()));
+
+    for (Map.Entry<String, Table> tableInfo : writtenTables.entrySet()) {
+      String fullQNameForLock = TableName.getDbTable(
+          tableInfo.getValue().getDbName(),
+          MetaStoreUtils.encodeTableName(tableInfo.getValue().getTableName()));
       if (nonSharedLocks.contains(fullQNameForLock)) {
         // Check if table is transactional
-        if (AcidUtils.isTransactionalTable(tableInfo.getRight())) {
+        if (AcidUtils.isTransactionalTable(tableInfo.getValue())) {
           // Check that write id is still valid
           if (!TxnIdUtils.checkEquivalentWriteIds(
-              txnWriteIdList.getTableValidWriteIdList(tableInfo.getLeft()),
-              currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getLeft()))) {
+              txnWriteIdList.getTableValidWriteIdList(tableInfo.getKey()),
+              currentTxnWriteIds.getTableValidWriteIdList(tableInfo.getKey()))) {
             // Write id has changed, it is not valid anymore,
             // we need to recompile
             return false;
@@ -729,7 +730,7 @@ public class Driver implements IDriver {
     if (!nonSharedLocks.isEmpty()) {
       throw new LockException("Wrong state: non-shared locks contain information for tables that have not" +
           " been visited when trying to validate the locks from query tables.\n" +
-          "Tables: " + writtenTables.stream().map(e -> e.getLeft()).collect(Collectors.toList()) + "\n" +
+          "Tables: " + writtenTables.keySet() + "\n" +
           "Remaining locks after check: " + nonSharedLocks);
     }
     // It passes the test, it is valid
@@ -766,10 +767,10 @@ public class Driver implements IDriver {
     LOG.debug("Setting " + ReplUtils.LAST_REPL_ID_KEY + " = " + lastReplId);
   }
 
-  private void openTransaction() throws LockException, CommandProcessorException {
+  private void openTransaction(TxnType txnType) throws LockException, CommandProcessorException {
     if (checkConcurrency() && startImplicitTxn(queryTxnMgr) && !queryTxnMgr.isTxnOpen()) {
       String userFromUGI = getUserFromUGI();
-      queryTxnMgr.openTxn(ctx, userFromUGI);
+      queryTxnMgr.openTxn(ctx, userFromUGI, txnType);
     }
   }
 
@@ -933,7 +934,7 @@ public class Driver implements IDriver {
      throw new IllegalStateException("calling recordValidWriteIds() without initializing ValidTxnList " +
               JavaUtils.txnIdToString(txnMgr.getCurrentTxnId()));
     }
-    List<String> txnTables = getTransactionalTableList(plan);
+    List<String> txnTables = getTransactionalTables(plan);
     ValidTxnWriteIdList txnWriteIds = null;
     if (compactionWriteIds != null) {
       /**
@@ -946,12 +947,17 @@ public class Driver implements IDriver {
       if (txnTables.size() != 1) {
         throw new LockException("Unexpected tables in compaction: " + txnTables);
       }
-      String fullTableName = txnTables.get(0);
       txnWriteIds = new ValidTxnWriteIdList(compactorTxnId);
       txnWriteIds.addTableValidWriteIdList(compactionWriteIds);
     } else {
       txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString);
     }
+    if (queryTxnType == TxnType.READ_ONLY && !getWrittenTables(plan).isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Inferred transaction type '%s' doesn't conform to the actual query string '%s'",
+          queryTxnType, queryState.getQueryString()));
+    }
+
     String writeIdStr = txnWriteIds.toString();
     conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, writeIdStr);
     if (plan.getFetchTask() != null) {
@@ -980,20 +986,31 @@ public class Driver implements IDriver {
     return txnWriteIds;
   }
 
-  // Make the list of transactional tables list which are getting read or written by current txn
-  private List<String> getTransactionalTableList(QueryPlan plan) {
-    Set<String> tableList = new HashSet<>();
+  // Make the list of transactional tables that are read or written by the current txn
+  private List<String> getTransactionalTables(QueryPlan plan) {
+    Map<String, Table> tables = new HashMap<>();
+    plan.getInputs().forEach(
+        input -> addTableFromEntity(input, tables)
+    );
+    plan.getOutputs().forEach(
+        output -> addTableFromEntity(output, tables)
+    );
+    return tables.entrySet().stream()
+      .filter(entry -> AcidUtils.isTransactionalTable(entry.getValue()))
+      .map(Map.Entry::getKey)
+      .collect(Collectors.toList());
+  }
 
-    for (ReadEntity input : plan.getInputs()) {
-      addTableFromEntity(input, tableList);
-    }
-    for (WriteEntity output : plan.getOutputs()) {
-      addTableFromEntity(output, tableList);
-    }
-    return new ArrayList<String>(tableList);
+  // Make the map of tables written by the current txn
+  private Map<String, Table> getWrittenTables(QueryPlan plan) {
+    Map<String, Table> tables = new HashMap<>();
+    plan.getOutputs().forEach(
+        output -> addTableFromEntity(output, tables)
+    );
+    return tables;
   }
 
-  private void addTableFromEntity(Entity entity, Collection<String> tableList) {
+  private void addTableFromEntity(Entity entity, Map<String, Table> tables) {
     Table tbl;
     switch (entity.getType()) {
       case TABLE: {
@@ -1009,39 +1026,8 @@ public class Driver implements IDriver {
         return;
       }
     }
-    if (!AcidUtils.isTransactionalTable(tbl)) {
-      return;
-    }
     String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName());
-    tableList.add(fullTableName);
-  }
-
-  // Make the list of transactional tables list which are getting written by current txn
-  private List<Pair<String, Table>> getWrittenTableList(QueryPlan plan) {
-    List<Pair<String, Table>> result = new ArrayList<>();
-    Set<String> tableList = new HashSet<>();
-    for (WriteEntity output : plan.getOutputs()) {
-      Table tbl;
-      switch (output.getType()) {
-        case TABLE: {
-          tbl = output.getTable();
-          break;
-        }
-        case PARTITION:
-        case DUMMYPARTITION: {
-          tbl = output.getPartition().getTable();
-          break;
-        }
-        default: {
-          continue;
-        }
-      }
-      String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName());
-      if (tableList.add(fullTableName)) {
-        result.add(new ImmutablePair(fullTableName, tbl));
-      }
-    }
-    return result;
+    tables.put(fullTableName, tbl);
   }
 
   private String getUserFromUGI() throws CommandProcessorException {
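
Note on the Driver changes above: the txn type is computed once from the parse
tree (AcidUtils.getTxnType) before the transaction is opened, and
recordValidWriteIds gains a sanity check that a READ_ONLY txn never has written
tables in the plan. The Pair-based getWrittenTableList is replaced by a map keyed
on the qualified table name, built with the shared TableName helper. A sketch of
the key format, assuming the usual semantics of that utility:

    // "db.table" -- the same shape previously produced via Warehouse.getQualifiedName
    String key = TableName.getDbTable("default", "t1");  // "default.t1"
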
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index fcf499d..67996c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.hive.ql.io;
 
 import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
-import static org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator.UNION_SUDBIR_PREFIX;
+import static org.apache.hadoop.hive.ql.parse.CalcitePlanner.ASTSearcher;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.LockComponent;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.table.creation.CreateTableDesc;
@@ -72,6 +73,8 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveParser;
 import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -90,6 +93,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import javax.annotation.concurrent.Immutable;
 import java.nio.charset.Charset;
+import java.util.stream.Stream;
 
 /**
  * Utilities that are shared by all of the ACID input and output formats. They
@@ -2983,4 +2987,20 @@ public class AcidUtils {
       throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ex.getMessage()), ex);
     }
   }
+
+  /**
+   * Determines transaction type based on query AST.
+   * @param tree AST
+   */
+  public static TxnType getTxnType(ASTNode tree) {
+    final ASTSearcher astSearcher = new ASTSearcher();
+
+    return (tree.getToken().getType() == HiveParser.TOK_QUERY &&
+      Stream.of(
+        new int[]{HiveParser.TOK_INSERT_INTO},
+        new int[]{HiveParser.TOK_INSERT, HiveParser.TOK_TAB})
+        .noneMatch(pattern ->
+            astSearcher.simpleBreadthFirstSearch(tree, pattern) != null)) ?
+      TxnType.READ_ONLY : TxnType.DEFAULT;
+  }
 }
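
The inference in getTxnType is purely syntactic: a root TOK_QUERY with neither a
TOK_INSERT_INTO nor a TOK_INSERT/TOK_TAB target is classified as READ_ONLY, and
every other statement (writes, DDL, LOAD, REPL, MERGE) falls through to DEFAULT.
A sketch mirroring the new TestParseUtils below (ParseException handling omitted):

    ASTNode tree = ParseUtils.parse("SELECT count(*) FROM t");   // pure read
    assert AcidUtils.getTxnType(tree) == TxnType.READ_ONLY;

    tree = ParseUtils.parse("INSERT INTO t VALUES (1)");         // write
    assert AcidUtils.getTxnType(tree) == TxnType.DEFAULT;
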
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 943aa38..76934bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -220,11 +221,16 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
 
   @Override
   public long openTxn(Context ctx, String user) throws LockException {
-    return openTxn(ctx, user, 0);
+    return openTxn(ctx, user, TxnType.DEFAULT, 0);
+  }
+
+  @Override
+  public long openTxn(Context ctx, String user, TxnType txnType) throws LockException {
+    return openTxn(ctx, user, txnType, 0);
   }
 
   @VisibleForTesting
-  long openTxn(Context ctx, String user, long delay) throws LockException {
+  long openTxn(Context ctx, String user, TxnType txnType, long delay) throws LockException {
     /*Q: why don't we lock the snapshot here???  Instead of having client make an explicit call
     whenever it chooses
     A: If we want to rely on locks for transaction scheduling we must get the snapshot after lock
@@ -236,7 +242,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
       throw new LockException("Transaction already opened. " + JavaUtils.txnIdToString(txnId));
     }
     try {
-      txnId = getMS().openTxn(user);
+      txnId = getMS().openTxn(user, txnType);
       stmtId = 0;
       numStatements = 0;
       tableWriteIds.clear();
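
DbTxnManager only changes at the openTxn entry points; the snapshot and locking
logic is untouched, and the @VisibleForTesting overload keeps its artificial
heartbeat delay for the timeout tests below. The resulting call chains, as a
sketch:

    txnMgr.openTxn(ctx, user);                     // == openTxn(ctx, user, TxnType.DEFAULT, 0)
    txnMgr.openTxn(ctx, user, TxnType.READ_ONLY);  // == openTxn(ctx, user, TxnType.READ_ONLY, 0)
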
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index ac813c8..7820013 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.lockmgr;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -54,6 +55,12 @@ class DummyTxnManager extends HiveTxnManagerImpl {
   private HiveLockManagerCtx lockManagerCtx;
 
   @Override
+  public long openTxn(Context ctx, String user, TxnType txnType) throws LockException {
+    // No-op
+    return 0L;
+  }
+
+  @Override
   public long openTxn(Context ctx, String user) throws LockException {
     // No-op
     return 0L;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 1c53426..600289f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverState;
 import org.apache.hadoop.hive.ql.ddl.database.lock.LockDatabaseDesc;
@@ -50,6 +51,16 @@ public interface HiveTxnManager {
    */
   long openTxn(Context ctx, String user) throws LockException;
 
+  /**
+   * Open a new transaction.
+   * @param ctx Context for this query
+   * @param user Hive user who is opening this transaction.
+   * @param txnType transaction type.
+   * @return The new transaction id
+   * @throws LockException if a transaction is already open.
+   */
+  long openTxn(Context ctx, String user, TxnType txnType) throws LockException;
+
   /**
    * Open a new transaction in target cluster.
    * @param replPolicy Replication policy to uniquely identify the source cluster.
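
The interface change is backward compatible: the existing two-argument openTxn
keeps its behaviour by delegating with TxnType.DEFAULT. A caller that knows it
will only read can now declare that up front; a sketch (txn manager and context
setup omitted):

    long txnId = txnMgr.openTxn(ctx, "hive", TxnType.READ_ONLY);
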
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index cc86afe..cd91948 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -210,7 +211,8 @@ public class TestDbTxnManager {
   public void testExceptions() throws Exception {
     addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
     QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
-    ((DbTxnManager) txnMgr).openTxn(ctx, "NicholasII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+    ((DbTxnManager) txnMgr).openTxn(ctx, "NicholasII", TxnType.DEFAULT,
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();
     LockException exception = null;
@@ -224,7 +226,8 @@ public class TestDbTxnManager {
     Assert.assertEquals("Wrong Exception1", ErrorMsg.TXN_ABORTED, exception.getCanonicalErrorMsg());
 
     exception = null;
-    ((DbTxnManager) txnMgr).openTxn(ctx, "AlexanderIII", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+    ((DbTxnManager) txnMgr).openTxn(ctx, "AlexanderIII", TxnType.DEFAULT,
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();//this will abort the txn
     TxnStore txnHandler = TxnUtils.getTxnStore(conf);
@@ -441,7 +444,7 @@ public class TestDbTxnManager {
     // Case 2: If there's delay for the heartbeat, but the delay is within the reaper's tolerance,
     //         then txt should be able to commit
     // Start the heartbeat after a delay, which is shorter than  the HIVE_TXN_TIMEOUT
-    ((DbTxnManager) txnMgr).openTxn(ctx, "tom",
+    ((DbTxnManager) txnMgr).openTxn(ctx, "tom", TxnType.DEFAULT,
         HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2);
     txnMgr.acquireLocks(qp, ctx, "tom");
     runReaper();
@@ -457,7 +460,8 @@ public class TestDbTxnManager {
     //         then the txn will time out and be aborted.
     //         Here we just don't send the heartbeat at all - an infinite delay.
     // Start the heartbeat after a delay, which exceeds the HIVE_TXN_TIMEOUT
-    ((DbTxnManager) txnMgr).openTxn(ctx, "jerry", HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
+    ((DbTxnManager) txnMgr).openTxn(ctx, "jerry", TxnType.DEFAULT,
+        HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) * 2);
     txnMgr.acquireLocks(qp, ctx, "jerry");
     Thread.sleep(HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS));
     runReaper();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
new file mode 100644
index 0000000..bf8a028
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Tests that the transaction type is correctly inferred from the query text.
+ */
+@RunWith(value = Parameterized.class)
+public class TestParseUtils {
+
+  private String query;
+  private TxnType txnType;
+
+  public TestParseUtils(String query, TxnType txnType) {
+    this.query = query;
+    this.txnType = txnType;
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(
+      new Object[][]{
+          {"SELECT current_timestamp()", TxnType.READ_ONLY},
+          {"SELECT count(*) FROM a", TxnType.READ_ONLY},
+          {"SELECT count(*) FROM a JOIN b ON a.id = b.id", TxnType.READ_ONLY},
+
+          {"WITH a AS (SELECT current_timestamp()) " +
+             "  SELECT * FROM a", TxnType.READ_ONLY},
+
+          {"INSERT INTO a VALUES (1, 2)", TxnType.DEFAULT},
+          {"INSERT INTO a SELECT * FROM b", TxnType.DEFAULT},
+          {"INSERT OVERWRITE TABLE a SELECT * FROM b", TxnType.DEFAULT},
+
+          {"FROM b INSERT OVERWRITE TABLE a SELECT *", TxnType.DEFAULT},
+
+          {"WITH a AS (SELECT current_timestamp()) " +
+             "  INSERT INTO b SELECT * FROM a", TxnType.DEFAULT},
+
+          {"UPDATE a SET col_b = 1", TxnType.DEFAULT},
+          {"DELETE FROM a WHERE col_b = 1", TxnType.DEFAULT},
+
+          {"CREATE TABLE a (col_b int)", TxnType.DEFAULT},
+          {"CREATE TABLE a AS SELECT * FROM b", TxnType.DEFAULT},
+          {"DROP TABLE a", TxnType.DEFAULT},
+
+          {"LOAD DATA LOCAL INPATH './examples/files/kv.txt' " +
+             "  OVERWRITE INTO TABLE a", TxnType.DEFAULT},
+
+          {"REPL LOAD a from './examples/files/kv.txt'", TxnType.DEFAULT},
+          {"REPL DUMP a", TxnType.DEFAULT},
+          {"REPL STATUS a", TxnType.DEFAULT},
+
+          {"MERGE INTO a trg using b src " +
+             "  ON src.col_a = trg.col_a " +
+             "WHEN MATCHED THEN " +
+             "  UPDATE SET col_b = src.col_b " +
+             "WHEN NOT MATCHED THEN " +
+             "  INSERT VALUES (src.col_a, src.col_b)",
+           TxnType.DEFAULT},
+      });
+  }
+
+  @Test
+  public void testTxnType() throws ParseException {
+    Assert.assertEquals(
+        AcidUtils.getTxnType(ParseUtils.parse(query)), txnType);
+  }
+}
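
The suite pins the syntactic classification, so covering a new statement kind is
one more row in data(). Statements whose root token is not TOK_QUERY at all
(e.g. SHOW TABLES) would also classify as DEFAULT under the implementation above,
so a row such as the following would be a consistent, hypothetical addition:

    {"SHOW TABLES", TxnType.DEFAULT},
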
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 355c4f5..6281208 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -57,6 +57,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -1700,10 +1701,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           Collections.sort(txnIds); //easier to read logs and for assumption done in replication flow
         }
 
-        // Check if all the input txns are in open state. Write ID should be allocated only for open transactions.
-        if (!isTxnsInOpenState(txnIds, stmt)) {
-          ensureAllTxnsValid(dbName, tblName, txnIds, stmt);
-          throw new RuntimeException("This should never happen for txnIds: " + txnIds);
+        // Check that all the input txns are in a valid state.
+        // Write IDs should be allocated only for transactions that are open and not read-only.
+        if (!isTxnsOpenAndNotReadOnly(txnIds, stmt)) {
+          String errorMsg = "Write ID allocation on " + TableName.getDbTable(dbName, tblName)
+              + " failed for input txns: "
+              + getAbortedAndReadOnlyTxns(txnIds, stmt)
+              + getCommittedTxns(txnIds, stmt);
+          LOG.error(errorMsg);
+
+          throw new IllegalStateException("Write ID allocation failed on " + TableName.getDbTable(dbName, tblName)
+              + " as not all input txns in open state or read-only");
         }
 
         List<String> queries = new ArrayList<>();
@@ -1716,7 +1724,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         // first write on a table will allocate write id and rest of the writes should re-use it.
         prefix.append("select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where"
                         + " t2w_database = ? and t2w_table = ?" + " and ");
-        suffix.append("");
         TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
                 txnIds, "t2w_txnid", false, false);
 
@@ -4669,112 +4676,94 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   }
 
   /**
-   * Checks if all the txns in the list are in open state.
-   * @param txnIds list of txns to be evaluated for open state
+   * Checks if all the txns in the list are in open state and not read-only.
+   * @param txnIds list of txns to be evaluated for open state/read-only status
    * @param stmt db statement
-   * @return If all txns in open state, then return true else false
+   * @return true if all the txns are open and not read-only, false otherwise
    */
-  private boolean isTxnsInOpenState(List<Long> txnIds, Statement stmt) throws SQLException {
+  private boolean isTxnsOpenAndNotReadOnly(List<Long> txnIds, Statement stmt) throws SQLException {
     List<String> queries = new ArrayList<>();
     StringBuilder prefix = new StringBuilder();
-    StringBuilder suffix = new StringBuilder();
 
-    // Get the count of txns from the given list are in open state. If the returned count is same as
-    // the input number of txns, then it means, all are in open state.
-    prefix.append("select count(*) from TXNS where txn_state = '" + TXN_OPEN + "' and ");
-    suffix.append("");
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
-            txnIds, "txn_id", false, false);
+    // Get the count of txns from the given list that are in open state and not read-only.
+    // If the returned count is the same as the input number of txns, then all txns are open and not read-only.
+    prefix.append("select count(*) from TXNS where txn_state = '" + TXN_OPEN
+        + "' and txn_type != " + TxnType.READ_ONLY.getValue() + " and ");
+
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(),
+        txnIds, "txn_id", false, false);
 
     long count = 0;
     for (String query : queries) {
       LOG.debug("Going to execute query <" + query + ">");
-      ResultSet rs = stmt.executeQuery(query);
-      if (rs.next()) {
-        count += rs.getLong(1);
+      try (ResultSet rs = stmt.executeQuery(query)) {
+        if (rs.next()) {
+          count += rs.getLong(1);
+        }
       }
     }
     return count == txnIds.size();
   }
 
   /**
-   * Checks if all the txns in the list are in open state.
-   * @param dbName Database name
-   * @param tblName Table on which we try to allocate write id
-   * @param txnIds list of txns to be evaluated for open state
+   * Get txns from the list that are either aborted or read-only.
+   * @param txnIds list of txns to be evaluated for aborted state/read-only status
    * @param stmt db statement
    */
-  private void ensureAllTxnsValid(String dbName, String tblName, List<Long> txnIds, Statement stmt)
-          throws SQLException {
+  private String getAbortedAndReadOnlyTxns(List<Long> txnIds, Statement stmt) throws SQLException {
     List<String> queries = new ArrayList<>();
     StringBuilder prefix = new StringBuilder();
-    StringBuilder suffix = new StringBuilder();
-
-    // Check if any of the txns in the list is aborted.
-    prefix.append("select txn_id, txn_state from TXNS where ");
-    suffix.append("");
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
-            txnIds, "txn_id", false, false);
-    Long txnId;
-    char txnState;
-    boolean isAborted = false;
-    StringBuilder errorMsg = new StringBuilder();
-    errorMsg.append("Write ID allocation on ")
-            .append(Warehouse.getQualifiedName(dbName, tblName))
-            .append(" failed for input txns: ");
+
+    // Check if any of the txns in the list are either aborted or read-only.
+    prefix.append("select txn_id, txn_state, txn_type from TXNS where ");
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(),
+        txnIds, "txn_id", false, false);
+    StringBuilder txnInfo = new StringBuilder();
+
     for (String query : queries) {
       LOG.debug("Going to execute query <" + query + ">");
-      ResultSet rs = stmt.executeQuery(query);
-      while (rs.next()) {
-        txnId = rs.getLong(1);
-        txnState = rs.getString(2).charAt(0);
-        if (txnState != TXN_OPEN) {
-          isAborted = true;
-          errorMsg.append("{").append(txnId).append(",").append(txnState).append("}");
+      try (ResultSet rs = stmt.executeQuery(query)) {
+        while (rs.next()) {
+          long txnId = rs.getLong(1);
+          char txnState = rs.getString(2).charAt(0);
+          TxnType txnType = TxnType.findByValue(rs.getInt(3));
+
+          if (txnState != TXN_OPEN) {
+            txnInfo.append("{").append(txnId).append(",").append(txnState).append("}");
+          } else if (txnType == TxnType.READ_ONLY) {
+            txnInfo.append("{").append(txnId).append(",read-only}");
+          }
         }
       }
     }
-    // Check if any of the txns in the list is committed.
-    boolean isCommitted = checkIfTxnsCommitted(txnIds, stmt, errorMsg);
-    if (isAborted || isCommitted) {
-      LOG.error(errorMsg.toString());
-      throw new IllegalStateException("Write ID allocation failed on "
-              + Warehouse.getQualifiedName(dbName, tblName)
-              + " as not all input txns in open state");
-    }
+    return txnInfo.toString();
   }
 
   /**
-   * Checks if all the txns in the list are in committed. If yes, throw eception.
-   * @param txnIds list of txns to be evaluated for committed
+   * Get txns from the list that are committed.
+   * @param txnIds list of txns to be evaluated for committed state
    * @param stmt db statement
-   * @return true if any input txn is committed, else false
    */
-  private boolean checkIfTxnsCommitted(List<Long> txnIds, Statement stmt, StringBuilder errorMsg)
-          throws SQLException {
+  private String getCommittedTxns(List<Long> txnIds, Statement stmt) throws SQLException {
     List<String> queries = new ArrayList<>();
     StringBuilder prefix = new StringBuilder();
-    StringBuilder suffix = new StringBuilder();
 
-    // Check if any of the txns in the list is committed. If yes, throw exception.
+    // Check if any of the txns in the list are committed.
     prefix.append("select ctc_txnid from COMPLETED_TXN_COMPONENTS where ");
-    suffix.append("");
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
-            txnIds, "ctc_txnid", false, false);
-    Long txnId;
-    boolean isCommitted = false;
+    TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(),
+        txnIds, "ctc_txnid", false, false);
+    StringBuilder txnInfo = new StringBuilder();
+
     for (String query : queries) {
       LOG.debug("Going to execute query <" + query + ">");
-      ResultSet rs = stmt.executeQuery(query);
-      while (rs.next()) {
-        isCommitted = true;
-        txnId = rs.getLong(1);
-        if (errorMsg != null) {
-          errorMsg.append("{").append(txnId).append(",c}");
+      try (ResultSet rs = stmt.executeQuery(query)) {
+        while (rs.next()) {
+          long txnId = rs.getLong(1);
+          txnInfo.append("{").append(txnId).append(",c}");
         }
       }
     }
-    return isCommitted;
+    return txnInfo.toString();
   }
 
   /**
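
The rewritten TxnHandler helpers return diagnostic strings instead of throwing
midway, and the open-state check folds the read-only predicate into a single
count query. Assuming the usual TXNS encoding (TXN_OPEN maps to 'o',
TxnType.READ_ONLY to 2), the generated check looks roughly like:

    select count(*) from TXNS
     where txn_state = 'o' and txn_type != 2
       and txn_id in (1, 2, 3)
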
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
index b5f2209..fc08dbc 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
@@ -23,10 +23,8 @@ import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.client.builder.DatabaseBuilder;
@@ -39,12 +37,13 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.Rule;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.List;
 
@@ -67,6 +66,9 @@ public class TestHiveMetaStoreTxns {
   private IMetaStoreClient client;
   private Connection conn;
 
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
   @Test
   public void testTxns() throws Exception {
     List<Long> tids = client.openTxns("me", 3).getTxn_ids();
@@ -312,11 +314,20 @@ public class TestHiveMetaStoreTxns {
   public void testTxnTypePersisted() throws Exception {
     long txnId = client.openTxn("me", TxnType.READ_ONLY);
     Statement stm = conn.createStatement();
-    ResultSet rs = stm.executeQuery("SELECT txn_type FROM TXNS WHERE txn_id = " + txnId);
+    ResultSet rs = stm.executeQuery("SELECT txn_type FROM txns WHERE txn_id = " + txnId);
     Assert.assertTrue(rs.next());
     Assert.assertEquals(TxnType.findByValue(rs.getInt(1)), TxnType.READ_ONLY);
   }
 
+  @Test
+  public void testAllocateTableWriteIdForReadOnlyTxn() throws Exception {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Write ID allocation failed on db.tbl as not all input txns in open state or read-only");
+
+    long txnId = client.openTxn("me", TxnType.READ_ONLY);
+    client.allocateTableWriteId(txnId, "db", "tbl");
+  }
+
   @Before
   public void setUp() throws Exception {
     conf.setBoolean(ConfVars.HIVE_IN_TEST.getVarname(), true);