You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/08/16 23:23:33 UTC

[impala] branch master updated (df2c6f2 -> 8c5ea90)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from df2c6f2  IMPALA-8841: Try to fix Tez related dataload flakiness
     new 3948eda  IMPALA-8124: Modify TestWebPage::test_catalog to avoid flakiness.
     new 8c5ea90  IMPALA-8836: Support COMPUTE STATS on insert only ACID tables

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/impala/compat/MetastoreShim.java    |  43 ++++-
 .../org/apache/impala/compat/MetastoreShim.java    | 145 +++++++++++---
 .../apache/impala/analysis/ComputeStatsStmt.java   |   2 +-
 .../org/apache/impala/analysis/DropStatsStmt.java  |   2 +
 .../apache/impala/service/CatalogOpExecutor.java   | 208 +++++++++++++--------
 .../java/org/apache/impala/service/Frontend.java   |   2 +-
 .../org/apache/impala/analysis/AnalyzerTest.java   |   7 +-
 .../queries/QueryTest/acid-compute-stats.test      | 105 +++++++++++
 .../queries/QueryTest/acid-negative.test           |   2 +-
 tests/query_test/test_acid.py                      |   9 +
 tests/webserver/test_web_pages.py                  |  55 ++++--
 11 files changed, 447 insertions(+), 133 deletions(-)
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/acid-compute-stats.test


[impala] 01/02: IMPALA-8124: Modify TestWebPage::test_catalog to avoid flakiness.

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3948eda50bcae20c1080475571f95a1080b0820d
Author: Anurag Mantripragada <an...@gmail.com>
AuthorDate: Thu Aug 8 17:11:59 2019 -0700

    IMPALA-8124: Modify TestWebPage::test_catalog to avoid flakiness.
    
    This test scrapes the /catalog webpage for metrics. This can
    occasionally run into a race condition when a table lock is
    being held for functional*.* tables and the test tries to get
    the metrics. This results in a failed assert for the metrics.
    This change rewrites the test to create new tables with
    unique_database fixture to avoid flakiness.
    
    Also, separated out an assert with AND into two different asserts
    so it's easy to track which one failed.
    
    Testing:
    Ran multiple runs of this test locally while refreshing the table.
    
    Change-Id: I341bf25baf8d9316a21a9eff860de84b33afd12f
    Reviewed-on: http://gerrit.cloudera.org:8080/14075
    Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/webserver/test_web_pages.py | 55 +++++++++++++++++++++++++++++----------
 1 file changed, 41 insertions(+), 14 deletions(-)

diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index e5bee26..9df9cb3 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -157,9 +157,10 @@ class TestWebPage(ImpalaTestSuite):
     for port in ports_to_test:
       input_url = url.format(port)
       response = requests.get(input_url)
-      assert (response.status_code == requests.codes.ok
-          and string_to_search in response.text), "URL: {0} Str:'{1}'\nResp:{2}".format(
-              input_url, string_to_search, response.text)
+      assert response.status_code == requests.codes.ok, "URL: {0} Str:'{1}'\nResp:{2}"\
+        .format(input_url, string_to_search, response.text)
+      assert string_to_search in response.text, "URL: {0} Str:'{1}'\nResp:{2}".format(
+        input_url, string_to_search, response.text)
       responses.append(response)
     return responses
 
@@ -270,19 +271,45 @@ class TestWebPage(ImpalaTestSuite):
     # Reset log level.
     self.get_and_check_status(self.RESET_GLOG_LOGLEVEL_URL, "v set to ")
 
-  def test_catalog(self, cluster_properties):
+  def test_catalog(self, cluster_properties, unique_database):
     """Tests the /catalog and /catalog_object endpoints."""
-    self.get_and_check_status_jvm(self.CATALOG_URL, "functional")
-    self.get_and_check_status_jvm(self.CATALOG_URL, "alltypes")
+    # Non-partitioned table
+    query = "create table {0}.foo (id int, val int)".format(unique_database)
+    self.execute_query(query)
+    insert_query = "insert into {0}.foo values (1, 200)".format(unique_database)
+    self.execute_query(insert_query)
+    # Partitioned table
+    partitioned_query = "create table {0}.foo_part (id int, val int) partitioned by (" \
+      "year int)".format(unique_database)
+    self.execute_query(partitioned_query)
+    partition_insert_query = "insert into {0}.foo_part partition (year=2010) values "\
+      "(1, 200)".format(unique_database)
+    self.execute_query(partition_insert_query)
+    # Kudu table
+    kudu_query = "create table {0}.foo_kudu (id int, val int, primary key (id))  " \
+      "stored as kudu".format(unique_database)
+    self.execute_query(kudu_query)
+    kudu_insert_query = "insert into {0}.foo_kudu values (1, 200)".format(unique_database)
+    self.execute_query(kudu_insert_query)
+    # Partitioned parquet table
+    parquet_query = "create table {0}.foo_part_parquet (id int, val int) partitioned " \
+      "by (year int) stored as parquet".format(unique_database)
+    self.execute_query(parquet_query)
+    parquet_insert_query = "insert into {0}.foo_part_parquet partition (year=2010) " \
+      "values (1, 200)".format(unique_database)
+    self.execute_query(parquet_insert_query)
+
+    self.get_and_check_status_jvm(self.CATALOG_URL, unique_database)
+    self.get_and_check_status_jvm(self.CATALOG_URL, "foo_part")
     # IMPALA-5028: Test toThrift() of a partitioned table via the WebUI code path.
