Posted to commits@impala.apache.org by bo...@apache.org on 2020/04/02 13:41:43 UTC

[impala] branch master updated (f2837e9 -> 8aa0652)

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from f2837e9  IMPALA-4080 [part 7]: Codegen once per fragment
     new 7fa43ee  IMPALA-9483 Add logs for debugging builtin functions throw unknown exception randomly
     new 7abbde3  IMPALA-7138: detect devicemapper volumes correctly
     new 8aa0652  IMPALA-9484: Full ACID Milestone 1: properly scan files that has full ACID schema

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/common/logging.h                            |  16 +
 be/src/exec/hdfs-orc-scanner.cc                    |  14 +-
 be/src/exec/orc-metadata-utils.cc                  |  59 +++-
 be/src/exec/orc-metadata-utils.h                   |  10 +-
 be/src/runtime/descriptors.cc                      |   2 +
 be/src/runtime/descriptors.h                       |   3 +
 be/src/util/disk-info.cc                           |  45 ++-
 be/src/util/filesystem-util-test.cc                |  26 ++
 be/src/util/filesystem-util.cc                     |  17 +
 be/src/util/filesystem-util.h                      |   3 +
 common/thrift/CatalogObjects.thrift                |   3 +
 .../org/apache/impala/compat/MetastoreShim.java    |   2 +
 .../analysis/AlterTableAddPartitionStmt.java       |   4 +
 .../impala/analysis/AlterTableSortByStmt.java      |   4 +
 .../org/apache/impala/analysis/AlterTableStmt.java |  14 +-
 .../java/org/apache/impala/analysis/Analyzer.java  |  33 +-
 .../apache/impala/analysis/ComputeStatsStmt.java   |   3 -
 .../impala/analysis/CreateTableLikeStmt.java       |   1 -
 .../impala/analysis/DropTableOrViewStmt.java       |   1 -
 .../apache/impala/analysis/FunctionCallExpr.java   |   3 +-
 .../org/apache/impala/analysis/InsertStmt.java     |   3 +-
 .../main/java/org/apache/impala/analysis/Path.java |  37 +++
 .../java/org/apache/impala/analysis/TableDef.java  |   2 -
 .../org/apache/impala/analysis/ToSqlUtils.java     |  11 +-
 .../org/apache/impala/analysis/TruncateStmt.java   |   3 +-
 .../apache/impala/catalog/FileMetadataLoader.java  |   4 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  |  33 +-
 .../main/java/org/apache/impala/catalog/Table.java |   6 +
 .../impala/catalog/local/DirectMetaProvider.java   |   7 +-
 .../apache/impala/service/CatalogOpExecutor.java   |   9 +-
 .../java/org/apache/impala/util/AcidUtils.java     |  49 ++-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |   5 +-
 .../org/apache/impala/analysis/AnalyzerTest.java   |  55 ++--
 .../impala/catalog/FileMetadataLoaderTest.java     |  13 +-
 .../java/org/apache/impala/util/AcidUtilsTest.java | 124 +++++++-
 testdata/bin/generate-schema-statements.py         |  16 +-
 testdata/datasets/README                           |   1 +
 .../functional/functional_schema_template.sql      |  28 +-
 .../datasets/functional/schema_constraints.csv     |   1 +
 .../queries/PlannerTest/resource-requirements.test |  24 +-
 .../queries/DataErrorsTest/orc-type-checks.test    |  18 +-
 .../queries/QueryTest/acid-negative.test           |  98 +++++-
 .../functional-query/queries/QueryTest/acid.test   |  18 ++
 .../QueryTest/create-table-like-file-orc.test      |  82 ++++-
 .../queries/QueryTest/describe-path.test           |  51 +++
 .../queries/QueryTest/full-acid-rowid.test         | 137 ++++++++
 .../queries/QueryTest/ranger_column_masking.test   | 353 --------------------
 .../ranger_column_masking_complex_types.test       | 354 +++++++++++++++++++++
 .../QueryTest/show-create-table-full-acid.test     |  98 ++++++
 tests/authorization/test_ranger.py                 |  33 +-
 tests/common/skip.py                               |   2 +
 tests/metadata/test_ddl.py                         |   3 +-
 tests/metadata/test_show_create_table.py           |   8 +-
 tests/query_test/test_acid.py                      |   9 +
 tests/query_test/test_mt_dop.py                    |   9 +
 tests/query_test/test_nested_types.py              |  43 ++-
 tests/query_test/test_scanners.py                  |  91 +++++-
 tests/query_test/test_scanners_fuzz.py             |  78 +++--
 58 files changed, 1630 insertions(+), 549 deletions(-)
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/full-acid-rowid.test
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking_complex_types.test
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/show-create-table-full-acid.test


[impala] 02/03: IMPALA-7138: detect devicemapper volumes correctly

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7abbde3c5d18217a84979454fffad1887e10816a
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Wed Apr 1 11:40:24 2020 -0700

    IMPALA-7138: detect devicemapper volumes correctly
    
    This is a more general fix for device detection that
    disables the digit-stripping heuristic if a device
    with the same name as a partition already exists.
    
    E.g. for device-mapper devices, /proc/partitions might
    include a partition called "dm-0" that maps directly
    to the block device "dm-0" (i.e. /sys/block/dm-0).
    Before this fix, we looked for a device called "dm-",
    which did not exist.
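    
    A minimal standalone sketch of the resulting lookup order (a
    simplification for illustration, not the patched DiskInfo code: plain
    POSIX stat() stands in for FileSystemUtil::PathExists, the NVMe special
    case is omitted, and PathExistsSimple/DeviceForPartition are hypothetical
    names):
    
      #include <cctype>
      #include <string>
      #include <sys/stat.h>
      
      // Returns true if 'p' exists on the local filesystem.
      static bool PathExistsSimple(const std::string& p) {
        struct stat st;
        return stat(p.c_str(), &st) == 0;
      }
      
      // Maps a /proc/partitions entry to its top-level block device name.
      static std::string DeviceForPartition(const std::string& partition) {
        // If /sys/block/<name> exists, the entry already names a block device
        // (e.g. "dm-0"), so keep it unchanged.
        if (PathExistsSimple("/sys/block/" + partition)) return partition;
        // Otherwise fall back to the old heuristic and strip trailing digits,
        // e.g. "sda2" -> "sda" (NVMe names get special handling in the real code).
        std::string dev = partition;
        while (!dev.empty() && std::isdigit(static_cast<unsigned char>(dev.back()))) {
          dev.pop_back();
        }
        return dev;
      }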
    
    Testing:
    Created a dummy device-mapper device with the following command:
    
      sudo dmsetup create 1gb-zero --table '0 1953125 zero'
    
    Then confirmed that the device name in the logs was correct:
    
        dm-0 (rotational=true)
    
    Other devices (sda, sda1, sda2, sdb, sdb1, etc.) on my system
    were still detected the same as before.
    
    Change-Id: I9d231bcf9105db8d4ab03586eab74e0644337a6f
    Reviewed-on: http://gerrit.cloudera.org:8080/15631
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/util/disk-info.cc            | 45 ++++++++++++++++++++++++-------------
 be/src/util/filesystem-util-test.cc | 26 +++++++++++++++++++++
 be/src/util/filesystem-util.cc      | 17 ++++++++++++++
 be/src/util/filesystem-util.h       |  3 +++
 4 files changed, 75 insertions(+), 16 deletions(-)

diff --git a/be/src/util/disk-info.cc b/be/src/util/disk-info.cc
index 6b55127..34159d7 100644
--- a/be/src/util/disk-info.cc
+++ b/be/src/util/disk-info.cc
@@ -31,6 +31,7 @@
 #include <boost/algorithm/string/trim.hpp>
 
 #include "gutil/strings/substitute.h"
+#include "util/filesystem-util.h"
 
 #include "common/names.h"
 
@@ -80,19 +81,28 @@ void DiskInfo::GetDeviceNames() {
     vector<string> fields;
     split(fields, line, is_any_of(" "), token_compress_on);
     if (fields.size() != 4) continue;
-    string name = fields[3];
-    if (name == "name") continue;
-
-    // NVME devices have a special format. Try to detect that, falling back to the normal
-    // method if this is not an NVME device.
-    std::string nvme_basename;
-    if (TryNVMETrim(name, &nvme_basename)) {
-      // This is an NVME device, use the returned basename
-      name = nvme_basename;
-    } else {
-      // Does not follow the NVME pattern, so use the logic for a normal disk device
-      // Remove the partition# from the name.  e.g. sda2 --> sda
-      trim_right_if(name, is_any_of("0123456789"));
+    const string& partition_name = fields[3];
+    if (partition_name == "name") continue;
+
+    // Check if this is the top-level block device. If not, try to guess the
+    // name of the top-level block device.
+    bool found_device = false;
+    string dev_name = partition_name;
+    Status status =
+      FileSystemUtil::PathExists(Substitute("/sys/block/$0", dev_name), &found_device);
+    if (!status.ok()) LOG(WARNING) << status.GetDetail();
+    if (!found_device) {
+      // NVME devices have a special format. Try to detect that, falling back to the normal
+      // method if this is not an NVME device.
+      std::string nvme_basename;
+      if (TryNVMETrim(dev_name, &nvme_basename)) {
+        // This is an NVME device, use the returned basename
+        dev_name = nvme_basename;
+      } else {
+        // Does not follow the NVME pattern, so use the logic for a normal disk device
+        // Remove the partition# from the name.  e.g. sda2 --> sda
+        trim_right_if(dev_name, is_any_of("0123456789"));
+      }
     }
 
     // Create a mapping of all device ids (one per partition) to the disk id.
@@ -102,12 +112,12 @@ void DiskInfo::GetDeviceNames() {
     DCHECK(device_id_to_disk_id_.find(dev) == device_id_to_disk_id_.end());
 
     int disk_id = -1;
-    map<string, int>::iterator it = disk_name_to_disk_id_.find(name);
+    map<string, int>::iterator it = disk_name_to_disk_id_.find(dev_name);
     if (it == disk_name_to_disk_id_.end()) {
       // First time seeing this disk
       disk_id = disks_.size();
-      disks_.push_back(Disk(name, disk_id));
-      disk_name_to_disk_id_[name] = disk_id;
+      disks_.push_back(Disk(dev_name, disk_id));
+      disk_name_to_disk_id_[dev_name] = disk_id;
     } else {
       disk_id = it->second;
     }
@@ -135,6 +145,9 @@ void DiskInfo::GetDeviceNames() {
       string line;
       getline(rotational, line);
       if (line == "0") disks_[i].is_rotational = false;
+    } else {
+      LOG(INFO) << "Could not read " << ss.str() << " for " << disks_[i].name
+                << " , assuming rotational.";
     }
     if (rotational.is_open()) rotational.close();
   }
diff --git a/be/src/util/filesystem-util-test.cc b/be/src/util/filesystem-util-test.cc
index c8e744e..a8c4fad 100644
--- a/be/src/util/filesystem-util-test.cc
+++ b/be/src/util/filesystem-util-test.cc
@@ -174,3 +174,29 @@ TEST(FilesystemUtil, DirEntryTypes) {
   ASSERT_EQ(entries.size(), 1);
   EXPECT_TRUE(entries[0] == "impala-file");
 }
+
+TEST(FilesystemUtil, PathExists) {
+  path dir = filesystem::unique_path();
+
+  // Paths to existent and non-existent dirs.
+  path subdir1 = dir / "impala1";
+  path subdir2 = dir / "impala2";
+  filesystem::create_directories(subdir1);
+  bool exists;
+  EXPECT_OK(FileSystemUtil::PathExists(subdir1.string(), &exists));
+  EXPECT_TRUE(exists);
+  EXPECT_OK(FileSystemUtil::PathExists(subdir2.string(), &exists));
+  EXPECT_FALSE(exists);
+
+  // Paths to existent and non-existent file.
+  path file1 = dir / "a_file1";
+  path file2 = dir / "a_file2";
+  ASSERT_OK(FileSystemUtil::CreateFile(file1.string()));
+  EXPECT_OK(FileSystemUtil::PathExists(file1.string(), &exists));
+  EXPECT_TRUE(exists);
+  EXPECT_OK(FileSystemUtil::PathExists(file2.string(), &exists));
+  EXPECT_FALSE(exists);
+
+  // Cleanup
+  filesystem::remove_all(dir);
+}
diff --git a/be/src/util/filesystem-util.cc b/be/src/util/filesystem-util.cc
index ea0a10a..5ec1755 100644
--- a/be/src/util/filesystem-util.cc
+++ b/be/src/util/filesystem-util.cc
@@ -173,6 +173,23 @@ Status FileSystemUtil::VerifyIsDirectory(const string& directory_path) {
   return Status::OK();
 }
 
+Status FileSystemUtil::PathExists(const std::string& path, bool* exists) {
+  error_code errcode;
+  *exists = filesystem::exists(path, errcode);
+  if (errcode != errc::success) {
+    // Need to check for no_such_file_or_directory error case - Boost's exists() sometimes
+    // returns an error when it should simply return false.
+    if (errcode == errc::no_such_file_or_directory) {
+      *exists = false;
+      return Status::OK();
+    }
+    return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR, Substitute(
+        "Encountered exception while checking existence of path $0: $1",
+        path, errcode.message())));
+  }
+  return Status::OK();
+}
+
 Status FileSystemUtil::GetSpaceAvailable(const string& directory_path,
     uint64_t* available_bytes) {
   error_code errcode;
diff --git a/be/src/util/filesystem-util.h b/be/src/util/filesystem-util.h
index 5471baa..5383d4d 100644
--- a/be/src/util/filesystem-util.h
+++ b/be/src/util/filesystem-util.h
@@ -45,6 +45,9 @@ class FileSystemUtil {
   /// Returns Status::OK if it is, or a runtime error with a message otherwise.
   static Status VerifyIsDirectory(const std::string& directory_path) WARN_UNUSED_RESULT;
 
+  /// Check if a path exists. Returns an error if this could not be determined.
+  static Status PathExists(const std::string& path, bool* exists);
+
   /// Returns the space available on the file system containing 'directory_path'
   /// in 'available_bytes'
   static Status GetSpaceAvailable(


[impala] 01/03: IMPALA-9483 Add logs for debugging builtin functions throw unknown exception randomly

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7fa43eef80bcb7823f144690bd1197276907b710
Author: xiaomeng <xi...@cloudera.com>
AuthorDate: Mon Mar 30 15:27:40 2020 -0700

    IMPALA-9483 Add logs for debugging builtin functions throw unknown exception randomly
    
    In a secure environment with high concurrency, queries that call a
    builtin function randomly fail when looking up the function, for example
    with "AnalysisException: trim() unknown".
    This change adds more information to the exception message to help with
    debugging when it happens again.
    
    Change-Id: I30d6eb697695da8d2521acb76d8310ec8f1bbda9
    Reviewed-on: http://gerrit.cloudera.org:8080/15607
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
index 1566775..1fea640 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
@@ -490,7 +490,8 @@ public class FunctionCallExpr extends Expr {
     // User needs DB access.
     FeDb db = analyzer.getDb(fnName_.getDb(), Privilege.VIEW_METADATA, true);
     if (!db.containsFunction(fnName_.getFunction())) {
-      throw new AnalysisException(fnName_ + "() unknown");
+      throw new AnalysisException(fnName_ + "() unknown for database " + db.getName()
+          + ". Currently this db has " + db.numFunctions() + " functions.");
     }
 
     if (isBuiltinCastFunction()) {


[impala] 03/03: IMPALA-9484: Full ACID Milestone 1: properly scan files that has full ACID schema

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8aa0652871c64639a34e54a7339a1eff1d594b19
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Mon Feb 24 13:22:02 2020 +0100

    IMPALA-9484: Full ACID Milestone 1: properly scan files that has full ACID schema
    
    Full ACID row format looks like this:
    
    {
      "operation": 0,
      "originalTransaction": 1,
      "bucket": 536870912,
      "rowId": 0,
      "currentTransaction": 1,
      "row": {"i": 1}
    }
    
    User columns are nested under "row". In the frontend we need to create
    slot descriptors that correspond to the file schema. In the catalog we
    could mimic the file schema, but that would introduce several
    complexities and corner cases in column resolution. Also, in query
    results the heading of the above user column would be "row.i", star
    expansion would have to be modified, and so on.
    
    Because of that, in the catalog I create the exact opposite of the above
    schema:
    
    {
      "row__id":
      {
        "operation": 0,
        "originalTransaction": 1,
        "bucket": 536870912,
        "rowId": 0,
        "currentTransaction": 1
      },
      "i": 1
    }
    
    This way very little modification is needed in the frontend, and the
    hidden columns can easily be retrieved via 'SELECT row__id.*' when we
    need them for debugging/testing.
    
    We only need to change Path.getAbsolutePath() to return a schema path
    that corresponds to the file schema. Also, in the backend we need some
    extra juggling in OrcSchemaResolver::ResolveColumn() to map the file
    schema path back to the table schema path.
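    
    Below is a minimal C++ sketch of that conversion, for illustration only;
    in the commit the logic lives in the Java method
    Path.convertToFullAcidFilePath(), and ToFullAcidFilePath is a
    hypothetical name. As in the commit, the first path component keeps the
    partition-column offset (the backend subtracts it later), so the result
    sits between the table and file schema paths:
    
      #include <vector>
      
      // Converts an absolute table schema path (catalog layout: partition
      // cols, then the synthetic "row__id", then user columns) into the
      // corresponding full ACID file schema path.
      std::vector<int> ToFullAcidFilePath(std::vector<int> path, int num_part_cols) {
        if (path.empty() || path[0] < num_part_cols) return path;  // partition column
        if (path[0] == num_part_cols) {
          // Path refers to row__id.<acid column>: drop the synthetic struct and
          // promote the ACID column to a top-level file column.
          path.erase(path.begin());
          path[0] += num_part_cols;
        } else {
          // User column: subtract the partition cols and the synthetic row__id,
          // then nest it under "row" (file column num_part_cols + 5).
          path[0] -= num_part_cols + 1;
          path.insert(path.begin(), num_part_cols + 5);
        }
        return path;
      }
    
    For an unpartitioned table with one user column 'i', row__id.rowId has
    table path [0, 3] and file path [3], while 'i' has table path [1] and
    file path [5, 0].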
    
    Testing:
    I changed data loading to load ORC files in full ACID format by default.
    With this change we should be able to scan full ACID tables that are
    not minor-compacted, don't have deleted rows, and don't have original
    files.
    
    Newly added Tests:
     * specific queries about hidden columns (full-acid-rowid.test)
     * SHOW CREATE TABLE (show-create-table-full-acid.test)
     * DESCRIBE [FORMATTED] TABLE (describe-path.test)
     * INSERT should be forbidden (acid-negative.test)
     * added tests for column masking (
       ranger_column_masking_complex_types.test)
    
    Change-Id: Ic2e2afec00c9a5cf87f1d61b5fe52b0085844bcb
    Reviewed-on: http://gerrit.cloudera.org:8080/15395
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/logging.h                            |  16 +
 be/src/exec/hdfs-orc-scanner.cc                    |  14 +-
 be/src/exec/orc-metadata-utils.cc                  |  59 +++-
 be/src/exec/orc-metadata-utils.h                   |  10 +-
 be/src/runtime/descriptors.cc                      |   2 +
 be/src/runtime/descriptors.h                       |   3 +
 common/thrift/CatalogObjects.thrift                |   3 +
 .../org/apache/impala/compat/MetastoreShim.java    |   2 +
 .../analysis/AlterTableAddPartitionStmt.java       |   4 +
 .../impala/analysis/AlterTableSortByStmt.java      |   4 +
 .../org/apache/impala/analysis/AlterTableStmt.java |  14 +-
 .../java/org/apache/impala/analysis/Analyzer.java  |  33 +-
 .../apache/impala/analysis/ComputeStatsStmt.java   |   3 -
 .../impala/analysis/CreateTableLikeStmt.java       |   1 -
 .../impala/analysis/DropTableOrViewStmt.java       |   1 -
 .../org/apache/impala/analysis/InsertStmt.java     |   3 +-
 .../main/java/org/apache/impala/analysis/Path.java |  37 +++
 .../java/org/apache/impala/analysis/TableDef.java  |   2 -
 .../org/apache/impala/analysis/ToSqlUtils.java     |  11 +-
 .../org/apache/impala/analysis/TruncateStmt.java   |   3 +-
 .../apache/impala/catalog/FileMetadataLoader.java  |   4 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  |  33 +-
 .../main/java/org/apache/impala/catalog/Table.java |   6 +
 .../impala/catalog/local/DirectMetaProvider.java   |   7 +-
 .../apache/impala/service/CatalogOpExecutor.java   |   9 +-
 .../java/org/apache/impala/util/AcidUtils.java     |  49 ++-
 .../org/apache/impala/analysis/AnalyzeDDLTest.java |   5 +-
 .../org/apache/impala/analysis/AnalyzerTest.java   |  55 ++--
 .../impala/catalog/FileMetadataLoaderTest.java     |  13 +-
 .../java/org/apache/impala/util/AcidUtilsTest.java | 124 +++++++-
 testdata/bin/generate-schema-statements.py         |  16 +-
 testdata/datasets/README                           |   1 +
 .../functional/functional_schema_template.sql      |  28 +-
 .../datasets/functional/schema_constraints.csv     |   1 +
 .../queries/PlannerTest/resource-requirements.test |  24 +-
 .../queries/DataErrorsTest/orc-type-checks.test    |  18 +-
 .../queries/QueryTest/acid-negative.test           |  98 +++++-
 .../functional-query/queries/QueryTest/acid.test   |  18 ++
 .../QueryTest/create-table-like-file-orc.test      |  82 ++++-
 .../queries/QueryTest/describe-path.test           |  51 +++
 .../queries/QueryTest/full-acid-rowid.test         | 137 ++++++++
 .../queries/QueryTest/ranger_column_masking.test   | 353 --------------------
 .../ranger_column_masking_complex_types.test       | 354 +++++++++++++++++++++
 .../QueryTest/show-create-table-full-acid.test     |  98 ++++++
 tests/authorization/test_ranger.py                 |  33 +-
 tests/common/skip.py                               |   2 +
 tests/metadata/test_ddl.py                         |   3 +-
 tests/metadata/test_show_create_table.py           |   8 +-
 tests/query_test/test_acid.py                      |   9 +
 tests/query_test/test_mt_dop.py                    |   9 +
 tests/query_test/test_nested_types.py              |  43 ++-
 tests/query_test/test_scanners.py                  |  91 +++++-
 tests/query_test/test_scanners_fuzz.py             |  78 +++--
 53 files changed, 1553 insertions(+), 532 deletions(-)

diff --git a/be/src/common/logging.h b/be/src/common/logging.h
index fd93e49..4f7fc9d 100644
--- a/be/src/common/logging.h
+++ b/be/src/common/logging.h
@@ -68,6 +68,22 @@
   DCHECK(a == b) << "[ " #a " = " << static_cast<int>(a) << " , " #b " = " \
                  << static_cast<int>(b) << " ]"
 
+#ifndef KUDU_HEADERS_USE_SHORT_STATUS_MACROS
+/// Define DCHECK_OK that evaluates an expression that has type 'Status' and checks
+/// that the returning status is OK. If not OK, it logs the error and aborts the process.
+/// In release builds the given expression is not evaluated.
+#  ifndef NDEBUG
+#    define DCHECK_OK(status)                \
+       do {                                  \
+         const Status& _s = (status);        \
+         DCHECK(_s.ok()) << _s.GetDetail();  \
+       } while (0)
+#  else
+     // Let's define it to '{}' in case it's used in single line if statements.
+#    define DCHECK_OK(status) {}
+#  endif // NDEBUG
+#endif   // KUDU_HEADERS_USE_SHORT_STATUS_MACROS
+
 /// Define Kudu logging macros to use glog macros.
 #define KUDU_LOG              LOG
 #define KUDU_CHECK            CHECK
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index 3308818..05e1198 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -40,6 +40,8 @@ using namespace impala::io;
 DEFINE_bool(enable_orc_scanner, true,
     "If false, reading from ORC format tables is not supported");
 
+const string HIVE_ACID_VERSION_KEY = "hive.acid.version";
+
 Status HdfsOrcScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
   DCHECK(!files.empty());
@@ -189,8 +191,18 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
   context_->ReleaseCompletedResources(true);
   RETURN_IF_ERROR(footer_status);
 
+  bool is_table_full_acid = scan_node_->hdfs_table()->IsTableFullAcid();
+  bool is_file_full_acid = reader_->hasMetadataValue(HIVE_ACID_VERSION_KEY) &&
+                           reader_->getMetadataValue(HIVE_ACID_VERSION_KEY) == "2";
+  // TODO: Remove the following constraint once IMPALA-9515 is resolved.
+  if (is_table_full_acid && !is_file_full_acid) {
+    return Status(Substitute("Error: Table is in full ACID format, but "
+        "'hive.acid.version' = '2' is missing from file metadata: table=$0, file=$1",
+        scan_node_->hdfs_table()->name(), filename()));
+  }
   schema_resolver_.reset(new OrcSchemaResolver(*scan_node_->hdfs_table(),
-      &reader_->getType(), filename()));
+      &reader_->getType(), filename(), is_table_full_acid, is_file_full_acid));
+  RETURN_IF_ERROR(schema_resolver_->ValidateFullAcidFileSchema());
 
   // Update 'row_reader_options_' based on the tuple descriptor so the ORC lib can skip
   // columns we don't need.
diff --git a/be/src/exec/orc-metadata-utils.cc b/be/src/exec/orc-metadata-utils.cc
index 77c11fe..03433db 100644
--- a/be/src/exec/orc-metadata-utils.cc
+++ b/be/src/exec/orc-metadata-utils.cc
@@ -84,14 +84,44 @@ Status OrcSchemaResolver::ResolveColumn(const SchemaPath& col_path,
   *node = root_;
   *pos_field = false;
   *missing_field = false;
+  DCHECK(ValidateFullAcidFileSchema().ok()); // Should have already been validated.
+  bool translate_acid_path = is_table_full_acid_ && is_file_full_acid_;
+  int num_part_cols = tbl_desc_.num_clustering_cols();
   for (int i = 0; i < col_path.size(); ++i) {
     int table_idx = col_path[i];
     int file_idx = table_idx;
     if (i == 0) {
-      table_col_type = &tbl_desc_.col_descs()[table_idx].type();
+      if (translate_acid_path) {
+        constexpr int FILE_INDEX_OF_FIELD_ROW = 5;
+        if (table_idx == num_part_cols + FILE_INDEX_OF_FIELD_ROW) {
+          // Refers to "row" column. Table definition doesn't have "row" column
+          // so here we just step into the file's "row" column to get in sync
+          // with the table schema.
+          *node = (*node)->getSubtype(FILE_INDEX_OF_FIELD_ROW);
+          continue;
+        }
+        DCHECK_GE(table_idx, num_part_cols);
+        // 'col_path' refers to the ACID columns. In table schema they are nested
+        // under the synthetic 'row__id' column. 'row__id' is at index 'num_part_cols'.
+        table_col_type = &tbl_desc_.col_descs()[num_part_cols].type();
+        // The ACID column is under 'row__id' at index 'table_idx - num_part_cols'.
+        int acid_col_idx = table_idx - num_part_cols;
+        DCHECK_GE(acid_col_idx, 0);
+        DCHECK_LT(acid_col_idx, table_col_type->children.size());
+        table_col_type = &table_col_type->children[acid_col_idx];
+      } else {
+        table_col_type = &tbl_desc_.col_descs()[table_idx].type();
+      }
       // For top-level columns, the first index in a path includes the table's partition
       // keys.
-      file_idx -= tbl_desc_.num_clustering_cols();
+      file_idx -= num_part_cols;
+    } else if (i == 1 && table_col_type == nullptr && translate_acid_path) {
+      // Here we are referring to a table column from the viewpoint of the user.
+      // Hence, in the table metadata this is a top-level column, i.e. it is offset
+      // by 'num_part_cols' in the table schema. We also need to add '1', because in the
+      // FeTable we added a synthetic struct typed column 'row__id'.
+      table_idx += 1 + num_part_cols;
+      table_col_type = &tbl_desc_.col_descs()[table_idx].type();
     } else if (table_col_type->type == TYPE_ARRAY &&
         table_idx == SchemaPathConstants::ARRAY_POS) {
       // To materialize the positions, the ORC lib has to materialize the whole array
@@ -202,4 +232,29 @@ Status OrcSchemaResolver::ValidateType(const ColumnType& type,
       "Type mismatch: table column $0 is map to column $1 in ORC file '$2'",
       type.DebugString(), orc_type.toString(), filename_));
 }
+
+Status OrcSchemaResolver::ValidateFullAcidFileSchema() const {
+  if (!is_file_full_acid_) return Status::OK();
+  string error_msg = Substitute("File $0 should have full ACID schema.", filename_);
+  if (root_->getKind() != orc::TypeKind::STRUCT) return Status(error_msg);
+  if (root_->getSubtypeCount() != 6) return Status(error_msg);
+  if (root_->getSubtype(0)->getKind() != orc::TypeKind::INT ||
+      root_->getSubtype(1)->getKind() != orc::TypeKind::LONG ||
+      root_->getSubtype(2)->getKind() != orc::TypeKind::INT ||
+      root_->getSubtype(3)->getKind() != orc::TypeKind::LONG ||
+      root_->getSubtype(4)->getKind() != orc::TypeKind::LONG ||
+      root_->getSubtype(5)->getKind() != orc::TypeKind::STRUCT) {
+    return Status(error_msg);
+  }
+  if (root_->getFieldName(0) != "operation" ||
+      root_->getFieldName(1) != "originalTransaction" ||
+      root_->getFieldName(2) != "bucket" ||
+      root_->getFieldName(3) != "rowId" ||
+      root_->getFieldName(4) != "currentTransaction" ||
+      root_->getFieldName(5) != "row") {
+    return Status(error_msg);
+  }
+  return Status::OK();
 }
+
+} // namespace impala
diff --git a/be/src/exec/orc-metadata-utils.h b/be/src/exec/orc-metadata-utils.h
index eaab266..72ddc54 100644
--- a/be/src/exec/orc-metadata-utils.h
+++ b/be/src/exec/orc-metadata-utils.h
@@ -28,7 +28,9 @@ namespace impala {
 class OrcSchemaResolver {
  public:
   OrcSchemaResolver(const HdfsTableDescriptor& tbl_desc, const orc::Type* root,
-      const char* filename) : tbl_desc_(tbl_desc), root_(root), filename_(filename) { }
+      const char* filename, bool is_table_acid, bool is_file_acid) : tbl_desc_(tbl_desc),
+      root_(root), filename_(filename), is_table_full_acid_(is_table_acid),
+      is_file_full_acid_(is_file_acid) { }
 
   /// Resolve SchemaPath into orc::Type (ORC column representation)
   /// 'pos_field' is set to true if 'col_path' reference the index field of an array
@@ -42,10 +44,16 @@ class OrcSchemaResolver {
   Status BuildSchemaPaths(int num_partition_keys,
       std::vector<SchemaPath>* col_id_path_map) const;
 
+  /// Returns error if the file should be in ACIDv2 format,
+  /// but the actual file schema doesn't conform to it.
+  Status ValidateFullAcidFileSchema() const;
+
  private:
   const HdfsTableDescriptor& tbl_desc_;
   const orc::Type* const root_;
   const char* const filename_ = nullptr;
+  const bool is_table_full_acid_;
+  const bool is_file_full_acid_;
 
   /// Validate whether the ColumnType is compatible with the orc type
   Status ValidateType(const ColumnType& type, const orc::Type& orc_type) const
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index b9a96fc..3b9fe84 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -235,6 +235,7 @@ HdfsTableDescriptor::HdfsTableDescriptor(const TTableDescriptor& tdesc, ObjectPo
   prototype_partition_descriptor_ = pool->Add(new HdfsPartitionDescriptor(
     tdesc.hdfsTable, tdesc.hdfsTable.prototype_partition));
   avro_schema_ = tdesc.hdfsTable.__isset.avroSchema ? tdesc.hdfsTable.avroSchema : "";
+  is_full_acid_ = tdesc.hdfsTable.is_full_acid;
 }
 
 void HdfsTableDescriptor::ReleaseResources() {
@@ -263,6 +264,7 @@ string HdfsTableDescriptor::DebugString() const {
 
   out << " null_partition_key_value='" << null_partition_key_value_ << "'";
   out << " null_column_value='" << null_column_value_ << "'";
+  out << " is_full_acid=" << std::boolalpha << is_full_acid_;
   return out.str();
 }
 
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index c4e1dc9..67a93fe 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -329,6 +329,8 @@ class HdfsTableDescriptor : public TableDescriptor {
     return prototype_partition_descriptor_;
   }
 
+  bool IsTableFullAcid() const { return is_full_acid_; }
+
   virtual std::string DebugString() const;
 
  protected:
@@ -340,6 +342,7 @@ class HdfsTableDescriptor : public TableDescriptor {
   HdfsPartitionDescriptor* prototype_partition_descriptor_;
   /// Set to the table's Avro schema if this is an Avro table, empty string otherwise
   std::string avro_schema_;
+  bool is_full_acid_;
 };
 
 class HBaseTableDescriptor : public TableDescriptor {
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index ee06b39..b31eb5e 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -370,6 +370,9 @@ struct THdfsTable {
 
   // Primary Keys information for HDFS Tables
   11: optional SqlConstraints.TSqlConstraints sql_constraints
+
+  // True if the table is in Hive Full ACID format.
+  12: optional bool is_full_acid = false
 }
 
 struct THBaseTable {
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index d027a08..43f174c 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -843,6 +843,8 @@ public class MetastoreShim {
         EXTREAD,  // External table read
         HIVEMANAGEDINSERTREAD, // Insert-only table read
         HIVEMANAGEDINSERTWRITE, // Insert-only table write
+        HIVEFULLACIDREAD,
+        HIVEFULLACIDWRITE,
         HIVESQL,
         HIVEMQT,
         HIVEBUCKET2 // Includes the capability to get the correct bucket number.
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
index 24cdc46..41b0308 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
@@ -96,4 +96,8 @@ public class AlterTableAddPartitionStmt extends AlterTableStmt {
       }
     }
   }
+
+  @Override
+  protected void checkTransactionalTable() throws AnalysisException {
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java
index a5f2e54..1a07a5d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableSortByStmt.java
@@ -84,4 +84,8 @@ public class AlterTableSortByStmt extends AlterTableStmt {
 
     TableDef.analyzeSortColumns(columns_, targetTable, sortingOrder_);
   }
+
+  @Override
+  protected void checkTransactionalTable() throws AnalysisException {
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java
index 79c3b64..249987e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableStmt.java
@@ -91,7 +91,15 @@ public abstract class AlterTableStmt extends StatementBase {
     Preconditions.checkState(tableRef instanceof BaseTableRef);
     table_ = tableRef.getTable();
     analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
-    analyzer.ensureTableNotTransactional(table_, "ALTER TABLE");
+    // TODO: IMPALA-8831 will enable all ALTER TABLE statements on transactional tables.
+    // Until then we call 'checkTransactionalTable()' here, which throws an exception in
+    // case of transactional tables. However, AlterTableAddPartition and AlterTableSortBy
+    // override checkTransactionalTable() to enable those operations.
+    // We need to do that because those ALTER TABLE statements are needed for Impala
+    // testing to load the test tables.
+    // We can do that because these operations are "safe", i.e. they don't mess up the
+    // column statistics.
+    checkTransactionalTable();
     if (table_ instanceof FeDataSourceTable
         && !(this instanceof AlterTableSetColumnStats)) {
       throw new AnalysisException(String.format(
@@ -99,4 +107,8 @@ public abstract class AlterTableStmt extends StatementBase {
           tableName_));
     }
   }
+
+  protected void checkTransactionalTable() throws AnalysisException {
+    Analyzer.ensureTableNotTransactional(table_, "ALTER TABLE");
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index f748f92..f79e109 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -145,11 +145,10 @@ public class Analyzer {
       "Data source does not exist: ";
   public final static String DATA_SRC_ALREADY_EXISTS_ERROR_MSG =
       "Data source already exists: ";
-  private static final String INSERT_ONLY_ACID_TABLE_SUPPORTED_ERROR_MSG =
-      "Table %s not supported. Transactional (ACID) tables are " +
-      "only supported when they are configured as insert_only.";
   private static final String TRANSACTIONAL_TABLE_NOT_SUPPORTED =
-      "%s not supported on transactional (ACID) table: %s" ;
+      "%s not supported on transactional (ACID) table: %s";
+  private static final String FULL_TRANSACTIONAL_TABLE_NOT_SUPPORTED =
+      "%s not supported on full transactional (ACID) table: %s";
   private static final String BUCKETED_TABLE_NOT_SUPPORTED =
       "%s is a bucketed table. Only read operations are supported on such tables.";
   private static final String TABLE_NOT_SUPPORTED =
@@ -214,29 +213,17 @@ public class Analyzer {
   public void setHasWithClause() { hasWithClause_ = true; }
   public boolean hasWithClause() { return hasWithClause_; }
 
-
-  /**
-   * @param tblProperties Table properties that are used to check transactional nature
-   * @param tableName Table name to be reported in exception message
-   * @throws AnalysisException If table is full acid table.
-   */
-  public static void ensureTableNotFullAcid(Map<String, String> tblProperties,
-                                            String tableName)
-      throws AnalysisException {
-    if (AcidUtils.isFullAcidTable(tblProperties)) {
-      throw new AnalysisException(String.format(
-          INSERT_ONLY_ACID_TABLE_SUPPORTED_ERROR_MSG, tableName));
-    }
-  }
-
   /**
    * @param table Table whose properties need to be checked.
+   * @param operationStr The unsupported operation.
    * @throws AnalysisException If table is full acid table.
    */
-  public static void ensureTableNotFullAcid(FeTable table)
+  public static void ensureTableNotFullAcid(FeTable table, String operationStr)
       throws AnalysisException {
-    ensureTableNotFullAcid(table.getMetaStoreTable().getParameters(),
-        table.getFullName());
+    if (AcidUtils.isFullAcidTable(table.getMetaStoreTable().getParameters())) {
+      throw new AnalysisException(String.format(FULL_TRANSACTIONAL_TABLE_NOT_SUPPORTED,
+          operationStr, table.getFullName()));
+    }
   }
 
   public static void ensureTableNotTransactional(FeTable table, String operationStr)
@@ -295,7 +282,6 @@ public class Analyzer {
       if (KuduTable.isKuduTable(table.getMetaStoreTable())) return;
       if (!MetastoreShim.hasTableCapability(table.getMetaStoreTable(), writeRequires)) {
         // Error messages with explanations.
-        ensureTableNotFullAcid(table);
         throw new AnalysisException(String.format(TABLE_NOT_SUPPORTED, "Write",
             table.getFullName(),
             MetastoreShim.getTableAccessType(table.getMetaStoreTable())));
@@ -319,7 +305,6 @@ public class Analyzer {
         // TODO: After Hive provides API calls to send back hints on why
         // the operations are not supported, we will generate error messages
         // accordingly.
-        ensureTableNotFullAcid(table);
         throw new AnalysisException(String.format(TABLE_NOT_SUPPORTED, "Operations",
             table.getFullName(),
             MetastoreShim.getTableAccessType(table.getMetaStoreTable())));
diff --git a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
index 57c4015..f3254e5 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java
@@ -361,9 +361,6 @@ public class ComputeStatsStmt extends StatementBase {
           "COMPUTE STATS not supported for nested collection: %s", tableName_));
     }
     table_ = analyzer.getTable(tableName_, Privilege.ALTER, Privilege.SELECT);
-    // Adding the check here instead of tableRef.analyze because tableRef is
-    // used at multiple places and will even disallow select.
-    analyzer.ensureTableNotFullAcid(table_);
 
     if (!(table_ instanceof FeFsTable)) {
       if (partitionSet_ != null) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
index cecb214..2dafe6c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeStmt.java
@@ -174,7 +174,6 @@ public class CreateTableLikeStmt extends StatementBase {
     // Make sure the source table exists and the user has permission to access it.
     FeTable srcTable = analyzer.getTable(srcTableName_, Privilege.VIEW_METADATA);
 
-    analyzer.ensureTableNotFullAcid(srcTable);
     analyzer.ensureTableNotBucketed(srcTable);
 
     if (KuduTable.isKuduTable(srcTable.getMetaStoreTable())) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java b/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
index 2d48bec..f234897 100644
--- a/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/DropTableOrViewStmt.java
@@ -138,7 +138,6 @@ public class DropTableOrViewStmt extends StatementBase {
       if (dropTable_) {
         // To drop a view needs not write capabilities, only checks for tables.
         analyzer.checkTableCapability(table, Analyzer.OperationType.WRITE);
-        analyzer.ensureTableNotFullAcid(table);
       }
     } catch (TableLoadingException e) {
       // We should still try to DROP tables that failed to load, so that tables that are
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index eaf1a7b..6d1c037 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -442,7 +442,8 @@ public class InsertStmt extends StatementBase {
               table_.getFullName()));
     }
 
-    analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
+    Analyzer.ensureTableNotFullAcid(table_, "INSERT");
+    Analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
 
     // We do not support (in|up)serting into tables with unsupported column types.
     for (Column c: table_.getColumns()) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/Path.java b/fe/src/main/java/org/apache/impala/analysis/Path.java
index 475410e..489e9a8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Path.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Path.java
@@ -28,10 +28,12 @@ import org.apache.impala.catalog.MapType;
 import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.Type;
+import org.apache.impala.util.AcidUtils;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -451,9 +453,44 @@ public class Path {
     absolutePath_ = new ArrayList<>();
     if (rootDesc_ != null) absolutePath_.addAll(rootDesc_.getPath().getAbsolutePath());
     absolutePath_.addAll(matchedPositions_);
+    // ACID table schema path differs from file schema path. Let's convert it here.
+    if (!absolutePath_.isEmpty() &&
+        // Only convert if path was already absolute. Otherwise 'matchedPositions_' is
+        // relative to a path that we have already converted.
+        matchedPositions_.size() == absolutePath_.size() &&
+        rootTable_ != null &&
+        AcidUtils.isFullAcidTable(rootTable_.getMetaStoreTable().getParameters())) {
+      convertToFullAcidFilePath();
+    }
     return absolutePath_;
   }
 
+  /**
+   * Converts a table schema path to a file schema path. It is actually somewhere between
+   * the two because the first column is still offset by the number of partitions.
+   */
+  private void convertToFullAcidFilePath() {
+    // For Full ACID tables we need to create a schema path that corresponds to the
+    // ACID file schema.
+    int numPartitions = rootTable_.getNumClusteringCols();
+    if (absolutePath_.get(0) == numPartitions) {
+      // The path refers to the synthetic "row__id" column.
+      Preconditions.checkState(absolutePath_.size() == 2);
+      // "row__id" is not present in the file so remove it.
+      absolutePath_.remove(0);
+      // The member of the synthetic "row__id" field is actually a top-level table col,
+      // so we need to add 'numPartitions' to its index.
+      absolutePath_.set(0, absolutePath_.get(0) + numPartitions);
+    } else if (absolutePath_.get(0) > numPartitions) {
+      // In the file, user columns are embedded inside the "row" column, which is
+      // at index 5 (after the five hidden ACID columns) in a full ACID file.
+      absolutePath_.add(0, numPartitions + 5);
+      // Since the user column is not top-level anymore we need to subtract
+      // 'numPartitions' and 1 (the synthetic "row__id").
+      absolutePath_.set(1, absolutePath_.get(1) - numPartitions - 1);
+    }
+  }
+
   @Override
   public String toString() {
     Preconditions.checkState(rootTable_ != null || rootDesc_ != null);
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index 60ef906..71d8de4 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -780,7 +780,5 @@ class TableDef {
 
     AcidUtils.setTransactionalProperties(options_.tblProperties,
           analyzer.getQueryOptions().getDefault_transactional_type());
-    // Disallow creation of full ACID table.
-    analyzer.ensureTableNotFullAcid(options_.tblProperties, fqTableName_.toString());
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index ef315b8..1c5a781 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -47,6 +47,7 @@ import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TSortingOrder;
+import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.KuduUtil;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -318,11 +319,17 @@ public class ToSqlUtils {
     List<String> colsSql = new ArrayList<>();
     List<String> partitionColsSql = new ArrayList<>();
     boolean isHbaseTable = table instanceof FeHBaseTable;
+    boolean isFullAcid = AcidUtils.isFullAcidTable(
+        table.getMetaStoreTable().getParameters());
     for (int i = 0; i < table.getColumns().size(); i++) {
+      Column col = table.getColumns().get(i);
       if (!isHbaseTable && i < table.getNumClusteringCols()) {
-        partitionColsSql.add(columnToSql(table.getColumns().get(i)));
+        partitionColsSql.add(columnToSql(col));
+      } else if (isFullAcid && i == table.getNumClusteringCols()) {
+        Preconditions.checkState(col.getName().equals("row__id"));
+        continue;
       } else {
-        colsSql.add(columnToSql(table.getColumns().get(i)));
+        colsSql.add(columnToSql(col));
       }
     }
     RowFormat rowFormat = RowFormat.fromStorageDescriptor(msTable.getSd());
diff --git a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
index 60d6bf2..9945d78 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
@@ -67,7 +67,8 @@ public class TruncateStmt extends StatementBase {
       throw new AnalysisException(String.format(
           "TRUNCATE TABLE not supported on non-HDFS table: %s", table_.getFullName()));
     }
-    analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
+    Analyzer.ensureTableNotFullAcid(table_, "TRUNCATE");
+    Analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index ac0a2c4..7bcd10f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Reference;
@@ -147,8 +148,9 @@ public class FileMetadataLoader {
    * descriptors.
    *
    * @throws IOException if listing fails.
+   * @throws MetaException on ACID errors. TODO: remove this once IMPALA-9042 is resolved.
    */
-  public void load() throws IOException {
+  public void load() throws MetaException, IOException {
     Preconditions.checkState(loadStats_ == null, "already loaded");
     loadStats_ = new LoadStats();
     FileSystem fs = partDir_.getFileSystem(CONF);
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 1e3bf65..02f8c90 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -80,6 +80,7 @@ import org.apache.impala.thrift.TSqlConstraints;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
+import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.AvroSchemaConverter;
 import org.apache.impala.util.AvroSchemaUtils;
 import org.apache.impala.util.FsPermissionCache;
@@ -491,6 +492,29 @@ public class HdfsTable extends Table implements FeFsTable {
   }
 
   /**
+   * Adds the synthetic "row__id" column to the table schema. Under "row__id" it adds
+   * the ACID hidden columns.
+   * Note that this is the exact opposite of the file schema. In an ACID file, the
+   * hidden columns are top-level while the user columns are embedded inside a struct
+   * typed column called "row". We cheat here because this way we don't need to change
+   * column resolution and everything will work seamlessly. We'll only need to generate
+   * a different schema path for the columns but that's fairly simple.
+   * The hidden columns can be retrieved via 'SELECT row__id.* FROM <table>' which is
+   * similar to Hive's 'SELECT row__id FROM <table>'.
+   */
+  private void addColumnsForFullAcidTable(List<FieldSchema> fieldSchemas)
+      throws TableLoadingException {
+    StructType row__id = new StructType();
+    row__id.addField(new StructField("operation", ScalarType.INT, ""));
+    row__id.addField(new StructField("originaltransaction", ScalarType.BIGINT, ""));
+    row__id.addField(new StructField("bucket", ScalarType.INT, ""));
+    row__id.addField(new StructField("rowid", ScalarType.BIGINT, ""));
+    row__id.addField(new StructField("currenttransaction", ScalarType.BIGINT, ""));
+    addColumn(new Column("row__id", row__id, "", colsByPos_.size()));
+    addColumnsFromFieldSchemas(fieldSchemas);
+  }
+
+  /**
    * Clear the partitions of an HdfsTable and the associated metadata.
    */
   private void resetPartitions() {
@@ -1314,7 +1338,11 @@ public class HdfsTable extends Table implements FeFsTable {
     // Add all columns to the table. Ordering is important: partition columns first,
     // then all other columns.
     addColumnsFromFieldSchemas(msTbl.getPartitionKeys());
-    addColumnsFromFieldSchemas(nonPartFieldSchemas_);
+    if (AcidUtils.isFullAcidTable(msTbl.getParameters())) {
+      addColumnsForFullAcidTable(nonPartFieldSchemas_);
+    } else {
+      addColumnsFromFieldSchemas(nonPartFieldSchemas_);
+    }
     isSchemaLoaded_ = true;
   }
 
@@ -1619,6 +1647,9 @@ public class HdfsTable extends Table implements FeFsTable {
       hdfsTable.setNetwork_addresses(hostIndex_.getList());
     }
     hdfsTable.setPartition_prefixes(partitionLocationCompressor_.getPrefixes());
+    if (AcidUtils.isFullAcidTable(getMetaStoreTable().getParameters())) {
+      hdfsTable.setIs_full_acid(true);
+    }
     return hdfsTable;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 9e2f02f..0c6ac31 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -683,6 +683,12 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
   @Override // FeTable
   public List<Column> getColumnsInHiveOrder() {
     List<Column> columns = Lists.newArrayList(getNonClusteringColumns());
+    if (getMetaStoreTable() != null &&
+        AcidUtils.isFullAcidTable(getMetaStoreTable().getParameters())) {
+      // Remove synthetic "row__id" column.
+      Preconditions.checkState(columns.get(0).getName().equals("row__id"));
+      columns.remove(0);
+    }
     columns.addAll(getClusteringColumns());
     return Collections.unmodifiableList(columns);
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 0fc44ae..849aa52 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -247,7 +247,8 @@ class DirectMetaProvider implements MetaProvider {
    */
   private Map<String, PartitionMetadata> loadUnpartitionedPartition(
       TableMetaRefImpl table, List<PartitionRef> partitionRefs,
-      ListMap<TNetworkAddress> hostIndex) {
+      ListMap<TNetworkAddress> hostIndex) throws MetaException {
+    //TODO(IMPALA-9042): Remove "throws MetaException"
     Preconditions.checkArgument(partitionRefs.size() == 1,
         "Expected exactly one partition to load for unpartitioned table");
     PartitionRef ref = partitionRefs.get(0);
@@ -303,7 +304,9 @@ class DirectMetaProvider implements MetaProvider {
   }
 
   private ImmutableList<FileDescriptor> loadFileMetadata(String fullTableName,
-      String partName, Partition msPartition, ListMap<TNetworkAddress> hostIndex) {
+      String partName, Partition msPartition, ListMap<TNetworkAddress> hostIndex)
+        throws MetaException {
+    //TODO(IMPALA-9042): Remove "throws MetaException"
     Path partDir = new Path(msPartition.getSd().getLocation());
     // TODO(todd): The table property to disable recursive loading is not supported
     // by this code path. However, DirectMetaProvider is not yet a supported feature.
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 16d0501..63a7b81 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -299,6 +299,8 @@ public class CatalogOpExecutor {
   // Table default capabilities
   private static final String ACIDINSERTONLY_CAPABILITIES =
       "HIVEMANAGEDINSERTREAD,HIVEMANAGEDINSERTWRITE";
+  private static final String FULLACID_CAPABILITIES =
+      "HIVEFULLACIDREAD";
   private static final String NONACID_CAPABILITIES = "EXTREAD,EXTWRITE";
 
   // The maximum number of partitions to update in one Hive Metastore RPC.
@@ -2634,8 +2636,11 @@ public class CatalogOpExecutor {
       // Set table default capabilities in HMS
       if (tbl.getParameters().containsKey(CAPABILITIES_KEY)) return;
       if (AcidUtils.isTransactionalTable(tbl.getParameters())) {
-        Preconditions.checkState(!AcidUtils.isFullAcidTable(tbl.getParameters()));
-        tbl.getParameters().put(CAPABILITIES_KEY, ACIDINSERTONLY_CAPABILITIES);
+        if (AcidUtils.isFullAcidTable(tbl.getParameters())) {
+          tbl.getParameters().put(CAPABILITIES_KEY, FULLACID_CAPABILITIES);
+        } else {
+          tbl.getParameters().put(CAPABILITIES_KEY, ACIDINSERTONLY_CAPABILITIES);
+        }
       } else {
         // Managed KUDU table has issues with extra table properties:
         // 1. The property is not stored. 2. The table cannot be found after created.
diff --git a/fe/src/main/java/org/apache/impala/util/AcidUtils.java b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
index 3b856ad..5ef8b1a 100644
--- a/fe/src/main/java/org/apache/impala/util/AcidUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.impala.catalog.FileMetadataLoader.LoadStats;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.thrift.TQueryOptions;
@@ -76,13 +77,20 @@ public class AcidUtils {
   // Regex pattern for files in delta directories. The pattern matches strings like
   // "delta_0000006_0000006/000000_0",
   // "delta_0000009_0000009_0000/0000/def.txt"
-  private static final Pattern DELTA_PATTERN = Pattern.compile(
+  private static final String DELTA_STR =
       "delta_" +
-       "(?<minWriteId>\\d+)_" +
-       "(?<maxWriteId>\\d+)" +
-       "(?:_(?<optionalStatementId>\\d+))?" +
-       // Optional path suffix.
-       "(?:/.*)?");
+      "(?<minWriteId>\\d+)_" +
+      "(?<maxWriteId>\\d+)" +
+      "(?:_(?<optionalStatementId>\\d+)|_v(?<visibilityTxnId>\\d+))?" +
+      // Optional path suffix.
+      "(?:/.*)?";
+
+  private static final Pattern DELTA_PATTERN = Pattern.compile(DELTA_STR);
+
+  // Regex pattern for files in delete delta directories. The pattern is similar to
+  // the 'DELTA_PATTERN', but starts with "delete_".
+  private static final Pattern DELETE_DELTA_PATTERN = Pattern.compile(
+    "delete_" + DELTA_STR);
 
   @VisibleForTesting
   static final long SENTINEL_BASE_WRITE_ID = Long.MIN_VALUE;
@@ -222,8 +230,7 @@ public class AcidUtils {
     }
   }
 
-  private static ParsedDelta parseDelta(String dirPath) {
-    Matcher deltaMatcher = DELTA_PATTERN.matcher(dirPath);
+  private static ParsedDelta matcherToParsedDelta(Matcher deltaMatcher) {
     if (!deltaMatcher.matches()) {
       return null;
     }
@@ -234,6 +241,14 @@ public class AcidUtils {
     return new ParsedDelta(minWriteId, maxWriteId, statementId);
   }
 
+  private static ParsedDelta parseDelta(String dirPath) {
+    return matcherToParsedDelta(DELTA_PATTERN.matcher(dirPath));
+  }
+
+  private static ParsedDelta parseDeleteDelta(String dirPath) {
+    return matcherToParsedDelta(DELETE_DELTA_PATTERN.matcher(dirPath));
+  }
+
   /**
    * Filters the files based on Acid state.
    * @param stats the FileStatuses obtained from recursively listing the directory
@@ -243,10 +258,12 @@ public class AcidUtils {
    * @param loadStats stats to add counts of skipped files to. May be null.
    * @return the FileStatuses that is a subset of passed in descriptors that
    *    must be used.
+   * @throws MetaException on ACID error. TODO: Remove throws clause once IMPALA-9042
+   * is resolved.
    */
   public static List<FileStatus> filterFilesForAcidState(List<FileStatus> stats,
       Path baseDir, ValidTxnList validTxnList, ValidWriteIdList writeIds,
-      @Nullable LoadStats loadStats) {
+      @Nullable LoadStats loadStats) throws MetaException {
     List<FileStatus> validStats = new ArrayList<>(stats);
 
     // First filter out any paths that are not considered valid write IDs.
@@ -289,9 +306,23 @@ public class AcidUtils {
         if (parsedDelta.minWriteId <= maxBaseWriteId) {
           it.remove();
           if (loadStats != null) loadStats.filesSupercededByNewerBase++;
+        } else if (parsedDelta.minWriteId != parsedDelta.maxWriteId) {
+          // TODO(IMPALA-9512): Validate rows in minor compacted deltas.
+          // We could read the non-compacted delta directories, but we'd need to check
+          // that all of them still exist. Let's throw an error on minor compacted tables
+          // for now since we want to read minor compacted deltas in the near future.
+          throw new MetaException("Table is minor compacted which is not supported " +
+              "by Impala. Run major compaction to resolve this.");
         }
         continue;
       }
+      ParsedDelta deleteDelta = parseDeleteDelta(relPath);
+      if (deleteDelta != null) {
+        if (deleteDelta.maxWriteId > maxBaseWriteId) {
+          throw new MetaException("Table has deleted rows. It's currently not " +
+              "supported by Impala. Run major compaction to resolve this.");
+        }
+      }
 
       // Not in a base or a delta directory. In that case, it's probably a post-upgrade
       // file.
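
For reference, the reworked DELTA_STR above also accepts the "_v<visibilityTxnId>" suffix that
compacted deltas carry, and DELETE_DELTA_PATTERN is just the same body with a "delete_" prefix.
A minimal standalone sketch (hypothetical class name, not part of this change) of what the two
patterns match:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class DeltaPatternSketch {
      // Same pattern body as DELTA_STR in AcidUtils.java above.
      private static final String DELTA_STR =
          "delta_" +
          "(?<minWriteId>\\d+)_" +
          "(?<maxWriteId>\\d+)" +
          "(?:_(?<optionalStatementId>\\d+)|_v(?<visibilityTxnId>\\d+))?" +
          "(?:/.*)?";

      public static void main(String[] args) {
        Pattern delta = Pattern.compile(DELTA_STR);
        Pattern deleteDelta = Pattern.compile("delete_" + DELTA_STR);

        // A compacted delta directory with a visibility txn id suffix.
        Matcher m = delta.matcher("delta_0000006_0000007_v00123/00000");
        if (m.matches()) {
          // Prints: 0000006 0000007 00123
          System.out.println(m.group("minWriteId") + " " + m.group("maxWriteId") + " "
              + m.group("visibilityTxnId"));
        }
        // Delete deltas only match the prefixed pattern: true, then false.
        System.out.println(deleteDelta.matcher("delete_delta_0000006_0000006/00000").matches());
        System.out.println(delta.matcher("delete_delta_0000006_0000006/00000").matches());
      }
    }

filterFilesForAcidState() then uses the parsed ranges: a delta whose minWriteId differs from its
maxWriteId above the base is treated as minor compacted, and a delete delta above the base means
the table has deleted rows; both cases raise the MetaException shown in the patch.
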
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 8dd968f..7b2709a 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -2036,9 +2036,10 @@ public class AnalyzeDDLTest extends FrontendTestBase {
 
     // Inferring primitive and complex types
     AnalyzesOk("create table if not exists newtbl_DNE like orc " +
-        "'/test-warehouse/alltypestiny_orc_def/year=2009/month=1/000000_0'");
+        "'/test-warehouse/alltypestiny_orc_def/year=2009/month=1/" +
+        "base_0000001/bucket_00000_0'");
     AnalyzesOk("create table if not exists newtbl_DNE like orc " +
-        "'/test-warehouse/complextypestbl_orc_def/nullable.orc'");
+        "'/test-warehouse/complextypestbl_orc_def/base_0000001/bucket_00000_0'");
 
     // check invalid paths
     AnalysisError("create table if not exists functional.zipcode_incomes like ORC " +
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
index ddfd192..cc4e44d 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java
@@ -536,67 +536,58 @@ public class AnalyzerTest extends FrontendTestBase {
   @Test
   public void TestAnalyzeTransactional() {
     Assume.assumeTrue(MetastoreShim.getMajorVersion() > 2);
-    String errorMsg =
-      "Table functional_orc_def.full_transactional_table not supported. Transactional (ACID)" +
-          " tables are only supported when they are configured as insert_only.";
+    String fullAcidErrorMsg = "%s not supported on full " +
+        "transactional (ACID) table: functional_orc_def.full_transactional_table";
+    String transactionalErrorMsg = "%s not supported on " +
+        "transactional (ACID) table: %s";
+    String insertOnlyTbl = "functional.insert_only_transactional_table";
+    String fullTxnTbl = "functional_orc_def.full_transactional_table";
 
-    String insertOnlyErrorMsg = "%s not supported on " +
-      "transactional (ACID) table: functional.insert_only_transactional_table";
-
-    AnalysisError(
-        "create table test as select * from functional_orc_def.full_transactional_table",
-        errorMsg);
+    AnalyzesOk(
+        "create table test as select * from functional_orc_def.full_transactional_table");
     AnalyzesOk(
         "create table test as select * from functional.insert_only_transactional_table");
 
-    AnalysisError(
-        "create table test like functional_orc_def.full_transactional_table",
-        errorMsg);
+    AnalyzesOk("create table test like functional_orc_def.full_transactional_table");
     AnalyzesOk("create table test like functional.insert_only_transactional_table");
 
-    AnalysisError(
-        "insert into test select * from functional_orc_def.full_transactional_table",
-        errorMsg);
+    AnalyzesOk(
+        "insert into functional.testtbl " +
+        "select 1,'test',* from functional_orc_def.full_transactional_table");
     AnalyzesOk("insert into functional.testtbl select *,'test',1 " +
             "from functional.insert_only_transactional_table");
 
     AnalyzesOk("insert into functional.insert_only_transactional_table select * " +
         "from functional.insert_only_transactional_table");
 
-    AnalysisError(
-        "compute stats functional_orc_def.full_transactional_table",
-        errorMsg);
+
+    AnalyzesOk("compute stats functional_orc_def.full_transactional_table");
     AnalyzesOk("compute stats functional.insert_only_transactional_table");
 
-    AnalysisError(
-        "select * from functional_orc_def.full_transactional_table",
-        errorMsg);
+    AnalyzesOk("select * from functional_orc_def.full_transactional_table");
     AnalyzesOk("select * from functional.insert_only_transactional_table");
 
-    AnalysisError(
-        "drop table functional_orc_def.full_transactional_table",
-         errorMsg);
+    AnalyzesOk("drop table functional_orc_def.full_transactional_table");
     AnalyzesOk("drop table functional.insert_only_transactional_table");
 
-    AnalysisError(
-        "truncate table functional_orc_def.full_transactional_table",
-        errorMsg);
+    AnalysisError("truncate table functional_orc_def.full_transactional_table",
+        String.format(fullAcidErrorMsg, "TRUNCATE"));
     AnalyzesOk("truncate table functional.insert_only_transactional_table");
 
     AnalysisError(
         "alter table functional_orc_def.full_transactional_table " +
         "add columns (col2 string)",
-        errorMsg);
+        String.format(transactionalErrorMsg, "ALTER TABLE", fullTxnTbl));
     AnalysisError(
         "alter table functional.insert_only_transactional_table " +
-            "add columns (col2 string)",
-        String.format(insertOnlyErrorMsg, "ALTER TABLE"));
+        "add columns (col2 string)",
+        String.format(transactionalErrorMsg, "ALTER TABLE", insertOnlyTbl));
 
     AnalysisError(
         "drop stats functional_orc_def.full_transactional_table",
-        errorMsg);
+        String.format(transactionalErrorMsg, "DROP STATS", fullTxnTbl));
     AnalysisError("drop stats functional.insert_only_transactional_table",
-        String.format(insertOnlyErrorMsg, "DROP STATS"));
+        String.format(transactionalErrorMsg, "DROP STATS", insertOnlyTbl));
 
     AnalyzesOk("describe functional.insert_only_transactional_table");
     AnalyzesOk("describe functional_orc_def.full_transactional_table");
diff --git a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
index e0e7df0..19b3660 100644
--- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.util.ListMap;
@@ -38,7 +39,8 @@ import com.google.common.collect.ImmutableList;
 public class FileMetadataLoaderTest {
 
   @Test
-  public void testRecursiveLoading() throws IOException {
+  public void testRecursiveLoading() throws IOException, MetaException {
+    //TODO(IMPALA-9042): Remove "throws MetaException"
     ListMap<TNetworkAddress> hostIndex = new ListMap<>();
     Path tablePath = new Path("hdfs://localhost:20500/test-warehouse/alltypes/");
     FileMetadataLoader fml = new FileMetadataLoader(tablePath, /* recursive=*/true,
@@ -75,7 +77,8 @@ public class FileMetadataLoaderTest {
   }
 
   @Test
-  public void testHudiParquetLoading() throws IOException {
+  public void testHudiParquetLoading() throws IOException, MetaException {
+    //TODO(IMPALA-9042): Remove "throws MetaException"
     ListMap<TNetworkAddress> hostIndex = new ListMap<>();
     Path tablePath = new Path("hdfs://localhost:20500/test-warehouse/hudi_parquet/");
     FileMetadataLoader fml = new FileMetadataLoader(tablePath, /* recursive=*/true,
@@ -106,7 +109,8 @@ public class FileMetadataLoaderTest {
   }
 
   @Test
-  public void testLoadMissingDirectory() throws IOException {
+  public void testLoadMissingDirectory() throws IOException, MetaException {
+    //TODO(IMPALA-9042): Remove "throws MetaException"
     for (boolean recursive : ImmutableList.of(false, true)) {
       ListMap<TNetworkAddress> hostIndex = new ListMap<>();
       Path tablePath = new Path("hdfs://localhost:20500/test-warehouse/does-not-exist/");
@@ -117,8 +121,9 @@ public class FileMetadataLoaderTest {
     }
   }
 
+  //TODO(IMPALA-9042): Remove 'throws MetaException'
   @Test
-  public void testSkipHiddenDirectories() throws IOException {
+  public void testSkipHiddenDirectories() throws IOException, MetaException {
     Path sourcePath = new Path("hdfs://localhost:20500/test-warehouse/alltypes/");
     Path tmpTestPath = new Path("hdfs://localhost:20500/tmp/test-filemetadata-loader");
     Configuration conf = new Configuration();
diff --git a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
index aa24878..07620b2 100644
--- a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
+++ b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
@@ -18,15 +18,20 @@ package org.apache.impala.util;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import com.google.common.base.Preconditions;
+
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.impala.compat.MetastoreShim;
 import org.hamcrest.Matchers;
 import org.junit.Assume;
@@ -62,9 +67,33 @@ public class AcidUtilsTest {
     List<FileStatus> stats = createMockStats(relPaths);
     List<FileStatus> expectedStats = createMockStats(expectedRelPaths);
 
-    assertThat(AcidUtils.filterFilesForAcidState(stats, BASE_PATH,
-        new ValidReadTxnList(validTxnListStr), writeIds, null),
-        Matchers.containsInAnyOrder(expectedStats.toArray()));
+    try {
+      assertThat(AcidUtils.filterFilesForAcidState(stats, BASE_PATH,
+          new ValidReadTxnList(validTxnListStr), writeIds, null),
+          Matchers.containsInAnyOrder(expectedStats.toArray()));
+    } catch (MetaException me) {
+      //TODO: Remove try-catch once IMPALA-9042 is resolved.
+      fail("Unexpected MetaException: " + me.getMessage());
+    }
+  }
+
+  public void filteringError(String[] relPaths, String validTxnListStr,
+      String validWriteIdListStr, String expectedErrorString) {
+    Preconditions.checkNotNull(expectedErrorString, "No expected error message given.");
+    try {
+      ValidWriteIdList writeIds = MetastoreShim.getValidWriteIdListFromString(
+        validWriteIdListStr);
+      List<FileStatus> stats = createMockStats(relPaths);
+      AcidUtils.filterFilesForAcidState(
+          stats, BASE_PATH, new ValidReadTxnList(validTxnListStr), writeIds, null);
+    } catch (Exception e) {
+      String errorString = e.getMessage();
+      Preconditions.checkNotNull(errorString, "Stack trace lost during exception.");
+      String msg = "got error:\n" + errorString + "\nexpected:\n" + expectedErrorString;
+      assertTrue(msg, errorString.startsWith(expectedErrorString));
+      return;
+    }
+    fail("Filtering didn't result in error");
   }
 
   @Test
@@ -308,4 +337,93 @@ public class AcidUtilsTest {
             "delta_0000012_0000012_0000/0000_0",
             "delta_0000012_0000012_0000/0000_1"});
   }
+
+  @Test
+  public void testMinorCompactionFail() {
+    filteringError(new String[]{
+            "base_0000005/",
+            "base_0000005/abc.txt",
+            "delta_0000006_0000007/",
+            "delta_0000006_0000007/00000"},
+        // all txns are valid
+        "",
+        // <tbl>:<hwm>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>
+        "default.test:10:1234:1,2,3",
+        "Table is minor compacted");
+    filteringError(new String[]{
+          "base_0000005/",
+          "base_0000005/abc.txt",
+          "delta_0000006_0000007_00123/",
+          "delta_0000006_0000007_00123/00000"},
+        // all txns are valid
+        "",
+        // <tbl>:<hwm>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>
+        "default.test:10:1234:1,2,3",
+        "Table is minor compacted");
+    filteringError(new String[]{
+          "base_0000005/",
+          "base_0000005/abc.txt",
+          "delta_0000006_0000007_v00123/",
+          "delta_0000006_0000007_v00123/00000"},
+        // all txns are valid
+        "",
+        // <tbl>:<hwm>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>
+        "default.test:10:1234:1,2,3",
+        "Table is minor compacted");
+  }
+
+  @Test
+  public void testDeleteDeltaFail() {
+    filteringError(new String[]{
+            "base_0000005/",
+            "base_0000005/abc.txt",
+            "delete_delta_0000006_0000006/",
+            "delete_delta_0000006_0000006/00000"},
+        // all txns are valid
+        "",
+        // <tbl>:<hwm>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>
+        "default.test:10:1234:1,2,3",
+        "Table has deleted rows"
+        );
+  }
+
+  @Test
+  public void testMinorCompactionBeforeBase() {
+    assertFiltering(new String[]{
+            "delta_000005_000008_0000/",
+            "delta_000005_000008_0000/abc.txt",
+            "base_000010/",
+            "base_000010/0000_0",
+            "delta_0000012_0000012_0000/",
+            "delta_0000012_0000012_0000/0000_0",
+            "delta_0000012_0000012_0000/0000_1"},
+        // <table>:<highWaterMark>:<minOpenWriteId>
+        "default.test:20:15::",
+        new String[]{
+            // No minor compactions after base directory so it should succeed.
+            "base_000010/0000_0",
+            "delta_0000012_0000012_0000/0000_0",
+            "delta_0000012_0000012_0000/0000_1"});
+  }
+
+  @Test
+  public void testDeletesBeforeBase() {
+    assertFiltering(new String[]{
+            "delta_000004_000004_0000/",
+            "delta_000004_000004_0000/0000",
+            "delete_delta_000005_000005_0000/",
+            "delete_delta_000005_000005_0000/0000",
+            "base_000010/",
+            "base_000010/0000_0",
+            "delta_0000012_0000012_0000/",
+            "delta_0000012_0000012_0000/0000_0",
+            "delta_0000012_0000012_0000/0000_1"},
+        // <table>:<highWaterMark>:<minOpenWriteId>
+        "default.test:20:15::",
+        new String[]{
+            // No deletes after base directory so it should succeed.
+            "base_000010/0000_0",
+            "delta_0000012_0000012_0000/0000_0",
+            "delta_0000012_0000012_0000/0000_1"});
+  }
 }
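
The write-id-list strings used in these tests follow the format noted in the comments:
<table>:<highWatermark>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>. A hedged sketch of what
such a string encodes, reusing the MetastoreShim helper the test calls (the expected outputs
assume the usual Hive ValidWriteIdList semantics; the class name is illustrative only):

    import org.apache.hadoop.hive.common.ValidWriteIdList;
    import org.apache.impala.compat.MetastoreShim;

    public class WriteIdListSketch {
      public static void main(String[] args) {
        // "default.test:10:1234:1,2,3" -> write ids up to the high watermark 10 are
        // visible, except the open write ids 1, 2 and 3 (no aborted ids listed).
        ValidWriteIdList writeIds =
            MetastoreShim.getValidWriteIdListFromString("default.test:10:1234:1,2,3");
        System.out.println(writeIds.isWriteIdValid(5));   // expected: true
        System.out.println(writeIds.isWriteIdValid(2));   // expected: false (open)
        System.out.println(writeIds.isWriteIdValid(11));  // expected: false (above HWM)
      }
    }
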
diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py
index c8a63ec..8113498 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -108,6 +108,7 @@ import sys
 import tempfile
 from itertools import product
 from optparse import OptionParser
+from tests.common.environ import HIVE_MAJOR_VERSION
 from tests.util.test_file_parser import *
 from tests.common.test_dimensions import *
 
@@ -156,6 +157,8 @@ COMPRESSION_CODEC = "SET mapred.output.compression.codec=%s;"
 AVRO_COMPRESSION_CODEC = "SET avro.output.codec=%s;"
 SET_DYNAMIC_PARTITION_STATEMENT = "SET hive.exec.dynamic.partition=true;"
 SET_PARTITION_MODE_NONSTRICT_STATEMENT = "SET hive.exec.dynamic.partition.mode=nonstrict;"
+SET_MAX_DYNAMIC_PARTITIONS_STATEMENT = "SET hive.exec.max.dynamic.partitions=10000;\n"\
+    "SET hive.exec.max.dynamic.partitions.pernode=10000;"
 SET_HIVE_INPUT_FORMAT = "SET mapred.max.split.size=256000000;\n"\
                         "SET hive.input.format=org.apache.hadoop.hive.ql.io.%s;\n"
 SET_HIVE_HBASE_BULK_LOAD = "SET hive.hbase.bulk = true"
@@ -312,6 +315,12 @@ def build_table_template(file_format, columns, partition_columns, row_format,
     # Kudu's test tables are managed.
     external = ""
 
+  # On Hive 3, ORC test tables are created as full ACID (transactional) tables by default.
+  if (HIVE_MAJOR_VERSION == 3 and
+      file_format == 'orc' and
+      'transactional' not in tblproperties):
+    external = ""
+    tblproperties['transactional'] = 'true'
 
   all_tblproperties = []
   for key, value in tblproperties.iteritems():
@@ -448,6 +457,7 @@ def build_insert_into_statement(insert, db_name, db_suffix, table_name, file_for
 
   statement = SET_PARTITION_MODE_NONSTRICT_STATEMENT + "\n"
   statement += SET_DYNAMIC_PARTITION_STATEMENT + "\n"
+  statement += SET_MAX_DYNAMIC_PARTITIONS_STATEMENT + "\n"
   statement += "set hive.auto.convert.join=true;\n"
 
   # For some reason (hive bug?) we need to have the CombineHiveInputFormat set
@@ -625,6 +635,10 @@ def generate_statements(output_name, test_vectors, sections,
       else:
         create_kudu = None
 
+      if file_format == 'orc' and section["DEPENDENT_LOAD_ACID"]:
+        insert = None
+        insert_hive = eval_section(section["DEPENDENT_LOAD_ACID"])
+
       columns = eval_section(section['COLUMNS']).strip()
       partition_columns = section['PARTITION_COLUMNS'].strip()
       row_format = section['ROW_FORMAT'].strip()
@@ -807,7 +821,7 @@ def parse_schema_template_file(file_name):
   VALID_SECTION_NAMES = ['DATASET', 'BASE_TABLE_NAME', 'COLUMNS', 'PARTITION_COLUMNS',
                          'ROW_FORMAT', 'CREATE', 'CREATE_HIVE', 'CREATE_KUDU',
                          'DEPENDENT_LOAD', 'DEPENDENT_LOAD_KUDU', 'DEPENDENT_LOAD_HIVE',
-                         'LOAD', 'ALTER', 'HBASE_COLUMN_FAMILIES',
+                         'DEPENDENT_LOAD_ACID', 'LOAD', 'ALTER', 'HBASE_COLUMN_FAMILIES',
                          'TABLE_PROPERTIES', 'HBASE_REGION_SPLITS', 'HIVE_MAJOR_VERSION']
   return parse_test_file(file_name, VALID_SECTION_NAMES, skip_unknown_sections=False)
 
diff --git a/testdata/datasets/README b/testdata/datasets/README
index 75de1b4..0757c1a 100644
--- a/testdata/datasets/README
+++ b/testdata/datasets/README
@@ -72,6 +72,7 @@ The schema template SQL files have the following format:
   DEPENDENT_LOAD
   DEPENDENT_LOAD_KUDU
   DEPENDENT_LOAD_HIVE
+  DEPENDENT_LOAD_ACID
       Statements to be executed during the "dependent load" phase. These statements
       are run after the initial (base table) load is complete.
 
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index f502df7..6cf0b4b 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -699,12 +699,30 @@ nested_struct struct<a: int, b: array<int>, c: struct<d: array<array<struct<e: i
 hadoop fs -put -f ${IMPALA_HOME}/testdata/ComplexTypesTbl/nullable.parq \
 /test-warehouse/complextypestbl_parquet/ && \
 hadoop fs -put -f ${IMPALA_HOME}/testdata/ComplexTypesTbl/nonnullable.parq \
-/test-warehouse/complextypestbl_parquet/ && \
-hadoop fs -mkdir -p /test-warehouse/complextypestbl_orc_def && \
+/test-warehouse/complextypestbl_parquet/
+---- DEPENDENT_LOAD_ACID
+INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM functional_parquet.complextypestbl;
+---- LOAD
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+complextypestbl_non_transactional
+---- COLUMNS
+id bigint
+int_array array<int>
+int_array_array array<array<int>>
+int_map map<string, int>
+int_map_array array<map<string, int>>
+nested_struct struct<a: int, b: array<int>, c: struct<d: array<array<struct<e: int, f: string>>>>, g: map<string, struct<h: struct<i: array<double>>>>>
+---- TABLE_PROPERTIES
+transactional=false
+---- DEPENDENT_LOAD
+`hadoop fs -mkdir -p /test-warehouse/complextypestbl_non_transactional_orc_def && \
 hadoop fs -put -f ${IMPALA_HOME}/testdata/ComplexTypesTbl/nullable.orc \
-/test-warehouse/complextypestbl_orc_def/ && \
+/test-warehouse/complextypestbl_non_transactional_orc_def/ && \
 hadoop fs -put -f ${IMPALA_HOME}/testdata/ComplexTypesTbl/nonnullable.orc \
-/test-warehouse/complextypestbl_orc_def/
+/test-warehouse/complextypestbl_non_transactional_orc_def/
 ---- LOAD
 ====
 ---- DATASET
@@ -1255,7 +1273,7 @@ date_string_col string
 string_col string
 timestamp_col timestamp
 ---- DEPENDENT_LOAD
-insert overwrite table {db_name}{db_suffix}.{table_name} SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col FROM {db_name}.{table_name} where id % 4 = 0;
+insert into table {db_name}{db_suffix}.{table_name} SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col FROM {db_name}.{table_name} where id % 4 = 0;
 insert into table {db_name}{db_suffix}.{table_name} SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col FROM {db_name}.{table_name} where id % 4 = 1;
 insert into table {db_name}{db_suffix}.{table_name} SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col FROM {db_name}.{table_name} where id % 4 = 2;
 insert into table {db_name}{db_suffix}.{table_name} SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col FROM {db_name}.{table_name} where id % 4 = 3;
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index b644a91..dba9d7a 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -80,6 +80,7 @@ table_name:complextypestbl, constraint:restrict_to, table_format:parquet/none/no
 table_name:complextypestbl, constraint:restrict_to, table_format:orc/def/block
 table_name:complextypestbl_medium, constraint:restrict_to, table_format:parquet/none/none
 table_name:complextypestbl_medium, constraint:restrict_to, table_format:orc/def/block
+table_name:complextypestbl_non_transactional, constraint:restrict_to, table_format:orc/def/block
 
 table_name:alltypeserror, constraint:exclude, table_format:parquet/none/none
 table_name:alltypeserrornonulls, constraint:exclude, table_format:parquet/none/none
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 3cd02c5..5ac65cf 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -1465,22 +1465,22 @@ select * from tpch_orc_def.lineitem
 3
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=8.00MB Threads=2
-Per-Host Resource Estimates: Memory=88MB
+Per-Host Resource Estimates: Memory=24MB
 Analyzed query: SELECT * FROM tpch_orc_def.lineitem
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=2
+|  Per-Host Resources: mem-estimate=24.00MB mem-reservation=8.00MB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: tpch_orc_def.lineitem.l_orderkey, tpch_orc_def.lineitem.l_partkey, tpch_orc_def.lineitem.l_suppkey, tpch_orc_def.lineitem.l_linenumber, tpch_orc_def.lineitem.l_quantity, tpch_orc_def.lineitem.l_extendedprice, tpch_orc_def.lineitem.l_discount, tpch_orc_def.lineitem.l_tax, tpch_orc_def.lineitem.l_returnflag, tpch_orc_def.lineitem.l_linestatus, tpch_orc_def.lineitem.l_shipdate, tpch_orc_def.lineitem.l_commitdate, tpch_orc_def.lineitem.l_receiptdate, tpch_orc_def.lineitem.l_ [...]
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 00:SCAN HDFS [tpch_orc_def.lineitem]
-   HDFS partitions=1/1 files=1 size=142.84MB
+   HDFS partitions=1/1 files=12 size=142.90MB
    stored statistics:
-     table: rows=6.00M size=142.84MB
+     table: rows=6.00M size=142.90MB
      columns: all
-   extrapolated-rows=disabled max-scan-range-rows=6.00M
-   mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
+   extrapolated-rows=disabled max-scan-range-rows=538.49K
+   mem-estimate=24.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=231B cardinality=6.00M
    in pipelines: 00(GETNEXT)
 ====
@@ -1516,22 +1516,22 @@ select l_comment from tpch_orc_def.lineitem
 3
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=8.00MB Threads=2
-Per-Host Resource Estimates: Memory=88MB
+Per-Host Resource Estimates: Memory=24MB
 Analyzed query: SELECT l_comment FROM tpch_orc_def.lineitem
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=2
+|  Per-Host Resources: mem-estimate=24.00MB mem-reservation=8.00MB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: l_comment
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 00:SCAN HDFS [tpch_orc_def.lineitem]
-   HDFS partitions=1/1 files=1 size=142.84MB
+   HDFS partitions=1/1 files=12 size=142.90MB
    stored statistics:
-     table: rows=6.00M size=142.84MB
+     table: rows=6.00M size=142.90MB
      columns: all
-   extrapolated-rows=disabled max-scan-range-rows=6.00M
-   mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
+   extrapolated-rows=disabled max-scan-range-rows=538.49K
+   mem-estimate=24.00MB mem-reservation=8.00MB thread-reservation=1
    tuple-ids=0 row-size=38B cardinality=6.00M
    in pipelines: 00(GETNEXT)
 ====
diff --git a/testdata/workloads/functional-query/queries/DataErrorsTest/orc-type-checks.test b/testdata/workloads/functional-query/queries/DataErrorsTest/orc-type-checks.test
index 253253c..244898d 100644
--- a/testdata/workloads/functional-query/queries/DataErrorsTest/orc-type-checks.test
+++ b/testdata/workloads/functional-query/queries/DataErrorsTest/orc-type-checks.test
@@ -95,7 +95,7 @@ select d2 from mismatch_decimals where d6 = 1
 decimal
 ---- RESULTS
 ---- CATCH
-Column decimal(10,0) in ORC file '$NAMENODE/test-warehouse/decimal_tbl_orc_def/d6=1/000000_0' can't be truncated to table column DECIMAL(8,0)
+can't be truncated to table column DECIMAL(8,0)
 ====
 ---- QUERY
 select d3 from mismatch_decimals
@@ -142,7 +142,8 @@ create external table $DATABASE.union_complextypes(
   u uniontype<int, boolean>,
   int_array array<int>,
   int_map map<string, int>)
-stored as orc;
+stored as orc
+tblproperties ('external.table.purge'='TRUE');
 insert into $DATABASE.union_complextypes
   select 0, create_union(1, 50, false), array(0), map('key0', 0);
 # Create an external table on the ORC files with different schema.
@@ -184,16 +185,23 @@ File '$NAMENODE/test-warehouse/$DATABASE.db/union_complextypes/000000_0'
  has an incompatible ORC schema for column '$DATABASE.ill_complextypes.int_array',
  Column type: array, ORC schema: map<string,int>
 ====
+---- HIVE_QUERY
+use $DATABASE;
+create external table orc_complextypes like functional_parquet.complextypestbl
+stored as orc
+tblproperties ('external.table.purge'='TRUE');
+insert into orc_complextypes select * from functional_parquet.complextypestbl;
+====
 ---- QUERY
+invalidate metadata orc_complextypes;
 # Create a table whose nested types don't match the underlying ORC files.
-# Reuse the data files of functional_orc_def.complextypestbl.
 create external table ill_complextypes2 (id bigint,
   int_array map<string, int>,
   int_array_array array<struct<x:int>>,
   int_map array<struct<k:string,v:int>>,
   int_map_array array<array<struct<k:string,v:int>>>
-) stored as orc location '$NAMENODE/test-warehouse/complextypestbl_orc_def';
-select a.item.x from ill_complextypes2.int_array_array a
+) stored as orc location '$NAMENODE/test-warehouse/$DATABASE.db/orc_complextypes';
+select a.item.x from ill_complextypes2.int_array_array a;
 ---- CATCH
 has an incompatible ORC schema for column '$DATABASE.ill_complextypes2.int_array_array.item',
  Column type: struct, ORC schema: array<int>
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test b/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
index cfd8416..8511da8 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
@@ -10,8 +10,100 @@ drop stats functional.insert_only_transactional_table;
 AnalysisException: DROP STATS not supported on transactional (ACID) table: functional.insert_only_transactional_table
 ====
 ---- QUERY
-select * from functional_orc_def.full_transactional_table;
+insert into functional_orc_def.full_transactional_table values (1);
 ---- CATCH
-AnalysisException: Table functional_orc_def.full_transactional_table not supported. Transactional (ACID) tables are only supported when they are configured as insert_only.
+AnalysisException: INSERT not supported on full transactional (ACID) table: functional_orc_def.full_transactional_table
+====
+---- QUERY
+truncate table functional_orc_def.full_transactional_table;
+---- CATCH
+AnalysisException: TRUNCATE not supported on full transactional (ACID) table: functional_orc_def.full_transactional_table
+====
+---- QUERY
+create table acid (i int) stored as orc tblproperties('transactional'='true');
+====
+---- HIVE_QUERY
+use $DATABASE;
+insert into acid values (1), (2), (3);
+delete from acid where i = 2;
+====
+---- QUERY
+refresh acid;
+select * from acid;
+---- CATCH
+TableLoadingException
+====
+---- HIVE_QUERY
+alter table $DATABASE.acid compact 'major' and wait;
+====
+---- QUERY
+invalidate metadata acid;
+select * from acid;
+---- RESULTS
+1
+3
+---- TYPES
+INT
+====
+---- HIVE_QUERY
+use $DATABASE;
+insert into acid values (5);
+insert into acid values (5);
+insert into acid values (5);
+====
+---- QUERY
+refresh acid;
+select * from acid;
+---- RESULTS
+1
+3
+5
+5
+5
+---- TYPES
+INT
+====
+---- HIVE_QUERY
+alter table $DATABASE.acid compact 'minor' and wait;
+====
+---- QUERY
+# REFRESH fails because of the minor compacted delta directories.
+refresh acid;
+---- CATCH
+TableLoadingException
+====
+---- QUERY
+# As long as the old files are still there, SELECT keeps working.
+select * from acid;
+---- RESULTS
+1
+3
+5
+5
+5
+====
+---- HIVE_QUERY
+alter table $DATABASE.acid compact 'major' and wait;
+====
+---- QUERY
+# We can't issue REFRESH here because REFRESH only works if the table's metadata has
+# been loaded successfully, and the previous REFRESH broke it. Once IMPALA-9042 is
+# resolved everything should work smoothly.
+invalidate metadata acid;
+show files in acid;
+---- RESULTS
+row_regex:'$NAMENODE/$MANAGED_WAREHOUSE_DIR/$DATABASE.db/acid/base_0000005_v\d+/bucket_\d+','\d+K?B',''
+---- TYPES
+STRING,STRING,STRING
+====
+---- QUERY
+select * from acid;
+---- RESULTS
+1
+3
+5
+5
+5
+---- TYPES
+INT
 ====
-
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid.test b/testdata/workloads/functional-query/queries/QueryTest/acid.test
index b30763f..ecd48e2 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/acid.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid.test
@@ -93,3 +93,21 @@ show tables;
 ---- RESULTS
 'upgraded_table'
 ====
+---- QUERY
+create table full_acid (i int) stored as orc
+tblproperties('transactional'='true');
+show tables;
+---- RESULTS
+'full_acid'
+'upgraded_table'
+====
+---- QUERY
+drop table full_acid;
+---- RESULTS
+'Table has been dropped.'
+====
+---- QUERY
+show tables;
+---- RESULTS
+'upgraded_table'
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/create-table-like-file-orc.test b/testdata/workloads/functional-query/queries/QueryTest/create-table-like-file-orc.test
index 71901ca..5bbd7c1 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/create-table-like-file-orc.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/create-table-like-file-orc.test
@@ -1,7 +1,14 @@
 ====
+---- HIVE_QUERY
+use $DATABASE;
+create external table copy_decimal_tiny
+stored as orc
+tblproperties ('external.table.purge'='TRUE')
+as select * from functional_orc_def.decimal_tiny;
+====
 ---- QUERY
 create table $DATABASE.temp_decimal_table_orc like ORC
-'$FILESYSTEM_PREFIX/test-warehouse/decimal_tiny_orc_def/000000_0'
+'$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/copy_decimal_tiny/000000_0'
 ---- RESULTS
 'Table has been created.'
 ====
@@ -16,22 +23,32 @@ STRING, STRING, STRING
 ====
 ---- QUERY
 create table $DATABASE.temp_chars_table like ORC
-'$FILESYSTEM_PREFIX/test-warehouse/chars_tiny_orc_def/000000_0'
+'$FILESYSTEM_PREFIX/test-warehouse/chars_tiny_orc_def/base_0000001/bucket_00000_0'
 ---- RESULTS
 'Table has been created.'
 ====
 ---- QUERY
 describe $DATABASE.temp_chars_table
 ---- RESULTS
-'cs','char(5)','Inferred from ORC file.'
-'cl','char(140)','Inferred from ORC file.'
-'vc','varchar(32)','Inferred from ORC file.'
+'operation','int','Inferred from ORC file.'
+'originaltransaction','bigint','Inferred from ORC file.'
+'rowid','bigint','Inferred from ORC file.'
+'bucket','int','Inferred from ORC file.'
+'currenttransaction','bigint','Inferred from ORC file.'
+'row','struct<\n  cs:char(5),\n  cl:char(140),\n  vc:varchar(32)\n>','Inferred from ORC file.'
 ---- TYPES
 STRING, STRING, STRING
 ====
+---- HIVE_QUERY
+use $DATABASE;
+create external table copy_zipcode_incomes
+stored as orc
+tblproperties ('external.table.purge'='TRUE')
+as select * from functional_orc_def.zipcode_incomes;
+====
 ---- QUERY
 create table $DATABASE.like_zipcodes_file_orc like ORC
-'$FILESYSTEM_PREFIX/test-warehouse/zipcode_incomes_orc_def/000000_0'
+'$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/copy_zipcode_incomes/000000_0'
 ---- RESULTS
 'Table has been created.'
 ====
@@ -46,9 +63,18 @@ describe $DATABASE.like_zipcodes_file_orc
 ---- TYPES
 STRING, STRING, STRING
 ====
+---- HIVE_QUERY
+use $DATABASE;
+create external table copy_alltypestiny
+stored as orc
+tblproperties ('external.table.purge'='TRUE')
+as select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col,
+          double_col, date_string_col, string_col, timestamp_col
+from functional_orc_def.alltypestiny;
+====
 ---- QUERY
 create table $DATABASE.like_alltypestiny_file_orc like ORC
-'$FILESYSTEM_PREFIX/test-warehouse/alltypestiny_orc_def/year=2009/month=1/000000_0'
+'$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/copy_alltypestiny/000000_0'
 ---- RESULTS
 'Table has been created.'
 ====
@@ -70,13 +96,13 @@ describe $DATABASE.like_alltypestiny_file_orc
 STRING, STRING, STRING
 ====
 ---- QUERY
-create table allcomplextypes_clone_orc like ORC
-'$FILESYSTEM_PREFIX/test-warehouse/complextypestbl_orc_def/nullable.orc'
+create table non_transactional_complextypes_clone like ORC
+'$FILESYSTEM_PREFIX/test-warehouse/complextypestbl_non_transactional_orc_def/nullable.orc'
 ---- RESULTS
 'Table has been created.'
 ====
 ---- QUERY
-describe allcomplextypes_clone_orc
+describe non_transactional_complextypes_clone
 ---- RESULTS
 'id','bigint','Inferred from ORC file.'
 'int_array','array<int>','Inferred from ORC file.'
@@ -87,3 +113,39 @@ describe allcomplextypes_clone_orc
 ---- TYPES
 STRING, STRING, STRING
 ====
+---- QUERY
+create external table transactional_complextypes_clone like ORC
+'$FILESYSTEM_PREFIX/test-warehouse/complextypestbl_orc_def/base_0000001/bucket_00000_0'
+stored as orc
+location '$FILESYSTEM_PREFIX/test-warehouse/complextypestbl_orc_def/';
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+describe transactional_complextypes_clone
+---- RESULTS
+'operation','int','Inferred from ORC file.'
+'originaltransaction','bigint','Inferred from ORC file.'
+'bucket','int','Inferred from ORC file.'
+'rowid','bigint','Inferred from ORC file.'
+'currenttransaction','bigint','Inferred from ORC file.'
+'row','struct<\n  id:bigint,\n  int_array:array<int>,\n  int_array_array:array<array<int>>,\n  int_map:map<string,int>,\n  int_map_array:array<map<string,int>>,\n  nested_struct:struct<\n    a:int,\n    b:array<int>,\n    c:struct<\n      d:array<array<struct<\n        e:int,\n        f:string\n      >>>\n    >,\n    g:map<string,struct<\n      h:struct<\n        i:array<double>\n      >\n    >>\n  >\n>','Inferred from ORC file.'
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
+select originaltransaction, rowid, `row`.id from transactional_complextypes_clone;
+---- LABELS
+originaltransaction, rowid, row.id
+---- RESULTS
+1,0,8
+1,0,1
+1,1,2
+1,2,3
+1,3,4
+1,4,5
+1,5,6
+1,6,7
+---- TYPES
+BIGINT, BIGINT, BIGINT
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/describe-path.test b/testdata/workloads/functional-query/queries/QueryTest/describe-path.test
index d6f041c..f56b0d3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/describe-path.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/describe-path.test
@@ -136,3 +136,54 @@ describe allcomplextypes.int_array_col
 ---- TYPES
 string,string,string
 ====
+---- HIVE_MAJOR_VERSION
+3
+---- QUERY
+# describe full ACID partitioned table 'alltypes'.
+describe functional_orc_def.alltypes
+---- RESULTS
+'id','int','Add a comment'
+'bool_col','boolean',''
+'tinyint_col','tinyint',''
+'smallint_col','smallint',''
+'int_col','int',''
+'bigint_col','bigint',''
+'float_col','float',''
+'double_col','double',''
+'date_string_col','string',''
+'string_col','string',''
+'timestamp_col','timestamp',''
+'year','int',''
+'month','int',''
+---- TYPES
+string, string, string
+====
+---- HIVE_MAJOR_VERSION
+3
+---- QUERY
+# describe synthetic transactional field 'row__id'.
+describe functional_orc_def.alltypes.row__id
+---- RESULTS
+'operation','int',''
+'originaltransaction','bigint',''
+'bucket','int',''
+'rowid','bigint',''
+'currenttransaction','bigint',''
+---- TYPES
+string, string, string
+====
+---- HIVE_MAJOR_VERSION
+3
+---- QUERY
+# describe full ACID table with nested types.
+describe functional_orc_def.complextypestbl
+---- RESULTS
+'id','bigint',''
+'int_array','array<int>',''
+'int_array_array','array<array<int>>',''
+'int_map','map<string,int>',''
+'int_map_array','array<map<string,int>>',''
+'nested_struct','struct<\n  a:int,\n  b:array<int>,\n  c:struct<\n    d:array<array<struct<\n      e:int,\n      f:string\n    >>>\n  >,\n  g:map<string,struct<\n    h:struct<\n      i:array<double>\n    >\n  >>\n>',''
+---- TYPES
+string, string, string
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/full-acid-rowid.test b/testdata/workloads/functional-query/queries/QueryTest/full-acid-rowid.test
new file mode 100644
index 0000000..2ddc06c
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/full-acid-rowid.test
@@ -0,0 +1,137 @@
+====
+---- QUERY
+select * from functional_orc_def.alltypestiny;
+---- LABELS
+ID, BOOL_COL, TINYINT_COL, SMALLINT_COL, INT_COL, BIGINT_COL, FLOAT_COL, DOUBLE_COL, DATE_STRING_COL, STRING_COL, TIMESTAMP_COL, YEAR, MONTH
+---- RESULTS
+4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
+5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
+2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
+0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
+---- TYPES
+INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
+====
+---- QUERY
+select row__id.*, * from functional_orc_def.alltypestiny;
+---- LABELS
+OPERATION, ORIGINALTRANSACTION, BUCKET, ROWID, CURRENTTRANSACTION, ID, BOOL_COL, TINYINT_COL, SMALLINT_COL, INT_COL, BIGINT_COL, FLOAT_COL, DOUBLE_COL, DATE_STRING_COL, STRING_COL, TIMESTAMP_COL, YEAR, MONTH
+---- RESULTS
+0,1,536936448,0,1,2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+0,1,536936448,1,1,3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
+0,1,537067520,0,1,6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+0,1,537067520,1,1,7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
+0,1,536870912,0,1,0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+0,1,536870912,1,1,1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+0,1,537001984,0,1,4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
+0,1,537001984,1,1,5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
+---- TYPES
+INT, BIGINT, INT, BIGINT, BIGINT, INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
+====
+---- QUERY
+select row__id.operation, row__id.rowid, row__id.originaltransaction, *
+from functional_orc_def.alltypestiny;
+---- LABELS
+ROW__ID.OPERATION, ROW__ID.ROWID, ROW__ID.ORIGINALTRANSACTION, ID, BOOL_COL, TINYINT_COL, SMALLINT_COL, INT_COL, BIGINT_COL, FLOAT_COL, DOUBLE_COL, DATE_STRING_COL, STRING_COL, TIMESTAMP_COL, YEAR, MONTH
+---- RESULTS
+0,0,1,2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+0,1,1,3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
+0,0,1,4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
+0,1,1,5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
+0,0,1,6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+0,1,1,7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
+0,0,1,0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+0,1,1,1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+---- TYPES
+INT, BIGINT, BIGINT, INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
+====
+---- QUERY
+select id from functional_orc_def.complextypestbl;
+---- LABELS
+ID
+---- RESULTS
+1
+2
+3
+4
+5
+6
+7
+8
+---- TYPES
+BIGINT
+====
+---- QUERY
+select row__id.*, id from functional_orc_def.complextypestbl;
+---- LABELS
+OPERATION, ORIGINALTRANSACTION, BUCKET, ROWID, CURRENTTRANSACTION, ID
+---- RESULTS
+0,1,536870912,0,1,1
+0,1,536870912,1,1,2
+0,1,536870912,2,1,3
+0,1,536870912,3,1,4
+0,1,536870912,4,1,5
+0,1,536870912,5,1,6
+0,1,536870912,6,1,7
+0,1,536936448,0,1,8
+---- TYPES
+INT, BIGINT, INT, BIGINT, BIGINT, BIGINT
+====
+---- QUERY
+select row__id.currenttransaction, row__id.bucket, row__id.rowid, id
+from functional_orc_def.complextypestbl;
+---- LABELS
+ROW__ID.CURRENTTRANSACTION, ROW__ID.BUCKET, ROW__ID.ROWID, ID
+---- RESULTS
+1,536870912,0,1
+1,536870912,1,2
+1,536870912,2,3
+1,536870912,3,4
+1,536870912,4,5
+1,536870912,5,6
+1,536870912,6,7
+1,536936448,0,8
+---- TYPES
+BIGINT, INT, BIGINT, BIGINT
+====
+---- QUERY
+select row__id.*, item from functional_orc_def.complextypestbl c, c.int_array a;
+---- LABELS
+OPERATION, ORIGINALTRANSACTION, BUCKET, ROWID, CURRENTTRANSACTION, ITEM
+---- RESULTS
+0,1,536936448,0,1,-1
+0,1,536870912,0,1,1
+0,1,536870912,0,1,2
+0,1,536870912,0,1,3
+0,1,536870912,1,1,NULL
+0,1,536870912,1,1,1
+0,1,536870912,1,1,2
+0,1,536870912,1,1,NULL
+0,1,536870912,1,1,3
+0,1,536870912,1,1,NULL
+---- TYPES
+INT, BIGINT, INT, BIGINT, BIGINT, INT
+====
+---- QUERY
+select row__id.rowid, row__id.currenttransaction, row__id.operation,
+       row__id.bucket, row__id.originaltransaction, item
+from functional_orc_def.complextypestbl c, c.int_array a;
+---- LABELS
+ROW__ID.ROWID, ROW__ID.CURRENTTRANSACTION, ROW__ID.OPERATION, ROW__ID.BUCKET, ROW__ID.ORIGINALTRANSACTION, ITEM
+---- RESULTS
+0,1,0,536936448,1,-1
+0,1,0,536870912,1,1
+0,1,0,536870912,1,2
+0,1,0,536870912,1,3
+1,1,0,536870912,1,NULL
+1,1,0,536870912,1,1
+1,1,0,536870912,1,2
+1,1,0,536870912,1,NULL
+1,1,0,536870912,1,3
+1,1,0,536870912,1,NULL
+---- TYPES
+BIGINT, BIGINT, INT, INT, BIGINT, INT
+====
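
The 'bucket' values in the results above (536870912, 536936448, ...) are not plain bucket
numbers; they are bucket properties encoded the way Hive's BucketCodec does it. A small hedged
sketch (assuming the V1 layout with the bucket/writer id in bits 16-27; the class and method
names are illustrative only) that decodes the values appearing in full-acid-rowid.test:

    public class BucketPropertySketch {
      // Hedged assumption: BucketCodec V1 stores the bucket/writer id in bits 16..27
      // of the encoded int.
      static int bucketId(int bucketProperty) {
        return (bucketProperty >>> 16) & 0xFFF;
      }

      public static void main(String[] args) {
        // Values as they appear in the test results above.
        System.out.println(bucketId(536870912));  // 0
        System.out.println(bucketId(536936448));  // 1
        System.out.println(bucketId(537001984));  // 2
        System.out.println(bucketId(537067520));  // 3
      }
    }
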
diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
index 02451b3..4c71a80 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
@@ -398,356 +398,3 @@ show create view $UNIQUE_DB.masked_view;
 ---- RESULTS
 'CREATE VIEW $UNIQUE_DB.masked_view AS\nSELECT id FROM functional.alltypestiny'
 ====
----- QUERY
-# Test queries on complex types table.
-select id from functional_parquet.complextypestbl
----- RESULTS
-100
-200
-300
-400
-500
-600
-700
-800
----- TYPES
-BIGINT
-====
----- QUERY
-# Test queries on complex types table.
-select * from functional_parquet.complextypestbl
----- RESULTS
-100
-200
-300
-400
-500
-600
-700
-800
----- TYPES
-BIGINT
-====
----- QUERY
-# Test resolving nested column of the masked table.
-select id, nested_struct.a from functional_parquet.complextypestbl
----- RESULTS
-100,1
-200,NULL
-300,NULL
-400,NULL
-500,NULL
-600,NULL
-700,7
-800,-1
----- TYPES
-BIGINT,INT
-====
----- QUERY
-# Test resolving nested columns in expending star expression.
-select id, nested_struct.* from functional_parquet.complextypestbl
----- RESULTS
-100,1
-200,NULL
-300,NULL
-400,NULL
-500,NULL
-600,NULL
-700,7
-800,-1
----- TYPES
-BIGINT,INT
-====
----- QUERY
-# Test resolving nested column in function.
-select count(id), count(nested_struct.a) from functional_parquet.complextypestbl
----- RESULTS
-8,3
----- TYPES
-BIGINT,BIGINT
-====
----- QUERY
-# Test predicates on masked columns and nested columns.
-select id, nested_struct.a from functional_parquet.complextypestbl t
-where id = 100 and nested_struct.a = 1;
----- RESULTS
-100,1
----- TYPES
-BIGINT,INT
-====
----- QUERY
-# Test resolving nested collection of the nested table. Should resolve 't.int_array'
-# correctly though 'complextypestbl' will be masked into a table masking view.
-select pos, item from functional_parquet.complextypestbl t, t.int_array
----- RESULTS
-0,-1
-0,1
-0,NULL
-1,1
-1,2
-2,2
-2,3
-3,NULL
-4,3
-5,NULL
----- TYPES
-BIGINT,INT
-====
----- QUERY
-# Regression test when 'complextypestbl' is not used as TableRef.
-select pos, item from functional_parquet.complextypestbl.int_array
----- RESULTS
-0,-1
-0,1
-0,NULL
-1,1
-1,2
-2,2
-2,3
-3,NULL
-4,3
-5,NULL
----- TYPES
-BIGINT,INT
-====
----- QUERY
-# Test resolving nested columns and nested collections.
-select id, nested_struct.a, a.pos, a.item
-from functional_parquet.complextypestbl t, t.int_array a
----- RESULTS
-100,1,0,1
-100,1,1,2
-100,1,2,3
-200,NULL,0,NULL
-200,NULL,1,1
-200,NULL,2,2
-200,NULL,3,NULL
-200,NULL,4,3
-200,NULL,5,NULL
-800,-1,0,-1
----- TYPES
-BIGINT,INT,BIGINT,INT
-====
----- QUERY
-# Test different JOINs comparing to the above test.
-select id, nested_struct.a, a.pos, a.item
-from functional_parquet.complextypestbl t join t.int_array a
----- RESULTS
-100,1,0,1
-100,1,1,2
-100,1,2,3
-200,NULL,0,NULL
-200,NULL,1,1
-200,NULL,2,2
-200,NULL,3,NULL
-200,NULL,4,3
-200,NULL,5,NULL
-800,-1,0,-1
----- TYPES
-BIGINT,INT,BIGINT,INT
-====
----- QUERY
-# Test different JOINs.
-select id, nested_struct.a, a.pos, a.item
-from functional_parquet.complextypestbl t left join t.int_array a
----- RESULTS
-100,1,0,1
-100,1,1,2
-100,1,2,3
-200,NULL,0,NULL
-200,NULL,1,1
-200,NULL,2,2
-200,NULL,3,NULL
-200,NULL,4,3
-200,NULL,5,NULL
-300,NULL,NULL,NULL
-400,NULL,NULL,NULL
-500,NULL,NULL,NULL
-600,NULL,NULL,NULL
-700,7,NULL,NULL
-800,-1,0,-1
----- TYPES
-BIGINT,INT,BIGINT,INT
-====
----- QUERY
-# Test different JOINs.
-select id, nested_struct.a, a.pos, a.item
-from functional_parquet.complextypestbl t right join t.int_array a
----- RESULTS
-100,1,0,1
-100,1,1,2
-100,1,2,3
-200,NULL,0,NULL
-200,NULL,1,1
-200,NULL,2,2
-200,NULL,3,NULL
-200,NULL,4,3
-200,NULL,5,NULL
-800,-1,0,-1
----- TYPES
-BIGINT,INT,BIGINT,INT
-====
----- QUERY
-# Test different JOINs.
-select id, nested_struct.a, a.pos, a.item
-from functional_parquet.complextypestbl t full outer join t.int_array a
----- RESULTS
-100,1,0,1
-100,1,1,2
-100,1,2,3
-200,NULL,0,NULL
-200,NULL,1,1
-200,NULL,2,2
-200,NULL,3,NULL
-200,NULL,4,3
-200,NULL,5,NULL
-300,NULL,NULL,NULL
-400,NULL,NULL,NULL
-500,NULL,NULL,NULL
-600,NULL,NULL,NULL
-700,7,NULL,NULL
-800,-1,0,-1
----- TYPES
-BIGINT,INT,BIGINT,INT
-====
----- QUERY
-# Test function and predicates on nested columns of the masked table.
-select count(nested_struct.a) from functional_parquet.complextypestbl t, t.int_array a
-where id = 100 and nested_struct.a = 1
----- RESULTS
-3
----- TYPES
-BIGINT
-====
----- QUERY
-# Test on a deeper nested collection 'int_array_array'.
-select id, nested_struct.a, aa.item
-from functional_parquet.complextypestbl t, t.int_array_array.item aa
----- RESULTS
-100,1,1
-100,1,3
-100,1,2
-100,1,4
-200,NULL,NULL
-200,NULL,3
-200,NULL,1
-200,NULL,NULL
-200,NULL,2
-200,NULL,4
-200,NULL,NULL
-700,7,5
-700,7,6
-800,-1,-1
-800,-1,-2
----- TYPES
-BIGINT,INT,INT
-====
----- QUERY
-# Test on several nested collections.
-select id, nested_struct.a as field, a.item, aa.item
-from functional_parquet.complextypestbl t, t.int_array a, t.int_array_array.item aa
-where nested_struct.a = -1
----- RESULTS
-800,-1,-1,-1
-800,-1,-1,-2
----- TYPES
-BIGINT,INT,INT,INT
-====
----- QUERY
-# Test on map type.
-select id, key, value from functional_parquet.complextypestbl t, t.int_map
----- RESULTS
-100,'k1',1
-100,'k2',100
-200,'k1',2
-200,'k2',NULL
-700,'k1',NULL
-700,'k3',NULL
-800,'k1',-1
----- TYPES
-BIGINT,STRING,INT
-====
----- QUERY
-# Test on deep nested column 'nested_struct.b'.
-select id, item from functional_parquet.complextypestbl t, t.nested_struct.b
----- RESULTS
-100,1
-200,NULL
-700,2
-700,3
-700,NULL
-800,-1
----- TYPES
-BIGINT,INT
-====
----- QUERY
-# Test on correlated CollectionTableRefs. This query is copied from nested-types-scanner-multiple-materialization.test.
-select id, item from functional_parquet.complextypestbl t,
-(select item from t.int_array where item = 2
- union all
- select item from t.int_array where item != 2
- union all
- select item from t.int_array where item is null) v
----- RESULTS
-100,1
-100,2
-100,3
-200,1
-200,2
-200,3
-200,NULL
-200,NULL
-200,NULL
-800,-1
----- TYPES
-BIGINT,INT
-====
----- QUERY
-# Test on correlated CollectionTableRefs. This query is copied from nested-types-scanner-multiple-materialization.test.
-select id, e, f from functional_parquet.complextypestbl t,
-(select e, f from t.nested_struct.c.d.item where e = 10
- union all
- select e, f from t.nested_struct.c.d.item where e != 10
- union all
- select e, f from t.nested_struct.c.d.item where e is null) v
----- RESULTS
-100,-10,'bbb'
-100,10,'aaa'
-100,11,'c'
-200,-10,'bbb'
-200,10,'aaa'
-200,11,'c'
-200,NULL,'NULL'
-200,NULL,'NULL'
-200,NULL,'NULL'
-200,NULL,'NULL'
-700,NULL,'NULL'
-800,-1,'nonnullable'
----- TYPES
-BIGINT,INT,STRING
-====
----- QUERY
-# Test on relative CollectionTableRefs. This query is copied from nested-types-scanner-multiple-materialization.test.
-select id, int_array.item, a2.item, a3.item,
-nested_struct.a, b.item, d2.e, d2.f, d3.e, d3.f
-from functional_parquet.complextypestbl t,
-t.int_array,
-t.int_array_array a1, a1.item a2,
-t.int_array_array.item a3,
-t.nested_struct.b,
-t.nested_struct.c.d, d.item d2,
-t.nested_struct.c.d.item d3
-where a2.item = 1 and a3.item = 2 and d2.e = 10 and d3.e = -10
----- RESULTS
-100,1,1,2,1,1,10,'aaa',-10,'bbb'
-100,2,1,2,1,1,10,'aaa',-10,'bbb'
-100,3,1,2,1,1,10,'aaa',-10,'bbb'
-200,1,1,2,NULL,NULL,10,'aaa',-10,'bbb'
-200,2,1,2,NULL,NULL,10,'aaa',-10,'bbb'
-200,3,1,2,NULL,NULL,10,'aaa',-10,'bbb'
-200,NULL,1,2,NULL,NULL,10,'aaa',-10,'bbb'
-200,NULL,1,2,NULL,NULL,10,'aaa',-10,'bbb'
-200,NULL,1,2,NULL,NULL,10,'aaa',-10,'bbb'
----- TYPES
-BIGINT,INT,INT,INT,INT,INT,INT,STRING,INT,STRING
-====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking_complex_types.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking_complex_types.test
new file mode 100644
index 0000000..667731c
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking_complex_types.test
@@ -0,0 +1,354 @@
+====
+---- QUERY
+# Test queries on complex types table.
+select id from complextypestbl
+---- RESULTS
+100
+200
+300
+400
+500
+600
+700
+800
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Test queries on complex types table.
+select * from complextypestbl
+---- RESULTS
+100
+200
+300
+400
+500
+600
+700
+800
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Test resolving nested column of the masked table.
+select id, nested_struct.a from complextypestbl
+---- RESULTS
+100,1
+200,NULL
+300,NULL
+400,NULL
+500,NULL
+600,NULL
+700,7
+800,-1
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+# Test resolving nested columns in expending star expression.
+select id, nested_struct.* from complextypestbl
+---- RESULTS
+100,1
+200,NULL
+300,NULL
+400,NULL
+500,NULL
+600,NULL
+700,7
+800,-1
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+# Test resolving a nested column inside a function call.
+select count(id), count(nested_struct.a) from complextypestbl
+---- RESULTS
+8,3
+---- TYPES
+BIGINT,BIGINT
+====
+---- QUERY
+# Test predicates on masked columns and nested columns.
+select id, nested_struct.a from complextypestbl t
+where id = 100 and nested_struct.a = 1;
+---- RESULTS
+100,1
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+# Test resolving a nested collection of the masked table. Should resolve 't.int_array'
+# correctly even though 'complextypestbl' will be masked into a table masking view.
+select pos, item from complextypestbl t, t.int_array
+---- RESULTS
+0,-1
+0,1
+0,NULL
+1,1
+1,2
+2,2
+2,3
+3,NULL
+4,3
+5,NULL
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+# Regression test for the case when 'complextypestbl' is not used as a TableRef.
+select pos, item from complextypestbl.int_array
+---- RESULTS
+0,-1
+0,1
+0,NULL
+1,1
+1,2
+2,2
+2,3
+3,NULL
+4,3
+5,NULL
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+# Test resolving nested columns and nested collections.
+select id, nested_struct.a, a.pos, a.item
+from complextypestbl t, t.int_array a
+---- RESULTS
+100,1,0,1
+100,1,1,2
+100,1,2,3
+200,NULL,0,NULL
+200,NULL,1,1
+200,NULL,2,2
+200,NULL,3,NULL
+200,NULL,4,3
+200,NULL,5,NULL
+800,-1,0,-1
+---- TYPES
+BIGINT,INT,BIGINT,INT
+====
+---- QUERY
+# Test different JOINs compared to the above test.
+select id, nested_struct.a, a.pos, a.item
+from complextypestbl t join t.int_array a
+---- RESULTS
+100,1,0,1
+100,1,1,2
+100,1,2,3
+200,NULL,0,NULL
+200,NULL,1,1
+200,NULL,2,2
+200,NULL,3,NULL
+200,NULL,4,3
+200,NULL,5,NULL
+800,-1,0,-1
+---- TYPES
+BIGINT,INT,BIGINT,INT
+====
+---- QUERY
+# Test different JOINs.
+select id, nested_struct.a, a.pos, a.item
+from complextypestbl t left join t.int_array a
+---- RESULTS
+100,1,0,1
+100,1,1,2
+100,1,2,3
+200,NULL,0,NULL
+200,NULL,1,1
+200,NULL,2,2
+200,NULL,3,NULL
+200,NULL,4,3
+200,NULL,5,NULL
+300,NULL,NULL,NULL
+400,NULL,NULL,NULL
+500,NULL,NULL,NULL
+600,NULL,NULL,NULL
+700,7,NULL,NULL
+800,-1,0,-1
+---- TYPES
+BIGINT,INT,BIGINT,INT
+====
+---- QUERY
+# Test different JOINs.
+select id, nested_struct.a, a.pos, a.item
+from complextypestbl t right join t.int_array a
+---- RESULTS
+100,1,0,1
+100,1,1,2
+100,1,2,3
+200,NULL,0,NULL
+200,NULL,1,1
+200,NULL,2,2
+200,NULL,3,NULL
+200,NULL,4,3
+200,NULL,5,NULL
+800,-1,0,-1
+---- TYPES
+BIGINT,INT,BIGINT,INT
+====
+---- QUERY
+# Test different JOINs.
+select id, nested_struct.a, a.pos, a.item
+from complextypestbl t full outer join t.int_array a
+---- RESULTS
+100,1,0,1
+100,1,1,2
+100,1,2,3
+200,NULL,0,NULL
+200,NULL,1,1
+200,NULL,2,2
+200,NULL,3,NULL
+200,NULL,4,3
+200,NULL,5,NULL
+300,NULL,NULL,NULL
+400,NULL,NULL,NULL
+500,NULL,NULL,NULL
+600,NULL,NULL,NULL
+700,7,NULL,NULL
+800,-1,0,-1
+---- TYPES
+BIGINT,INT,BIGINT,INT
+====
+---- QUERY
+# Test a function and predicates on nested columns of the masked table.
+select count(nested_struct.a) from complextypestbl t, t.int_array a
+where id = 100 and nested_struct.a = 1
+---- RESULTS
+3
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Test on a more deeply nested collection, 'int_array_array'.
+select id, nested_struct.a, aa.item
+from complextypestbl t, t.int_array_array.item aa
+---- RESULTS
+100,1,1
+100,1,3
+100,1,2
+100,1,4
+200,NULL,NULL
+200,NULL,3
+200,NULL,1
+200,NULL,NULL
+200,NULL,2
+200,NULL,4
+200,NULL,NULL
+700,7,5
+700,7,6
+800,-1,-1
+800,-1,-2
+---- TYPES
+BIGINT,INT,INT
+====
+---- QUERY
+# Test on several nested collections.
+select id, nested_struct.a as field, a.item, aa.item
+from complextypestbl t, t.int_array a, t.int_array_array.item aa
+where nested_struct.a = -1
+---- RESULTS
+800,-1,-1,-1
+800,-1,-1,-2
+---- TYPES
+BIGINT,INT,INT,INT
+====
+---- QUERY
+# Test on map type.
+select id, key, value from complextypestbl t, t.int_map
+---- RESULTS
+100,'k1',1
+100,'k2',100
+200,'k1',2
+200,'k2',NULL
+700,'k1',NULL
+700,'k3',NULL
+800,'k1',-1
+---- TYPES
+BIGINT,STRING,INT
+====
+---- QUERY
+# Test on the deeply nested column 'nested_struct.b'.
+select id, item from complextypestbl t, t.nested_struct.b
+---- RESULTS
+100,1
+200,NULL
+700,2
+700,3
+700,NULL
+800,-1
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+# Test on correlated CollectionTableRefs. This query is copied from nested-types-scanner-multiple-materialization.test.
+select id, item from complextypestbl t,
+(select item from t.int_array where item = 2
+ union all
+ select item from t.int_array where item != 2
+ union all
+ select item from t.int_array where item is null) v
+---- RESULTS
+100,1
+100,2
+100,3
+200,1
+200,2
+200,3
+200,NULL
+200,NULL
+200,NULL
+800,-1
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+# Test on correlated CollectionTableRefs. This query is copied from nested-types-scanner-multiple-materialization.test.
+select id, e, f from complextypestbl t,
+(select e, f from t.nested_struct.c.d.item where e = 10
+ union all
+ select e, f from t.nested_struct.c.d.item where e != 10
+ union all
+ select e, f from t.nested_struct.c.d.item where e is null) v
+---- RESULTS
+100,-10,'bbb'
+100,10,'aaa'
+100,11,'c'
+200,-10,'bbb'
+200,10,'aaa'
+200,11,'c'
+200,NULL,'NULL'
+200,NULL,'NULL'
+200,NULL,'NULL'
+200,NULL,'NULL'
+700,NULL,'NULL'
+800,-1,'nonnullable'
+---- TYPES
+BIGINT,INT,STRING
+====
+---- QUERY
+# Test on relative CollectionTableRefs. This query is copied from nested-types-scanner-multiple-materialization.test.
+select id, int_array.item, a2.item, a3.item,
+nested_struct.a, b.item, d2.e, d2.f, d3.e, d3.f
+from complextypestbl t,
+t.int_array,
+t.int_array_array a1, a1.item a2,
+t.int_array_array.item a3,
+t.nested_struct.b,
+t.nested_struct.c.d, d.item d2,
+t.nested_struct.c.d.item d3
+where a2.item = 1 and a3.item = 2 and d2.e = 10 and d3.e = -10
+---- RESULTS
+100,1,1,2,1,1,10,'aaa',-10,'bbb'
+100,2,1,2,1,1,10,'aaa',-10,'bbb'
+100,3,1,2,1,1,10,'aaa',-10,'bbb'
+200,1,1,2,NULL,NULL,10,'aaa',-10,'bbb'
+200,2,1,2,NULL,NULL,10,'aaa',-10,'bbb'
+200,3,1,2,NULL,NULL,10,'aaa',-10,'bbb'
+200,NULL,1,2,NULL,NULL,10,'aaa',-10,'bbb'
+200,NULL,1,2,NULL,NULL,10,'aaa',-10,'bbb'
+200,NULL,1,2,NULL,NULL,10,'aaa',-10,'bbb'
+---- TYPES
+BIGINT,INT,INT,INT,INT,INT,INT,STRING,INT,STRING
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-create-table-full-acid.test b/testdata/workloads/functional-query/queries/QueryTest/show-create-table-full-acid.test
new file mode 100644
index 0000000..e16f702
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-create-table-full-acid.test
@@ -0,0 +1,98 @@
+====
+---- CREATE_TABLE
+# Create a simple full ACID table
+CREATE TABLE test1 (
+  id INT
+)
+STORED AS ORC
+TBLPROPERTIES('transactional'='true')
+---- RESULTS-HIVE-3
+CREATE TABLE show_create_table_test_db.test1 (
+  id INT
+)
+STORED AS ORC
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+====
+---- CREATE_TABLE
+# simple table with all types
+CREATE TABLE test2 (
+  year INT,
+  month INT,
+  id INT COMMENT 'Add a comment',
+  bool_col BOOLEAN,
+  tinyint_col TINYINT,
+  smallint_col SMALLINT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  date_string_col STRING,
+  string_col STRING,
+  timestamp_col TIMESTAMP
+)
+STORED AS ORC
+TBLPROPERTIES('transactional'='true', 'transactional_properties'='default')
+---- RESULTS-HIVE-3
+CREATE TABLE show_create_table_test_db.test2 (
+  year INT,
+  month INT,
+  id INT COMMENT 'Add a comment',
+  bool_col BOOLEAN,
+  tinyint_col TINYINT,
+  smallint_col SMALLINT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  date_string_col STRING,
+  string_col STRING,
+  timestamp_col TIMESTAMP
+)
+STORED AS ORC
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+====
+---- CREATE_TABLE
+# partitioned table with all types
+CREATE TABLE test_part (
+  id INT COMMENT 'Add a comment',
+  bool_col BOOLEAN,
+  tinyint_col TINYINT,
+  smallint_col SMALLINT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  date_string_col STRING,
+  string_col STRING,
+  timestamp_col TIMESTAMP
+)
+PARTITIONED BY (
+  year INT,
+  month INT
+)
+STORED AS ORC
+TBLPROPERTIES('transactional'='true', 'transactional_properties'='default')
+---- RESULTS-HIVE-3
+CREATE TABLE show_create_table_test_db.test_part (
+  id INT COMMENT 'Add a comment',
+  bool_col BOOLEAN,
+  tinyint_col TINYINT,
+  smallint_col SMALLINT,
+  int_col INT,
+  bigint_col BIGINT,
+  float_col FLOAT,
+  double_col DOUBLE,
+  date_string_col STRING,
+  string_col STRING,
+  timestamp_col TIMESTAMP
+)
+PARTITIONED BY (
+  year INT,
+  month INT
+)
+STORED AS ORC
+LOCATION '$$location_uri$$'
+TBLPROPERTIES ('transactional'='true', 'transactional_properties'='default')
+====
diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index f6f6ba6..3f9cddd 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -911,24 +911,29 @@ class TestRanger(CustomClusterTestSuite):
         unique_name + str(policy_cnt), user, "functional", "alltypes", "string_col",
         "CUSTOM", "concat({col}, 'ttt')")
       policy_cnt += 1
-      # Add a policy on a primitive column of a table which contains nested columns.
-      TestRanger._add_column_masking_policy(
-        unique_name + str(policy_cnt), user, "functional_parquet", "complextypestbl",
-        "id", "CUSTOM", "100 * {col}")
-      policy_cnt += 1
-      # Add policies on a nested column though they won't be recognized (same as Hive).
-      TestRanger._add_column_masking_policy(
-        unique_name + str(policy_cnt), user, "functional_parquet", "complextypestbl",
-        "nested_struct.a", "CUSTOM", "100 * {col}")
-      policy_cnt += 1
-      TestRanger._add_column_masking_policy(
-        unique_name + str(policy_cnt), user, "functional_parquet", "complextypestbl",
-        "int_array", "MASK_NULL")
-      policy_cnt += 1
       self.execute_query_expect_success(admin_client, "refresh authorization",
                                         user=ADMIN)
       self.run_test_case("QueryTest/ranger_column_masking", vector,
                          test_file_vars={'$UNIQUE_DB': unique_database})
+      # Add a policy on a primitive column of a table which contains nested columns.
+      for db in ['functional_parquet', 'functional_orc_def']:
+        TestRanger._add_column_masking_policy(
+          unique_name + str(policy_cnt), user, db, "complextypestbl",
+          "id", "CUSTOM", "100 * {col}")
+        policy_cnt += 1
+        # Add policies on a nested column even though they won't be recognized (same as Hive).
+        TestRanger._add_column_masking_policy(
+          unique_name + str(policy_cnt), user, db, "complextypestbl",
+          "nested_struct.a", "CUSTOM", "100 * {col}")
+        policy_cnt += 1
+        TestRanger._add_column_masking_policy(
+          unique_name + str(policy_cnt), user, db, "complextypestbl",
+          "int_array", "MASK_NULL")
+        policy_cnt += 1
+        self.execute_query_expect_success(admin_client, "refresh authorization",
+                                          user=ADMIN)
+        self.run_test_case("QueryTest/ranger_column_masking_complex_types", vector,
+                           use_db=db)
     finally:
       admin_client.execute("revoke create on database %s from user %s"
                            % (unique_database, user))
diff --git a/tests/common/skip.py b/tests/common/skip.py
index a24884b..7d91310 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -222,6 +222,8 @@ class SkipIfHive3:
   without_hms_not_supported = pytest.mark.skipif(HIVE_MAJOR_VERSION >= 3,
       reason="Instantiating HMS server in embedded mode within Hive client requires more "
              "dependencies of Hive 3, see IMPALA-9287.")
+  non_acid = pytest.mark.skipif(HIVE_MAJOR_VERSION >= 3,
+      reason="This test expects tables in non-AICD format.")
 
 
 class SkipIfHive2:
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index 25f7032..1df135e 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -27,7 +27,7 @@ from tests.common.environ import (HIVE_MAJOR_VERSION)
 from tests.common.impala_test_suite import LOG
 from tests.common.parametrize import UniqueDatabase
 from tests.common.skip import (SkipIf, SkipIfABFS, SkipIfADLS, SkipIfKudu, SkipIfLocal,
-                               SkipIfCatalogV2, SkipIfHive2)
+                               SkipIfCatalogV2, SkipIfHive2, SkipIfS3)
 from tests.common.test_dimensions import create_single_exec_option_dimension
 from tests.util.filesystem_utils import (
     WAREHOUSE,
@@ -295,6 +295,7 @@ class TestDdlStatements(TestDdlBase):
         use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))
 
   @SkipIfHive2.orc
+  @SkipIfS3.hive
   @UniqueDatabase.parametrize(sync_ddl=True)
   def test_create_table_like_file_orc(self, vector, unique_database):
     vector.get_value('exec_option')['abort_on_error'] = False
diff --git a/tests/metadata/test_show_create_table.py b/tests/metadata/test_show_create_table.py
index dbcc852..7774842 100644
--- a/tests/metadata/test_show_create_table.py
+++ b/tests/metadata/test_show_create_table.py
@@ -20,7 +20,7 @@ import re
 import shlex
 
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIf, SkipIfHive3
+from tests.common.skip import SkipIf, SkipIfHive2
 from tests.common.test_dimensions import create_uncompressed_text_dimension
 from tests.util.test_file_parser import QueryTestSectionReader, remove_comments
 from tests.common.environ import HIVE_MAJOR_VERSION
@@ -61,6 +61,12 @@ class TestShowCreateTable(ImpalaTestSuite):
     self.__run_show_create_table_test_case('QueryTest/show-create-table', vector,
                                            unique_database)
 
+  @SkipIfHive2.acid
+  def test_show_create_table_full_acid(self, vector, unique_database):
+    self.__run_show_create_table_test_case('QueryTest/show-create-table-full-acid',
+                                           vector,
+                                           unique_database)
+
   def __run_show_create_table_test_case(self, test_file_name, vector, unique_db_name):
     """
     Runs a show-create-table test file, containing the following sections:
diff --git a/tests/query_test/test_acid.py b/tests/query_test/test_acid.py
index 19df4e5..2d0ceb9 100644
--- a/tests/query_test/test_acid.py
+++ b/tests/query_test/test_acid.py
@@ -104,6 +104,15 @@ class TestAcid(ImpalaTestSuite):
   @SkipIfADLS.hive
   @SkipIfIsilon.hive
   @SkipIfLocal.hive
+  def test_full_acid_rowid(self, vector, unique_database):
+    self.run_test_case('QueryTest/full-acid-rowid', vector, use_db=unique_database)
+
+  @SkipIfHive2.acid
+  @SkipIfS3.hive
+  @SkipIfABFS.hive
+  @SkipIfADLS.hive
+  @SkipIfIsilon.hive
+  @SkipIfLocal.hive
   def test_acid_insert_statschg(self, vector, unique_database):
     self.run_test_case('QueryTest/acid-clear-statsaccurate',
         vector, use_db=unique_database)
diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py
index 5908a8d..f1e5d32 100644
--- a/tests/query_test/test_mt_dop.py
+++ b/tests/query_test/test_mt_dop.py
@@ -21,6 +21,7 @@ import pytest
 
 from copy import deepcopy
 from tests.common.environ import ImpalaTestClusterProperties, build_flavor_timeout
+from tests.common.environ import HIVE_MAJOR_VERSION
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.kudu_test_suite import KuduTestSuite
 from tests.common.skip import SkipIfABFS, SkipIfEC, SkipIfNotHdfsMinicluster
@@ -66,6 +67,14 @@ class TestMtDop(ImpalaTestSuite):
       self.execute_query(
         "create external table %s like functional_hbase.alltypes" % fq_table_name)
       expected_results = "Updated 1 partition(s) and 13 column(s)."
+    elif HIVE_MAJOR_VERSION == 3 and file_format == 'orc':
+      self.run_stmt_in_hive(
+          "create table %s like functional_orc_def.alltypes" % fq_table_name)
+      self.run_stmt_in_hive(
+          "insert into %s select * from functional_orc_def.alltypes" % fq_table_name)
+      self.execute_query_using_client(self.client,
+          "invalidate metadata %s" % fq_table_name, vector)
+      expected_results = "Updated 24 partition(s) and 11 column(s)."
     else:
       # Create a second table in the same format pointing to the same data files.
       # This function switches to the format-specific DB in 'vector'.
diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py
index f03dfbf..dba4dee 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -29,6 +29,7 @@ from tests.common.skip import (
     SkipIfABFS,
     SkipIfADLS,
     SkipIfEC,
+    SkipIfHive2,
     SkipIfLocal,
     SkipIfNotHdfsMinicluster
     )
@@ -207,13 +208,53 @@ class TestNestedTypesNoMtDop(ImpalaTestSuite):
         )
         STORED AS {1}""".format(db_table, file_format))
     # Add multiple partitions pointing to the complextypes_tbl data.
+    if file_format == 'parquet':
+      base_table = "functional%s.complextypestbl" % db_suffix
+    else:
+      assert file_format == 'orc'
+      base_table = "functional%s.complextypestbl_non_transactional" % db_suffix
     for partition in [1, 2]:
       self.client.execute("ALTER TABLE {0} ADD PARTITION(part={1}) LOCATION '{2}'".format(
           db_table, partition,
-          self._get_table_location("functional%s.complextypestbl" % db_suffix, vector)))
+          self._get_table_location(base_table, vector)))
     self.run_test_case('QueryTest/nested-types-basic-partitioned', vector,
         unique_database)
 
+  @SkipIfHive2.acid
+  def test_partitioned_table_acid(self, vector, unique_database):
+    """IMPALA-6370: Test that a partitioned table with nested types can be scanned."""
+    table = "complextypes_partitioned"
+    db_table = "{0}.{1}".format(unique_database, table)
+    table_format_info = vector.get_value('table_format')  # type: TableFormatInfo
+    file_format = table_format_info.file_format
+    if file_format != "orc":
+      pytest.skip('Full ACID tables are only supported in ORC format.')
+
+    self.client.execute("""
+        CREATE TABLE {0} (
+          id BIGINT,
+          int_array ARRAY<INT>,
+          int_array_array ARRAY<ARRAY<INT>>,
+          int_map MAP<STRING,INT>,
+          int_map_array ARRAY<MAP<STRING,INT>>,
+          nested_struct STRUCT<
+              a:INT,
+              b:ARRAY<INT>,
+              c:STRUCT<d:ARRAY<ARRAY<STRUCT<e:INT,f:STRING>>>>,
+              g:MAP<STRING,STRUCT<h:STRUCT<i:ARRAY<DOUBLE>>>>>
+        )
+        PARTITIONED BY (
+          part int
+        )
+        STORED AS ORC
+        TBLPROPERTIES('transactional'='true')""".format(db_table))
+    # Add multiple partitions with the complextypestbl data.
+    base_table = "functional_orc_def.complextypestbl"
+    for partition in [1, 2]:
+      self.run_stmt_in_hive("INSERT INTO TABLE {0} PARTITION(part={1}) "
+          "SELECT * FROM {2}".format(db_table, partition, base_table))
+    self.run_test_case('QueryTest/nested-types-basic-partitioned', vector,
+        unique_database)
 
 class TestParquetArrayEncodings(ImpalaTestSuite):
   TESTFILE_DIR = os.path.join(os.environ['IMPALA_HOME'],
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 4a2cf66..9287071 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -39,6 +39,8 @@ from tests.common.skip import (
     SkipIfABFS,
     SkipIfADLS,
     SkipIfEC,
+    SkipIfHive2,
+    SkipIfHive3,
     SkipIfIsilon,
     SkipIfLocal,
     SkipIfNotHdfsMinicluster)
@@ -199,13 +201,25 @@ class TestUnmatchedSchema(ImpalaTestSuite):
     Cannot be done in a setup method because we need access to the current test vector
     """
     self._drop_test_table(vector)
-    self.execute_query_using_client(self.client,
-        "create external table jointbl_test like jointbl", vector)
+    file_format = vector.get_value('table_format').file_format
+    if file_format == 'orc':
+      db_name = "functional" + vector.get_value('table_format').db_suffix()
+      self.run_stmt_in_hive(
+          "create table %s.jointbl_test like functional.jointbl "
+          "stored as orc" % db_name)
+      self.run_stmt_in_hive(
+          'insert into functional_orc_def.jointbl_test '
+          'select * from functional_orc_def.jointbl')
+      self.execute_query_using_client(self.client, 'invalidate metadata jointbl_test',
+          vector)
+    else:
+      self.execute_query_using_client(self.client,
+          "create external table jointbl_test like jointbl", vector)
 
-    # Update the location of the new table to point the same location as the old table
-    location = self._get_table_location('jointbl', vector)
-    self.execute_query_using_client(self.client,
-        "alter table jointbl_test set location '%s'" % location, vector)
+      # Update the location of the new table to point to the same location as the old table
+      location = self._get_table_location('jointbl', vector)
+      self.execute_query_using_client(self.client,
+          "alter table jointbl_test set location '%s'" % location, vector)
 
   def _drop_test_table(self, vector):
     self.execute_query_using_client(self.client,
@@ -1289,6 +1303,7 @@ class TestOrc(ImpalaTestSuite):
     # lineitem_sixblocks.orc occupies 6 blocks.
     check_call(['hdfs', 'dfs', '-Ddfs.block.size=156672', '-copyFromLocal', '-d', '-f',
         os.environ['IMPALA_HOME'] + "/testdata/LineItemMultiBlock/" + file, tbl_loc])
+    self.client.execute("refresh %s.%s" % (db, tbl))
 
   def _misaligned_orc_stripes_helper(
           self, table_name, rows_in_table, num_scanners_with_no_reads=0):
@@ -1323,7 +1338,8 @@ class TestOrc(ImpalaTestSuite):
   @SkipIfIsilon.hive
   @SkipIfLocal.hive
   @SkipIfS3.hive
-  def test_type_conversions(self, vector, unique_database):
+  @SkipIfHive3.non_acid
+  def test_type_conversions_hive2(self, vector, unique_database):
     # Create "illtypes" tables whose columns can't match the underlining ORC file's.
     # Create an "safetypes" table likes above but ORC columns can still fit into it.
     # Reuse the data files of alltypestiny and date_tbl in funtional_orc_def.
@@ -1362,6 +1378,67 @@ class TestOrc(ImpalaTestSuite):
 
     self.run_test_case('DataErrorsTest/orc-type-checks', vector, unique_database)
 
+  # Skip this test on non-HDFS filesystems, because orc-type-checks.test contains Hive
+  # queries that hang in some cases (IMPALA-9345). It would be possible to separate
+  # the tests that use Hive and run most tests on S3, but I think that running these on
+  # S3 doesn't add too much coverage.
+  @SkipIfABFS.hive
+  @SkipIfADLS.hive
+  @SkipIfIsilon.hive
+  @SkipIfLocal.hive
+  @SkipIfS3.hive
+  @SkipIfHive2.acid
+  def test_type_conversions_hive3(self, vector, unique_database):
+    # Create "illtypes" tables whose columns can't match the underlining ORC file's.
+    # Create an "safetypes" table likes above but ORC columns can still fit into it.
+    # Reuse the data files of plain ORC copies of alltypestiny and date_tbl created via Hive.
+    def create_plain_orc_table(fq_tbl_src, fq_tbl_dest):
+      self.run_stmt_in_hive(
+          "create table %s like %s stored as orc" % (fq_tbl_dest, fq_tbl_src))
+      self.run_stmt_in_hive("insert into %s select * from %s" % (fq_tbl_dest, fq_tbl_src))
+      self.client.execute("invalidate metadata %s" % fq_tbl_dest)
+    tmp_alltypes = unique_database + ".alltypes"
+    create_plain_orc_table("functional.alltypestiny", tmp_alltypes)
+    tbl_loc = self._get_table_location(tmp_alltypes, vector)
+    self.client.execute("""create table %s.illtypes (c1 boolean, c2 float,
+        c3 boolean, c4 tinyint, c5 smallint, c6 int, c7 boolean, c8 string, c9 int,
+        c10 float, c11 bigint) partitioned by (year int, month int) stored as ORC
+        location '%s'""" % (unique_database, tbl_loc))
+    self.client.execute("""create table %s.illtypes_ts_to_date (c1 boolean,
+        c2 float, c3 boolean, c4 tinyint, c5 smallint, c6 int, c7 boolean, c8 string,
+        c9 int, c10 float, c11 date) partitioned by (year int, month int) stored as ORC
+        location '%s'""" % (unique_database, tbl_loc))
+    self.client.execute("""create table %s.safetypes (c1 bigint, c2 boolean,
+        c3 smallint, c4 int, c5 bigint, c6 bigint, c7 double, c8 double, c9 char(3),
+        c10 varchar(3), c11 timestamp) partitioned by (year int, month int) stored as ORC
+        location '%s'""" % (unique_database, tbl_loc))
+    tmp_date_tbl = unique_database + ".date_tbl"
+    create_plain_orc_table("functional.date_tbl", tmp_date_tbl)
+    date_tbl_loc = self._get_table_location(tmp_date_tbl, vector)
+    self.client.execute("""create table %s.illtypes_date_tbl (c1 boolean,
+        c2 timestamp) partitioned by (date_part date) stored as ORC location '%s'"""
+        % (unique_database, date_tbl_loc))
+    self.client.execute("alter table %s.illtypes recover partitions" % unique_database)
+    self.client.execute("alter table %s.illtypes_ts_to_date recover partitions"
+        % unique_database)
+    self.client.execute("alter table %s.safetypes recover partitions" % unique_database)
+    self.client.execute("alter table %s.illtypes_date_tbl recover partitions"
+        % unique_database)
+
+    # Create a decimal table whose precisions don't match the underlying ORC files.
+    # Reuse the data files of a plain ORC copy of functional.decimal_tbl.
+    tmp_decimal_tbl = unique_database + ".decimal_tbl"
+    create_plain_orc_table("functional.decimal_tbl", tmp_decimal_tbl)
+    decimal_loc = self._get_table_location(tmp_decimal_tbl, vector)
+    self.client.execute("""create table %s.mismatch_decimals (d1 decimal(8,0),
+        d2 decimal(8,0), d3 decimal(19,10), d4 decimal(20,20), d5 decimal(2,0))
+        partitioned by (d6 decimal(9,0)) stored as orc location '%s'"""
+        % (unique_database, decimal_loc))
+    self.client.execute("alter table %s.mismatch_decimals recover partitions"
+        % unique_database)
+
+    self.run_test_case('DataErrorsTest/orc-type-checks', vector, unique_database)
+
   def test_orc_timestamp_out_of_range(self, vector, unique_database):
       """Test the validation of out-of-range timestamps."""
       test_files = ["testdata/data/out_of_range_timestamp.orc"]
diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py
index 5a52828..a4b16c1 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -25,6 +25,7 @@ import shutil
 import tempfile
 import time
 from subprocess import check_call
+from tests.common.environ import HIVE_MAJOR_VERSION
 from tests.common.test_dimensions import create_exec_option_dimension_from_dict
 from tests.common.impala_test_suite import ImpalaTestSuite, LOG
 from tests.util.filesystem_utils import WAREHOUSE, get_fs_path
@@ -173,28 +174,42 @@ class TestScannersFuzzing(ImpalaTestSuite):
     tmp_table_dir = tempfile.mkdtemp(prefix="tmp-scanner-fuzz-%s" % fuzz_table,
         dir=os.path.join(os.environ['IMPALA_HOME'], "testdata"))
 
-    self.execute_query("create table %s.%s like %s.%s" % (fuzz_db, fuzz_table,
-        src_db, src_table))
-    fuzz_table_location = get_fs_path("/test-warehouse/{0}.db/{1}".format(
-        fuzz_db, fuzz_table))
-
-    LOG.info("Generating corrupted version of %s in %s. Local working directory is %s",
-        fuzz_table, fuzz_db, tmp_table_dir)
-
-    # Find the location of the existing table and get the full table directory structure.
-    fq_table_name = src_db + "." + src_table
-    table_loc = self._get_table_location(fq_table_name, vector)
-    check_call(['hdfs', 'dfs', '-copyToLocal', table_loc + "/*", tmp_table_dir])
-
-    partitions = self.walk_and_corrupt_table_data(tmp_table_dir, num_copies, rng)
-    for partition in partitions:
-      self.execute_query('alter table {0}.{1} add partition ({2})'.format(
-          fuzz_db, fuzz_table, ','.join(partition)))
-
-    # Copy all of the local files and directories to hdfs.
-    to_copy = ["%s/%s" % (tmp_table_dir, file_or_dir)
-               for file_or_dir in os.listdir(tmp_table_dir)]
-    self.filesystem_client.copy_from_local(to_copy, fuzz_table_location)
+    table_format = vector.get_value('table_format')
+    if HIVE_MAJOR_VERSION == 3 and table_format.file_format == 'orc':
+      self.run_stmt_in_hive("create table %s.%s like %s.%s" % (fuzz_db, fuzz_table,
+          src_db, src_table))
+      self.run_stmt_in_hive("insert into %s.%s select * from %s.%s" % (fuzz_db,
+          fuzz_table, src_db, src_table))
+      self.execute_query("invalidate metadata %s.%s" % (fuzz_db, fuzz_table))
+      fq_fuzz_table_name = fuzz_db + "." + fuzz_table
+      table_loc = self._get_table_location(fq_fuzz_table_name, vector)
+      check_call(['hdfs', 'dfs', '-copyToLocal', table_loc + "/*", tmp_table_dir])
+      partitions = self.walk_and_corrupt_table_data(tmp_table_dir, num_copies, rng)
+      self.path_aware_copy_files_to_hdfs(tmp_table_dir, table_loc)
+    else:
+      self.execute_query("create table %s.%s like %s.%s" % (fuzz_db, fuzz_table,
+          src_db, src_table))
+      fuzz_table_location = get_fs_path("/test-warehouse/{0}.db/{1}".format(
+          fuzz_db, fuzz_table))
+
+      LOG.info("Generating corrupted version of %s in %s. Local working directory is %s",
+          fuzz_table, fuzz_db, tmp_table_dir)
+
+      # Find the location of the existing table and get the full table directory
+      # structure.
+      fq_table_name = src_db + "." + src_table
+      table_loc = self._get_table_location(fq_table_name, vector)
+      check_call(['hdfs', 'dfs', '-copyToLocal', table_loc + "/*", tmp_table_dir])
+
+      partitions = self.walk_and_corrupt_table_data(tmp_table_dir, num_copies, rng)
+      for partition in partitions:
+        self.execute_query('alter table {0}.{1} add partition ({2})'.format(
+            fuzz_db, fuzz_table, ','.join(partition)))
+
+      # Copy all of the local files and directories to hdfs.
+      to_copy = ["%s/%s" % (tmp_table_dir, file_or_dir)
+                for file_or_dir in os.listdir(tmp_table_dir)]
+      self.filesystem_client.copy_from_local(to_copy, fuzz_table_location)
 
     if "SCANNER_FUZZ_KEEP_FILES" not in os.environ:
       shutil.rmtree(tmp_table_dir)
@@ -259,6 +274,7 @@ class TestScannersFuzzing(ImpalaTestSuite):
         filepath = os.path.join(subdir, filename)
         copies = [filepath]
         for copy_num in range(1, num_copies):
+          if filename == '_orc_acid_version': break
           copypath = os.path.join(subdir, "copy{0}_{1}".format(copy_num, filename))
           shutil.copyfile(filepath, copypath)
           copies.append(copypath)
@@ -266,13 +282,29 @@ class TestScannersFuzzing(ImpalaTestSuite):
           self.corrupt_file(filepath, rng)
     return partitions
 
+  def path_aware_copy_files_to_hdfs(self, local_dir, hdfs_dir):
+    for subdir, dirs, files in os.walk(local_dir):
+      if '_impala_insert_staging' in subdir: continue
+      if len(dirs) != 0: continue  # Skip non-leaf directories
+
+      rel_subdir = os.path.relpath(subdir, local_dir)
+      hdfs_location = hdfs_dir + '/' + rel_subdir
+
+      for filename in files:
+        self.filesystem_client.copy_from_local(os.path.join(subdir, filename),
+            hdfs_location)
+
   def partitions_from_path(self, relpath):
     """ Return a list of "key=val" parts from partitions inferred from the directory path.
     """
     reversed_partitions = []
     while relpath != '':
       relpath, suffix  = os.path.split(relpath)
-      reversed_partitions.append(suffix)
+      if (relpath == '' or
+          not suffix.startswith('base_') and
+          not suffix.startswith('delta_') and
+          not suffix.startswith('delete_delta_')):
+        reversed_partitions.append(suffix)
     return reversed(reversed_partitions)
 
   def corrupt_file(self, path, rng):