-    self.__test_catalog_object("functional", "alltypes", cluster_properties)
-    self.__test_catalog_object("functional_parquet", "alltypes", cluster_properties)
-    self.__test_catalog_object("functional", "alltypesnopart", cluster_properties)
-    self.__test_catalog_object("functional_kudu", "alltypes", cluster_properties)
-    self.__test_table_metrics("functional", "alltypes", "total-file-size-bytes")
-    self.__test_table_metrics("functional", "alltypes", "num-files")
-    self.__test_table_metrics("functional_kudu", "alltypes", "alter-duration")
-    self.__test_catalog_tablesfilesusage("functional", "alltypes", "24")
+    self.__test_catalog_object(unique_database, "foo_part", cluster_properties)
+    self.__test_catalog_object(unique_database, "foo_kudu", cluster_properties)
+    self.__test_catalog_object(unique_database, "foo_part_parquet", cluster_properties)
+    self.__test_catalog_object(unique_database, "foo", cluster_properties)
+    self.__test_table_metrics(unique_database, "foo_part", "total-file-size-bytes")
+    self.__test_table_metrics(unique_database, "foo_part", "num-files")
+    self.__test_table_metrics(unique_database, "foo_part", "alter-duration")
+    self.__test_catalog_tablesfilesusage(unique_database, "foo_part", "1")
 
   def __test_catalog_object(self, db_name, tbl_name, cluster_properties):
     """Tests the /catalog_object endpoint for the given db/table. Runs


[impala] 02/02: IMPALA-8836: Support COMPUTE STATS on insert only ACID tables

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8c5ea90aa53dd925ec038ef9d8ea71e7919e3127
Author: Csaba Ringhofer <cs...@cloudera.com>
AuthorDate: Wed Aug 14 19:28:37 2019 +0200

    IMPALA-8836: Support COMPUTE STATS on insert only ACID tables
    
    For ACID tables COMPUTE STATS needs to use a new HMS API, as the
    old one is rejected by metastore. This API currently has some
    counterintuitive parts:
    - setPartitionColumnStatistics is used to set table stats, as there
      is no similar function exposed by HMS client for tables at the
      moment.
    - A new writeId is allocated for the stat change, and this needs
      a transaction, so a transaction is opened/committed/aborted even
      though this doesn't seem necessary. The Hive code seems to use
      internal API for this.
    - Even though the HMS thrift Table object has a colStats field,
      it is only applied during alter_table if there are other changes
      like new columns in the tables, so alter_table couldn't be used
      to change column stats.
    
    Additional changes:
    - DROP STATS is no longer allowed for transactional tables, as it
      turned out that there is no transactional version of the old API.
    - Remove COLUMN_STATS_ACCURATE table property during COMPUTE STATS
      to ensure that Hive does use stats computed by Impala to
      answer queries like SELECT count(*).
    - Changed CatalogOpExecutor.updateCatalog() to get the writeIds
      earlier. This can mean unnecessary HMS RPC calls if no property
      change is needed in the end, but I felt it hard to reason about
      what happens if these RPC calls fail at their original location.
    
    TODOs (My plan is to do these in IMPALA-8865):
    - Tried to make the MetastoreShim API easier to use by adding a class
      to encapsulate things like txnId and writeId, but it feels rather
      half-baked and under-documented.
      A similar class is added in https://gerrit.cloudera.org/#/c/14071/,
      it would be good to merge them.
    - The validWriteIdList of the original SELECT(s) behind COMPUTE
      STATS could be used in the HMS API calls, but this would need
      more plumbing.
    
    Change-Id: I5c06b4678c1ff75c5aa1586a78afea563e64057f
    Reviewed-on: http://gerrit.cloudera.org:8080/14066
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Tim Armstrong <ta...@cloudera.com>
---
 .../org/apache/impala/compat/MetastoreShim.java    |  43 ++++-
 .../org/apache/impala/compat/MetastoreShim.java    | 145 +++++++++++---
 .../apache/impala/analysis/ComputeStatsStmt.java   |   2 +-
 .../org/apache/impala/analysis/DropStatsStmt.java  |   2 +
 .../apache/impala/service/CatalogOpExecutor.java   | 208 +++++++++++++--------
 .../java/org/apache/impala/service/Frontend.java   |   2 +-
 .../org/apache/impala/analysis/AnalyzerTest.java   |   7 +-
 .../queries/QueryTest/acid-compute-stats.test      | 105 +++++++++++
 .../queries/QueryTest/acid-negative.test           |   2 +-
 tests/query_test/test_acid.py                      |   9 +
 10 files changed, 406 insertions(+), 119 deletions(-)

diff --git a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
index b556774..07d2aaf 100644
--- a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java
@@ -58,6 +58,7 @@ import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
 import org.apache.hive.service.rpc.thrift.TGetTablesReq;
 import org.apache.impala.authorization.User;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.TransactionException;
 import org.apache.impala.service.Frontend;
@@ -71,6 +72,31 @@ import org.apache.thrift.TException;
  * between major versions of Hive. This implements the shimmed methods for Hive 2.
  */
 public class MetastoreShim {
+
+  /**
+   * Empty class, should not be instantiated.
+   */
+  public static class TblTransaction {
+    public TblTransaction() {
+      throw new UnsupportedOperationException("new TblTransaction");
+    }
+  }
+
+  public static TblTransaction createTblTransaction(
+     IMetaStoreClient client, Table tbl, long txnId) {
+    throw new UnsupportedOperationException("createTblTransaction");
+  }
+
+  public static void commitTblTransactionIfNeeded(IMetaStoreClient client,
+      TblTransaction tblTxn) throws TransactionException {
+    throw new UnsupportedOperationException("commitTblTransactionIfNeeded");
+  }
+
+  public static void abortTblTransactionIfNeeded(IMetaStoreClient client,
+      TblTransaction tblTxn) {
+    throw new UnsupportedOperationException("abortTblTransactionIfNeeded");
+  }
+
   /**
    * Wrapper around MetaStoreUtils.validateName() to deal with added arguments.
    */
@@ -82,7 +108,7 @@ public class MetastoreShim {
    * Hive-3 only function
    */
   public static void alterTableWithTransaction(IMetaStoreClient client,
-      Table tbl, long txnId) {
+      Table tbl, TblTransaction tblTxn) {
     throw new UnsupportedOperationException("alterTableWithTransaction");
   }
 
@@ -111,8 +137,7 @@ public class MetastoreShim {
   * Hive-3 only function
   */
   public static void alterPartitionsWithTransaction(IMetaStoreClient client,
-      String dbName, String tblName, List<Partition> partitions,
-      long tblWriteId, long txnId) {
+      String dbName, String tblName, List<Partition> partitions, TblTransaction tblTxn) {
     throw new UnsupportedOperationException("alterTableWithTransaction");
   }
 
@@ -362,7 +387,7 @@ public class MetastoreShim {
   /**
    * Hive-3 only function
    */
-  public static long openTransaction(IMetaStoreClient client, String userId)
+  public static long openTransaction(IMetaStoreClient client)
       throws TransactionException {
     throw new UnsupportedOperationException("openTransaction is not supported.");
   }
@@ -417,6 +442,16 @@ public class MetastoreShim {
   }
 
   /**
+   * Hive-3 only function
+   */
+  public static void setTableColumnStatsTransactional(IMetaStoreClient client,
+      Table msTbl, ColumnStatistics colStats, TblTransaction tblTxn)
+      throws ImpalaRuntimeException {
+    throw new UnsupportedOperationException(
+        "setTableColumnStatsTransactional is not supported.");
+  }
+
+  /**
    * @return the shim version.
    */
   public static long getMajorVersion() {
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 44046b6..e96e7b3 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableMap;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.EnumSet;
@@ -59,6 +60,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
@@ -79,6 +81,7 @@ import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
 import org.apache.hive.service.rpc.thrift.TGetTablesReq;
 import org.apache.impala.authorization.User;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.TransactionException;
 import org.apache.impala.common.TransactionKeepalive;
@@ -98,7 +101,8 @@ import com.google.common.collect.Lists;
  * between major versions of Hive. This implements the shimmed methods for Hive 3.
  */
 public class MetastoreShim {
-  public static final Logger LOG = Logger.getLogger(MetastoreShim.class);
+  private static final Logger LOG = Logger.getLogger(MetastoreShim.class);
+
   private static final String EXTWRITE = "EXTWRITE";
   private static final String EXTREAD = "EXTREAD";
   private static final String HIVEBUCKET2 = "HIVEBUCKET2";
@@ -120,6 +124,60 @@ public class MetastoreShim {
   // Time interval between retries of acquiring an HMS ACID lock
   private static final int LOCK_RETRY_WAIT_SECONDS = 3;
 
+  private final static String HMS_RPC_ERROR_FORMAT_STR =
+      "Error making '%s' RPC to Hive Metastore: ";
+
+  // Id used to register transactions / locks.
+  // Not final, as it makes sense to set it based on role + instance, see IMPALA-8853.
+  public static String TRANSACTION_USER_ID = "Impala";
+
+  /**
+   * Transaction parameters needed for single table operations.
+   */
+  public static class TblTransaction {
+    public long txnId;
+    public boolean ownsTxn;
+    public long writeId;
+    public String validWriteIds;
+  }
+
+  /**
+   * Initializes and returns a TblTransaction object for table 'tbl'.
+   * Opens a new transaction if txnId is not valid.
+   */
+  public static TblTransaction createTblTransaction(
+     IMetaStoreClient client, Table tbl, long txnId)
+     throws TransactionException {
+    TblTransaction tblTxn = new TblTransaction();
+    try {
+      if (txnId <= 0) {
+        txnId = openTransaction(client);
+        tblTxn.ownsTxn = true;
+      }
+      tblTxn.txnId = txnId;
+      tblTxn.writeId =
+          allocateTableWriteId(client, txnId, tbl.getDbName(), tbl.getTableName());
+      tblTxn.validWriteIds =
+          getValidWriteIdListInTxn(client, tbl.getDbName(), tbl.getTableName(), txnId);
+      return tblTxn;
+    }
+    catch (TException e) {
+      if (tblTxn.ownsTxn) abortTransactionNoThrow(client, tblTxn.txnId);
+      throw new TransactionException(
+          String.format(HMS_RPC_ERROR_FORMAT_STR, "createTblTransaction"), e);
+    }
+  }
+
+  static public void commitTblTransactionIfNeeded(IMetaStoreClient client,
+      TblTransaction tblTxn) throws TransactionException {
+    if (tblTxn.ownsTxn) commitTransaction(client, tblTxn.txnId);
+  }
+
+  static public void abortTblTransactionIfNeeded(IMetaStoreClient client,
+      TblTransaction tblTxn) {
+    if (tblTxn.ownsTxn) abortTransactionNoThrow(client, tblTxn.txnId);
+  }
+
   /**
    * Constant variable that stores engine value needed to store / access
    * Impala column statistics.
@@ -137,14 +195,17 @@ public class MetastoreShim {
    * Wrapper around IMetaStoreClient.alter_table with validWriteIds as a param.
    */
   public static void alterTableWithTransaction(IMetaStoreClient client,
-     Table tbl, long txnId)
-     throws InvalidOperationException, MetaException, TException {
-    // Get ValidWriteIdList to pass Hive verify when set table property
-    // COLUMN_STATS_ACCURATE
-    String validWriteIds = getValidWriteIdListInTxn(client, tbl.getDbName(),
-        tbl.getTableName(), txnId);
-    client.alter_table(null, tbl.getDbName(), tbl.getTableName(),
-        tbl, null, validWriteIds);
+     Table tbl, TblTransaction tblTxn)
+     throws ImpalaRuntimeException {
+    tbl.setWriteId(tblTxn.writeId);
+    try {
+      client.alter_table(null, tbl.getDbName(), tbl.getTableName(),
+        tbl, null, tblTxn.validWriteIds);
+    }
+    catch (TException e) {
+      throw new ImpalaRuntimeException(
+          String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_table"), e);
+    }
   }
 
 
@@ -172,14 +233,15 @@ public class MetastoreShim {
    * Wrapper around IMetaStoreClient.alter_partitions with transaction information
    */
   public static void alterPartitionsWithTransaction(IMetaStoreClient client,
-    String dbName, String tblName, List<Partition> partitions, long tblWriteId,
-    long txnId) throws InvalidOperationException, MetaException, TException {
-    // Get ValidWriteIdList to pass Hive verify  when set
-    // property(COLUMN_STATS_ACCURATE). Correct validWriteIdList is also needed
-    // to commit the alter partitions operation in hms side.
-    String validWriteIds = getValidWriteIdListInTxn(client, dbName, tblName, txnId);
-    client.alter_partitions(dbName, tblName, partitions, null,
-         validWriteIds, tblWriteId);
+      String dbName, String tblName, List<Partition> partitions, TblTransaction tblTxn
+      ) throws InvalidOperationException, MetaException, TException {
+    for (Partition part : partitions) {
+      part.setWriteId(tblTxn.writeId);
+      // Correct validWriteIdList is needed
+      // to commit the alter partitions operation in hms side.
+      client.alter_partitions(dbName, tblName, partitions, null,
+           tblTxn.validWriteIds, tblTxn.writeId);
+    }
   }
 
   /**
@@ -528,7 +590,7 @@ public class MetastoreShim {
    */
   private static String getValidWriteIdListInTxn(IMetaStoreClient client, String dbName,
       String tblName, long txnId)
-      throws InvalidOperationException, MetaException, TException {
+      throws TException {
     ValidTxnList txns = client.getValidTxns(txnId);
     String tableFullName = dbName + "." + tblName;
     List<TableValidWriteIds> writeIdsObj = client.getValidWriteIds(
@@ -554,14 +616,6 @@ public class MetastoreShim {
   }
 
   /**
-   * Set write ID to HMS partition.
-   */
-  public static void setWriteIdForMSPartition(Partition partition, long writeId) {
-    Preconditions.checkNotNull(partition);
-    partition.setWriteId(writeId);
-  }
-
-  /**
    * Wrapper around HMS Table object to get writeID
    * Per table writeId is introduced in ACID 2
    * It is used to detect changes of the table
@@ -573,15 +627,16 @@ public class MetastoreShim {
 
   /**
    * Opens a new transaction.
+   * Sets userId to TRANSACTION_USER_ID.
    * @param client is the HMS client to be used.
    * @param userId of user who is opening this transaction.
    * @return the new transaction id.
    * @throws TransactionException
    */
-  public static long openTransaction(IMetaStoreClient client, String userId)
+  public static long openTransaction(IMetaStoreClient client)
       throws TransactionException {
     try {
-      return client.openTxn(userId);
+      return client.openTxn(TRANSACTION_USER_ID);
     } catch (Exception e) {
       throw new TransactionException(e.getMessage());
     }
@@ -662,7 +717,7 @@ public class MetastoreShim {
       List<LockComponent> lockComponents)
           throws TransactionException {
     LockRequestBuilder lockRequestBuilder = new LockRequestBuilder();
-    lockRequestBuilder.setUser("Impala");
+    lockRequestBuilder.setUser(TRANSACTION_USER_ID);
     if (txnId > 0) lockRequestBuilder.setTransactionId(txnId);
     for (LockComponent lockComponent : lockComponents) {
       lockRequestBuilder.addLockComponent(lockComponent);
@@ -715,6 +770,19 @@ public class MetastoreShim {
   }
 
   /**
+   * Aborts a transaction and logs the error if there is an exception.
+   * @param client is the HMS client to be used.
+   * @param txnId is the transaction id.
+   */
+  public static void abortTransactionNoThrow(IMetaStoreClient client, long txnId) {
+    try {
+      client.abortTxns(Arrays.asList(txnId));
+    } catch (Exception e) {
+      LOG.error("Error in abortTxns.", e);
+    }
+  }
+
+  /**
    * Allocates a write id for the given table.
    * @param client is the HMS client to be used.
    * @param txnId is the transaction id.
@@ -820,6 +888,25 @@ public class MetastoreShim {
     msTbl.setAccessType(accessType);
   }
 
+  public static void setTableColumnStatsTransactional(IMetaStoreClient client,
+      Table msTbl, ColumnStatistics colStats, TblTransaction tblTxn)
+      throws ImpalaRuntimeException {
+    List<ColumnStatistics> colStatsList = new ArrayList<>();
+    colStatsList.add(colStats);
+    SetPartitionsStatsRequest request = new SetPartitionsStatsRequest(colStatsList);
+    request.setWriteId(tblTxn.writeId);
+    request.setValidWriteIdList(tblTxn.validWriteIds);
+    try {
+      // Despite its name, the function below can and (and currently must) be used
+      // to set table level column statistics in transactional tables.
+      client.setPartitionColumnStatistics(request);
+    }
+    catch (TException e) {
+      throw new ImpalaRuntimeException(
+          String.format(HMS_RPC_ERROR_FORMAT_STR, "setPartitionColumnStatistics"), e);
+    }
+  }
+
   /**
    * @return the hive major version
    */
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 43b34b3..8900d91 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -362,7 +362,7 @@ public class ComputeStatsStmt extends StatementBase {
     table_ = analyzer.getTable(tableName_, Privilege.ALTER, Privilege.SELECT);
     // Adding the check here instead of tableRef.analyze because tableRef is
     // used at multiple places and will even disallow select.
-    analyzer.ensureTableNotTransactional(table_);
+    analyzer.ensureTableNotFullAcid(table_);
 
     if (!(table_ instanceof FeFsTable)) {
       if (partitionSet_ != null) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/DropStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/DropStatsStmt.java
index f495deb..6ec1868 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DropStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DropStatsStmt.java
@@ -102,6 +102,8 @@ public class DropStatsStmt extends StatementBase {
           String.format("DROP STATS not allowed on a nested collection: %s", tableName_));
     }
     tableRef_.analyze(analyzer);
+    // There is no transactional HMS API to drop stats at the moment (HIVE-22104).
+    analyzer.ensureTableNotTransactional(tableRef_.getTable());
     if (partitionSet_ != null) {
       partitionSet_.setTableName(tableRef_.getTable().getTableName());
       partitionSet_.setPrivilegeRequirement(Privilege.ALTER);
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 5a19f0f..c6b86be 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -883,7 +883,7 @@ public class CatalogOpExecutor {
       if (LOG.isTraceEnabled()) {
         LOG.trace(String.format("Altering view %s", tableName));
       }
-      applyAlterTable(msTbl, true);
+      applyAlterTable(msTbl);
       try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
         tbl.load(true, msClient.getHiveClient(), msTbl, "ALTER VIEW");
       }
@@ -940,33 +940,69 @@ public class CatalogOpExecutor {
           tableName, params.isSetTable_stats(), numPartitions, numColumns));
     }
 
+    // Deep copy the msTbl to avoid updating our cache before successfully persisting
+    // the results to the metastore.
+    org.apache.hadoop.hive.metastore.api.Table msTbl =
+        table.getMetaStoreTable().deepCopy();
+
+    // TODO: Transaction committing / aborting seems weird for stat update, but I don't
+    //       see other ways to get a new write id (which is needed to update
+    //       transactional tables). Hive seems to use internal API for this.
+    //       See IMPALA-8865 about plans to improve this.
+    MetastoreShim.TblTransaction tblTxn = null;
+    try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+      try {
+        if (AcidUtils.isTransactionalTable(msTbl.getParameters())) {
+          tblTxn = MetastoreShim.createTblTransaction(
+              msClient.getHiveClient(), msTbl, -1 /* opens new transaction */);
+        }
+        alterTableUpdateStatsInner(table, msTbl, params,
+            numUpdatedPartitions, numUpdatedColumns, msClient, tblTxn);
+        if (tblTxn != null) {
+          MetastoreShim.commitTblTransactionIfNeeded(msClient.getHiveClient(), tblTxn);
+        }
+      } catch (Exception ex) {
+        if (tblTxn != null) {
+          MetastoreShim.abortTblTransactionIfNeeded(msClient.getHiveClient(), tblTxn);
+        }
+        throw ex;
+      }
+    }
+  }
+
+  private void alterTableUpdateStatsInner(Table table,
+      org.apache.hadoop.hive.metastore.api.Table msTbl,
+      TAlterTableUpdateStatsParams params,
+      Reference<Long> numUpdatedPartitions, Reference<Long> numUpdatedColumns,
+      MetaStoreClient msClient, MetastoreShim.TblTransaction tblTxn)
+      throws ImpalaException {
     // Update column stats.
     numUpdatedColumns.setRef(0L);
     if (params.isSetColumn_stats()) {
       ColumnStatistics colStats = createHiveColStats(params, table);
       if (colStats.getStatsObjSize() > 0) {
-        try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-          msClient.getHiveClient().updateTableColumnStatistics(colStats);
-        } catch (Exception e) {
-          throw new ImpalaRuntimeException(String.format(HMS_RPC_ERROR_FORMAT_STR,
-              "updateTableColumnStatistics"), e);
+        if (tblTxn != null) {
+          MetastoreShim.setTableColumnStatsTransactional(
+              msClient.getHiveClient(), msTbl, colStats, tblTxn);
+        } else {
+          try {
+            msClient.getHiveClient().updateTableColumnStatistics(colStats);
+          } catch (Exception e) {
+            throw new ImpalaRuntimeException(String.format(HMS_RPC_ERROR_FORMAT_STR,
+                "updateTableColumnStatistics"), e);
+          }
         }
       }
       numUpdatedColumns.setRef((long) colStats.getStatsObjSize());
     }
 
-    // Deep copy the msTbl to avoid updating our cache before successfully persisting
-    // the results to the metastore.
-    org.apache.hadoop.hive.metastore.api.Table msTbl =
-        table.getMetaStoreTable().deepCopy();
-
     // Update partition-level row counts and incremental column stats for
     // partitioned Hdfs tables.
     List<HdfsPartition> modifiedParts = null;
     if (params.isSetPartition_stats() && table.getNumClusteringCols() > 0) {
       Preconditions.checkState(table instanceof HdfsTable);
       modifiedParts = updatePartitionStats(params, (HdfsTable) table);
-      bulkAlterPartitions(table, modifiedParts);
+      bulkAlterPartitions(table, modifiedParts, tblTxn);
     }
 
     if (params.isSetTable_stats()) {
@@ -977,7 +1013,9 @@ public class CatalogOpExecutor {
       Table.updateTimestampProperty(msTbl, HdfsTable.TBL_PROP_LAST_COMPUTE_STATS_TIME);
     }
 
-    applyAlterTable(msTbl, false);
+    // Apply property changes like numRows.
+    msTbl.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
+    applyAlterTable(msTbl, false, tblTxn);
     numUpdatedPartitions.setRef(0L);
     if (modifiedParts != null) {
       numUpdatedPartitions.setRef((long) modifiedParts.size());
@@ -1037,6 +1075,7 @@ public class CatalogOpExecutor {
       partition.putToParameters(StatsSetupConst.ROW_COUNT, String.valueOf(numRows));
       // HMS requires this param for stats changes to take effect.
       partition.putToParameters(MetastoreShim.statsGeneratedViaStatsTaskParam());
+      partition.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
       modifiedParts.add(partition);
     }
     return modifiedParts;
@@ -1309,6 +1348,10 @@ public class CatalogOpExecutor {
     Table table = getExistingTable(params.getTable_name().getDb_name(),
         params.getTable_name().getTable_name(), "Load for DROP STATS");
     Preconditions.checkNotNull(table);
+    // There is no transactional HMS API to drop stats at the moment (HIVE-22104).
+    Preconditions.checkState(!AcidUtils.isTransactionalTable(
+        table.getMetaStoreTable().getParameters()));
+
     if (!catalog_.tryLockTable(table)) {
       throw new InternalException(String.format("Error dropping stats for table %s " +
           "due to lock contention", table.getFullName()));
@@ -1395,8 +1438,9 @@ public class CatalogOpExecutor {
         msTbl.getParameters().remove(StatsSetupConst.ROW_COUNT) != null;
     boolean droppedTotalSize =
         msTbl.getParameters().remove(StatsSetupConst.TOTAL_SIZE) != null;
+
     if (droppedRowCount || droppedTotalSize) {
-      applyAlterTable(msTbl, false);
+      applyAlterTable(msTbl, false, null);
       ++numTargetedPartitions;
     }
 
@@ -1431,7 +1475,7 @@ public class CatalogOpExecutor {
       if (isModified) modifiedParts.add(part);
     }
 
-    bulkAlterPartitions(table, modifiedParts);
+    bulkAlterPartitions(table, modifiedParts, null);
     return modifiedParts.size();
   }
 
@@ -2071,7 +2115,7 @@ public class CatalogOpExecutor {
         catalog_.watchCacheDirs(Lists.<Long>newArrayList(id),
             new TTableName(newTable.getDbName(), newTable.getTableName()),
                 "CREATE TABLE CACHED");
-        applyAlterTable(newTable, true);
+        applyAlterTable(newTable);
       }
       Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName());
       addTableToCatalogUpdate(newTbl, response.result);
@@ -2311,7 +2355,7 @@ public class CatalogOpExecutor {
     if (!colsToAdd.isEmpty()) {
       // Append the new column to the existing list of columns.
       msTbl.getSd().getCols().addAll(buildFieldSchemaList(colsToAdd));
-      applyAlterTable(msTbl, true);
+      applyAlterTable(msTbl);
       return true;
     }
     return false;
@@ -2333,7 +2377,7 @@ public class CatalogOpExecutor {
           columns);
       msTbl.getParameters().put(sortByKey, alteredColumns);
     }
-    applyAlterTable(msTbl, true);
+    applyAlterTable(msTbl);
   }
 
   /**
@@ -2370,7 +2414,7 @@ public class CatalogOpExecutor {
             "Column name %s not found in table %s.", colName, tbl.getFullName()));
       }
     }
-    applyAlterTable(msTbl, true);
+    applyAlterTable(msTbl);
   }
 
   /**
@@ -2638,7 +2682,7 @@ public class CatalogOpExecutor {
       String alteredColumns = MetaStoreUtil.removeValueFromCsvList(oldColumns, colName);
       msTbl.getParameters().put(sortByKey, alteredColumns);
     }
-    applyAlterTable(msTbl, true);
+    applyAlterTable(msTbl);
   }
 
   /**
@@ -2741,7 +2785,7 @@ public class CatalogOpExecutor {
       // The prototype partition must be updated if the file format is changed so that new
       // partitions are created with the new file format.
       if (tbl instanceof HdfsTable) ((HdfsTable) tbl).setPrototypePartition(msTbl.getSd());
-      applyAlterTable(msTbl, true);
+      applyAlterTable(msTbl);
       reloadFileMetadata = true;
     } else {
       Preconditions.checkArgument(tbl instanceof HdfsTable);
@@ -2752,7 +2796,7 @@ public class CatalogOpExecutor {
         partition.setFileFormat(HdfsFileFormat.fromThrift(fileFormat));
         modifiedParts.add(partition);
       }
-      bulkAlterPartitions(tbl, modifiedParts);
+      bulkAlterPartitions(tbl, modifiedParts, null);
       numUpdatedPartitions.setRef((long) modifiedParts.size());
     }
     return reloadFileMetadata;
@@ -2779,7 +2823,7 @@ public class CatalogOpExecutor {
       // The prototype partition must be updated if the row format is changed so that new
       // partitions are created with the new file format.
       ((HdfsTable) tbl).setPrototypePartition(msTbl.getSd());
-      applyAlterTable(msTbl, true);
+      applyAlterTable(msTbl);
       reloadFileMetadata = true;
     } else {
       List<HdfsPartition> partitions =
@@ -2789,7 +2833,7 @@ public class CatalogOpExecutor {
         HiveStorageDescriptorFactory.setSerdeInfo(rowFormat, partition.getSerdeInfo());
         modifiedParts.add(partition);
       }
-      bulkAlterPartitions(tbl, modifiedParts);
+      bulkAlterPartitions(tbl, modifiedParts, null);
       numUpdatedPartitions.setRef((long) modifiedParts.size());
     }
     return reloadFileMetadata;
@@ -2820,7 +2864,7 @@ public class CatalogOpExecutor {
           tbl.getMetaStoreTable().deepCopy();
       if (msTbl.getPartitionKeysSize() == 0) reloadFileMetadata = true;
       msTbl.getSd().setLocation(location);
-      applyAlterTable(msTbl, true);
+      applyAlterTable(msTbl);
     } else {
       TableName tableName = tbl.getTableName();
       HdfsPartition partition = catalog_.getHdfsPartition(
@@ -2866,7 +2910,7 @@ public class CatalogOpExecutor {
         modifiedParts.add(partition);
       }
       try {
-        bulkAlterPartitions(tbl, modifiedParts);
+        bulkAlterPartitions(tbl, modifiedParts, null);
       } finally {
         for (HdfsPartition modifiedPart : modifiedParts) {
           modifiedPart.markDirty();
@@ -2905,7 +2949,7 @@ public class CatalogOpExecutor {
           throw new UnsupportedOperationException(
               "Unknown target TTablePropertyType: " + params.getTarget());
       }
-      applyAlterTable(msTbl, true);
+      applyAlterTable(msTbl);
     }
   }
 
@@ -3033,7 +3077,7 @@ public class CatalogOpExecutor {
     }
 
     // Update the table metadata.
-    applyAlterTable(msTbl, true);
+    applyAlterTable(msTbl);
     return loadFileMetadata;
   }
 
@@ -3094,7 +3138,7 @@ public class CatalogOpExecutor {
       }
     }
     try {
-      bulkAlterPartitions(tbl, modifiedParts);
+      bulkAlterPartitions(tbl, modifiedParts, null);
     } finally {
       for (HdfsPartition modifiedPart : modifiedParts) {
         modifiedPart.markDirty();
@@ -3176,7 +3220,7 @@ public class CatalogOpExecutor {
     PrincipalType oldOwnerType = msTbl.getOwnerType();
     msTbl.setOwner(params.owner_name);
     msTbl.setOwnerType(PrincipalType.valueOf(params.owner_type.name()));
-    applyAlterTable(msTbl, true);
+    applyAlterTable(msTbl);
     if (authzConfig_.isEnabled()) {
       authzManager_.updateTableOwnerPrivilege(params.server_name, msTbl.getDbName(),
           msTbl.getTableName(), oldOwner, oldOwnerType, msTbl.getOwner(),
@@ -3311,6 +3355,14 @@ public class CatalogOpExecutor {
   }
 
   /**
+   * Convenience function to call applyAlterTable(3) with default arguments.
+   */
+  private void applyAlterTable(org.apache.hadoop.hive.metastore.api.Table msTbl)
+      throws ImpalaRuntimeException {
+    applyAlterTable(msTbl, true, null);
+  }
+
+  /**
    * Applies an ALTER TABLE command to the metastore table.
    * Note: The metastore interface is not very safe because it only accepts
    * an entire metastore.api.Table object rather than a delta of what to change. This
@@ -3323,7 +3375,8 @@ public class CatalogOpExecutor {
    * call.
    */
   private void applyAlterTable(org.apache.hadoop.hive.metastore.api.Table msTbl,
-      boolean overwriteLastDdlTime) throws ImpalaRuntimeException {
+      boolean overwriteLastDdlTime, MetastoreShim.TblTransaction tblTxn)
+      throws ImpalaRuntimeException {
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       if (overwriteLastDdlTime) {
         // It would be enough to remove this table property, as HMS would fill it, but
@@ -3331,16 +3384,25 @@ public class CatalogOpExecutor {
         // remain consistent with HMS.
         Table.updateTimestampProperty(msTbl, Table.TBL_PROP_LAST_DDL_TIME);
       }
+
       // Avoid computing/setting stats on the HMS side because that may reset the
       // 'numRows' table property (see HIVE-15653). The DO_NOT_UPDATE_STATS flag
       // tells the HMS not to recompute/reset any statistics on its own. Any
       // stats-related alterations passed in the RPC will still be applied.
       msTbl.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
-      msClient.getHiveClient().alter_table(
-          msTbl.getDbName(), msTbl.getTableName(), msTbl);
-    } catch (TException e) {
-      throw new ImpalaRuntimeException(
-          String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_table"), e);
+
+      if (tblTxn != null) {
+        MetastoreShim.alterTableWithTransaction(msClient.getHiveClient(), msTbl, tblTxn);
+      } else {
+        try {
+          msClient.getHiveClient().alter_table(
+              msTbl.getDbName(), msTbl.getTableName(), msTbl);
+        }
+        catch (TException e) {
+          throw new ImpalaRuntimeException(
+              String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_table"), e);
+        }
+      }
     }
   }
 
@@ -3487,7 +3549,8 @@ public class CatalogOpExecutor {
    * reduces the time spent in a single update and helps avoid metastore client
    * timeouts.
    */
-  private void bulkAlterPartitions(Table tbl, List<HdfsPartition> modifiedParts)
+  private void bulkAlterPartitions(Table tbl, List<HdfsPartition> modifiedParts,
+      MetastoreShim.TblTransaction tblTxn)
       throws ImpalaException {
     List<org.apache.hadoop.hive.metastore.api.Partition> hmsPartitions =
         Lists.newArrayList();
@@ -3508,8 +3571,14 @@ public class CatalogOpExecutor {
         Lists.partition(hmsPartitions, MAX_PARTITION_UPDATES_PER_RPC)) {
         try {
           // Alter partitions in bulk.
-          MetastoreShim.alterPartitions(msClient.getHiveClient(), dbName, tableName,
-              hmsPartitionsSubList);
+          if (tblTxn != null) {
+            MetastoreShim.alterPartitionsWithTransaction(msClient.getHiveClient(), dbName,
+                tableName, hmsPartitionsSubList, tblTxn);
+          }
+          else {
+            MetastoreShim.alterPartitions(msClient.getHiveClient(), dbName, tableName,
+                hmsPartitionsSubList);
+          }
           // Mark the corresponding HdfsPartition objects as dirty
           for (org.apache.hadoop.hive.metastore.api.Partition msPartition:
               hmsPartitionsSubList) {
@@ -3739,9 +3808,18 @@ public class CatalogOpExecutor {
         = table.getMetrics().getTimer(HdfsTable.CATALOG_UPDATE_DURATION_METRIC).time();
 
     long transactionId = -1;
-    if (update.isSetTransaction_id()) transactionId = update.getTransaction_id();
-    long tableWriteId = -1;
-    boolean isAcid = false;
+    MetastoreShim.TblTransaction tblTxn = null;
+    if (update.isSetTransaction_id()) {
+      transactionId = update.getTransaction_id();
+      Preconditions.checkState(transactionId > 0);
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+         // Setup transactional parameters needed to do alter table/partitions later.
+         // TODO: Could be optimized to possibly save some RPCs, as these parameters are
+         //       not always needed + the writeId of the INSERT could be probably reused.
+         tblTxn = MetastoreShim.createTblTransaction(
+             msClient.getHiveClient(), table.getMetaStoreTable(), transactionId);
+      }
+    }
 
     try {
       // Get new catalog version for table in insert.
@@ -3776,14 +3854,6 @@ public class CatalogOpExecutor {
         HashSet<String> partsToCreate =
             Sets.newHashSet(update.getCreated_partitions());
         partsToLoadMetadata = Sets.newHashSet(partsToCreate);
-        if (AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())) {
-          isAcid = true;
-          try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-            tableWriteId = MetastoreShim.allocateTableWriteId(
-                msClient.getHiveClient(), transactionId,
-                table.getDb().getName(), table.getName());
-          }
-        }
         for (FeFsPartition partition: parts) {
           // TODO: In the BE we build partition names without a trailing char. In FE
           // we build partition name with a trailing char. We should make this
@@ -3800,9 +3870,6 @@ public class CatalogOpExecutor {
               org.apache.hadoop.hive.metastore.api.Partition hmsPartition =
                   ((HdfsPartition) partition).toHmsPartition();
               hmsPartition.getParameters().remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
-              if (isAcid) {
-                MetastoreShim.setWriteIdForMSPartition(hmsPartition, tableWriteId);
-              }
               hmsPartitionsStatsUnset.add(hmsPartition);
             }
             if (partition.isMarkedCached()) {
@@ -3903,7 +3970,7 @@ public class CatalogOpExecutor {
         // Unset COLUMN_STATS_ACCURATE by calling alter partition to hms.
         if (!hmsPartitionsStatsUnset.isEmpty()) {
           unsetPartitionsColStats(table.getMetaStoreTable(), hmsPartitionsStatsUnset,
-              tableWriteId, transactionId);
+              tblTxn);
         }
       } else {
         // For non-partitioned table, only single part exists
@@ -3911,7 +3978,7 @@ public class CatalogOpExecutor {
         affectedExistingPartitions.add(singlePart);
 
       }
-      unsetTableColStats(table.getMetaStoreTable(), transactionId);
+      unsetTableColStats(table.getMetaStoreTable(), tblTxn);
       // Submit the watch request for the given cache directives.
       if (!cacheDirIds.isEmpty()) {
         catalog_.watchCacheDirs(cacheDirIds, tblName.toThrift(),
@@ -4216,7 +4283,7 @@ public class CatalogOpExecutor {
       } else {
         msTbl.getParameters().put("comment", comment);
       }
-      applyAlterTable(msTbl, true);
+      applyAlterTable(msTbl);
       loadTableMetadata(tbl, newCatalogVersion, false, false, null, "ALTER COMMENT");
       addTableToCatalogUpdate(tbl, response.result);
       addSummary(response, String.format("Updated %s.", (isView) ? "view" : "table"));
@@ -4249,7 +4316,7 @@ public class CatalogOpExecutor {
                 "Column name %s not found in table %s.", columnName, tbl.getFullName()));
           }
         }
-        applyAlterTable(msTbl, true);
+        applyAlterTable(msTbl);
       }
       loadTableMetadata(tbl, newCatalogVersion, false, true, null,
           "ALTER COLUMN COMMENT");
@@ -4303,28 +4370,11 @@ public class CatalogOpExecutor {
    * Update table properties to remove the COLUMN_STATS_ACCURATE entry if it exists.
    */
   private void unsetTableColStats(org.apache.hadoop.hive.metastore.api.Table msTable,
-      long txnId) throws ImpalaRuntimeException{
+      MetastoreShim.TblTransaction tblTxn) throws ImpalaRuntimeException{
     Map<String, String> params = msTable.getParameters();
     if (params != null && params.containsKey(StatsSetupConst.COLUMN_STATS_ACCURATE)) {
       params.remove(StatsSetupConst.COLUMN_STATS_ACCURATE);
-      // In Hive 2, some alter table can drop stats, see HIVE-15653, set following
-      // property to true to avoid this happen.
-      // TODO: More research, and remove this property if Hive 3 fixed the problem.
-      msTable.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
-      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-        try {
-          if (AcidUtils.isTransactionalTable(params)) {
-            MetastoreShim.alterTableWithTransaction(msClient.getHiveClient(),
-                msTable, txnId);
-          } else {
-            msClient.getHiveClient().alter_table(msTable.getDbName(),
-                msTable.getTableName(), msTable);
-          }
-        } catch (TException te) {
-          new ImpalaRuntimeException(
-              String.format(HMS_RPC_ERROR_FORMAT_STR, "alter_table"), te);
-        }
-      }
+      applyAlterTable(msTable, false, tblTxn);
     }
   }
 
@@ -4335,13 +4385,13 @@ public class CatalogOpExecutor {
    */
   private void unsetPartitionsColStats(org.apache.hadoop.hive.metastore.api.Table msTable,
       List<org.apache.hadoop.hive.metastore.api.Partition> hmsPartitionsStatsUnset,
-      long writeId, long txnId) throws ImpalaRuntimeException{
+      MetastoreShim.TblTransaction tblTxn) throws ImpalaRuntimeException{
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       try {
-        if (AcidUtils.isTransactionalTable( msTable.getParameters())) {
+        if (tblTxn != null) {
           MetastoreShim.alterPartitionsWithTransaction(
               msClient.getHiveClient(), msTable.getDbName(), msTable.getTableName(),
-              hmsPartitionsStatsUnset,  writeId, txnId);
+              hmsPartitionsStatsUnset,  tblTxn);
         } else {
           MetastoreShim.alterPartitions(msClient.getHiveClient(), msTable.getDbName(),
               msTable.getTableName(), hmsPartitionsStatsUnset);
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index e09164b..dbd51e7 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1670,7 +1670,7 @@ public class Frontend {
   private long openTransaction(TQueryCtx queryCtx) throws TransactionException {
     try (MetaStoreClient client = metaStoreClientPool_.getClient()) {
       IMetaStoreClient hmsClient = client.getHiveClient();
-      long transactionId = MetastoreShim.openTransaction(hmsClient, "Impala");
+      long transactionId = MetastoreShim.openTransaction(hmsClient);
       HeartbeatContext ctx = new HeartbeatContext(queryCtx, System.nanoTime());
       transactionKeepalive_.addTransaction(transactionId, ctx);
       return transactionId;
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
index 85a977f..029f28d 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
@@ -571,9 +571,7 @@ public class AnalyzerTest extends FrontendTestBase {
     AnalysisError(
         "compute stats functional_orc_def.full_transactional_table",
         errorMsg);
-    AnalysisError(
-        "compute stats functional.insert_only_transactional_table",
-        insertOnlyErrorMsg);
+    AnalyzesOk("compute stats functional.insert_only_transactional_table");
 
     AnalysisError(
         "select * from functional_orc_def.full_transactional_table",
@@ -603,7 +601,8 @@ public class AnalyzerTest extends FrontendTestBase {
     AnalysisError(
         "drop stats functional_orc_def.full_transactional_table",
         errorMsg);
-    AnalyzesOk("drop stats functional.insert_only_transactional_table");
+    AnalysisError("drop stats functional.insert_only_transactional_table",
+        insertOnlyErrorMsg);
 
     AnalyzesOk("describe functional.insert_only_transactional_table");
     AnalyzesOk("describe functional_orc_def.full_transactional_table");
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid-compute-stats.test b/testdata/workloads/functional-query/queries/QueryTest/acid-compute-stats.test
new file mode 100644
index 0000000..0740d24
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid-compute-stats.test
@@ -0,0 +1,105 @@
+====
+---- QUERY
+# Test unpartitioned table.
+set DEFAULT_TRANSACTIONAL_TYPE=insert_only;
+create table tt (i int);
+insert into tt values (1);
+compute stats tt;
+====
+---- QUERY
+show table stats tt;
+---- LABELS
+#ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
+---- RESULTS
+1,1,'2B','NOT CACHED','NOT CACHED','TEXT','false',regex:.*
+---- TYPES
+BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+show column stats tt;
+---- LABELS
+COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE
+---- RESULTS
+'i','INT',1,0,4,4
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE
+====
+---- QUERY
+# Test partitioned table with non-incremental stats.
+set DEFAULT_TRANSACTIONAL_TYPE=insert_only;
+create table pt (x int) partitioned by (p int);
+insert into pt partition (p=1) values (1);
+compute stats pt;
+====
+---- QUERY
+show table stats pt;
+---- LABELS
+p, #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
+---- RESULTS
+'1',1,1,'2B','NOT CACHED','NOT CACHED',regex:.*,'false',regex:.*
+'Total',1,1,'2B','0B','','','',''
+---- TYPES
+STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+show column stats pt;
+---- LABELS
+COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE
+---- RESULTS
+'x','INT',1,0,4,4
+'p','INT',1,0,4,4
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE
+====
+---- QUERY
+show partitions pt
+---- LABELS
+p, #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
+---- RESULTS
+'1',1,1,'2B','NOT CACHED','NOT CACHED',regex:.*,'false',regex:.*
+'Total',1,1,'2B','0B','','','',''
+---- TYPES
+STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+# Test partitioned table with incremental stats.
+# DROP STATS is currently not supported for ACID tables, so the table is dropped and
+# recreated instead.
+set DEFAULT_TRANSACTIONAL_TYPE=insert_only;
+drop table pt;
+create table pt (x int) partitioned by (p int);
+insert into pt partition (p=1) values (1);
+compute incremental stats pt;
+====
+---- QUERY
+show table stats pt;
+---- LABELS
+p, #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
+---- RESULTS
+'1',1,1,'2B','NOT CACHED','NOT CACHED',regex:.*,'true',regex:.*
+'Total',1,1,'2B','0B','','','',''
+---- TYPES
+STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+show column stats pt;
+---- LABELS
+COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE
+---- RESULTS
+'x','INT',1,0,4,4
+'p','INT',1,0,4,4
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE
+====
+---- QUERY
+show partitions pt
+---- LABELS
+p, #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION
+---- RESULTS
+'1',1,1,'2B','NOT CACHED','NOT CACHED',regex:.*,'true',regex:.*
+'Total',1,1,'2B','0B','','','',''
+---- TYPES
+STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
+
+
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test b/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
index 655d2fc..826622b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
@@ -5,7 +5,7 @@ alter table functional.insert_only_transactional_table change column x y bigint;
 AnalysisException: Table functional.insert_only_transactional_table not supported. Transactional (ACID) tables are only supported for read.
 ====
 ---- QUERY
-compute stats functional.insert_only_transactional_table;
+drop stats functional.insert_only_transactional_table;
 ---- CATCH
 AnalysisException: Table functional.insert_only_transactional_table not supported. Transactional (ACID) tables are only supported for read.
 ====
diff --git a/tests/query_test/test_acid.py b/tests/query_test/test_acid.py
index e944f55..db42693 100644
--- a/tests/query_test/test_acid.py
+++ b/tests/query_test/test_acid.py
@@ -117,6 +117,15 @@ class TestAcid(ImpalaTestSuite):
         .format(unique_database, "ext_part_colstats"))
     assert "2" in result
 
+  @SkipIfHive2.acid
+  @SkipIfS3.hive
+  @SkipIfABFS.hive
+  @SkipIfADLS.hive
+  @SkipIfIsilon.hive
+  @SkipIfLocal.hive
+  def test_acid_compute_stats(self, vector, unique_database):
+    self.run_test_case('QueryTest/acid-compute-stats', vector, use_db=unique_database)
+
 #  TODO(todd): further tests to write:
 #  TRUNCATE, once HIVE-20137 is implemented.
 #  INSERT OVERWRITE with empty result set, once HIVE-21750 is fixed.