Posted to commits@impala.apache.org by to...@apache.org on 2019/05/24 07:43:03 UTC

[impala] 02/02: acid: Filter unwanted files based on ACID state.

This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9ee4a5e1940afa47227a92e0f6fba6d4c9909f63
Author: arorasudhanshu <su...@cloudera.com>
AuthorDate: Tue Apr 23 15:09:48 2019 -0700

    acid: Filter unwanted files based on ACID state.
    
    - Added new functionality in AcidUtils to filter out files in
      uncommitted directories, and to find the latest valid base data and
      filter out files corresponding to older deltas or bases. (A short
      sketch of this behavior follows this list of changes.)
    
    - Changed Table loading to only load writeIds for transactional tables,
      and enabled a previously-ignored unit test.
    
    - Modified Hive configuration to enable support for compactions:
    -- Need to pass Tez on the HMS classpath, since HMS actually schedules
       compactions rather than HS2.
    -- Had to configure a worker thread for the compactor, or else
       compactions wouldn't proceed even when manually triggered.
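    
    For illustration, a rough sketch of the resulting filtering behavior,
    modeled on the AcidUtilsTest cases added below. The table path, the
    write-id list string, and the AcidFilterSketch/stat names are made up
    for this example; filterFilesForAcidState() and
    getValidWriteIdListFromString() come from this patch, and a Hive 3
    build is assumed:
    
        import java.util.Arrays;
        import java.util.List;
        
        import org.apache.hadoop.fs.FileStatus;
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.hive.common.ValidWriteIdList;
        import org.apache.impala.compat.MetastoreShim;
        import org.apache.impala.util.AcidUtils;
        
        public class AcidFilterSketch {
          public static void main(String[] args) {
            Path tableDir = new Path("file:///warehouse/tt/");
            // Write-id list as serialized by HMS: high watermark 3, no open or
            // aborted write ids ("<db>.<table>:<hwm>:<minOpenWriteId>::").
            ValidWriteIdList writeIds = MetastoreShim.getValidWriteIdListFromString(
                "default.tt:3:9223372036854775807::");
            List<FileStatus> listing = Arrays.asList(
                stat(tableDir, "base_0000002/000000_0"),               // older base: dropped
                stat(tableDir, "base_0000003_v0003217/000000_0"),      // latest valid base: kept
                stat(tableDir, "delta_0000003_0000003_0000/000000_0"), // covered by that base: dropped
                stat(tableDir, "delta_0000004_0000004_0000/000000_0"));// write id 4 > high watermark: dropped
            List<FileStatus> visible =
                AcidUtils.filterFilesForAcidState(listing, tableDir, writeIds,
                    /* loadStats= */ null);
            // 'visible' now holds only base_0000003_v0003217/000000_0.
            System.out.println(visible);
          }
        
          // Fake FileStatus for a file at 'relPath' under 'dir', as in the unit test.
          private static FileStatus stat(Path dir, String relPath) {
            return new FileStatus(/* length= */ 0, /* isdir= */ false,
                /* block_replication= */ 1, /* blocksize= */ 0, /* mtime= */ 1,
                new Path(dir, relPath));
          }
        }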
    
    Testing:
    - New unit tests (AcidUtilsTest) for filtering logic.
    - New e2e test to read data written by Hive in an insert-only table,
      with INSERT, INSERT OVERWRITE, and compaction. Also tests negative
      cases e2e.
    
    To enable the e2e test, this adds support for a 'HIVE_QUERY' section to
    the test script files. To make it reasonably fast, this uses Thrift to
    connect to Hive's HS2 rather than shelling out to beeline. This required
    some special-casing in the test utilities, e.g. using PLAIN
    authentication and skipping the Impala-specific 'set all' query-option
    fetch and runtime profile retrieval for Hive connections.
    
    This commit was co-authored by Sudhanshu Arora and Todd Lipcon.
    
    Change-Id: Icf0aeb36e10c827ead59ed7f67e731199394fe8e
    Reviewed-on: http://gerrit.cloudera.org:8080/13334
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 fe/pom.xml                                         |   7 +
 .../hadoop/hive/common/ValidWriteIdList.java       |   2 +-
 .../apache/impala/catalog/FileMetadataLoader.java  |  91 ++++++----
 .../java/org/apache/impala/catalog/HdfsTable.java  |   7 +-
 .../main/java/org/apache/impala/catalog/Table.java |  11 +-
 .../impala/catalog/local/DirectMetaProvider.java   |   5 +-
 .../org/apache/impala/common/FileSystemUtil.java   |  56 ++++---
 .../java/org/apache/impala/util/AcidUtils.java     | 167 +++++++++++++++++++
 .../impala/analysis/StmtMetadataLoaderTest.java    |  13 +-
 .../impala/catalog/FileMetadataLoaderTest.java     |  11 +-
 .../apache/impala/catalog/HdfsPartitionTest.java   |   2 +-
 .../java/org/apache/impala/util/AcidUtilsTest.java | 182 ++++++++++++++++++++
 fe/src/test/resources/hive-site.xml.py             |   4 +
 testdata/bin/run-hive-server.sh                    |  34 ++--
 .../queries/QueryTest/acid-compaction.test         |  39 +++++
 .../queries/QueryTest/acid-negative.test           |  22 +++
 .../functional-query/queries/QueryTest/acid.test   |  38 +++++
 tests/common/impala_connection.py                  |  41 +++--
 tests/common/impala_test_suite.py                  | 184 ++++++++++++---------
 tests/common/skip.py                               |   5 +
 tests/query_test/test_acid.py                      |  54 ++++++
 tests/util/test_file_parser.py                     |   5 +-
 22 files changed, 810 insertions(+), 170 deletions(-)

diff --git a/fe/pom.xml b/fe/pom.xml
index fdf3272..a96d8bc 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -419,6 +419,13 @@ under the License.
       <version>2.0.0.AM25</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>1.3</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <reporting>
diff --git a/fe/src/compat-hive-2/java/org/apache/hadoop/hive/common/ValidWriteIdList.java b/fe/src/compat-hive-2/java/org/apache/hadoop/hive/common/ValidWriteIdList.java
index 1257d1e..70399a4 100644
--- a/fe/src/compat-hive-2/java/org/apache/hadoop/hive/common/ValidWriteIdList.java
+++ b/fe/src/compat-hive-2/java/org/apache/hadoop/hive/common/ValidWriteIdList.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.common;
  * ValidWriteIdList is not supported in Hive 2
  */
 public class ValidWriteIdList {
-  enum RangeResponse {NONE, SOME, ALL};
+  public enum RangeResponse {NONE, SOME, ALL};
 
   public boolean isWriteIdValid(long writeId) {
     throw new UnsupportedOperationException("isWriteIdValid not supported for "
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index 5be2341..85281c3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -16,11 +16,10 @@
 // under the License.
 package org.apache.impala.catalog;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -28,19 +27,23 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Reference;
 import org.apache.impala.compat.HdfsShim;
 import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.ListMap;
 import org.apache.impala.util.ThreadNameAnnotator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.annotation.Nullable;
 
 /**
  * Utility for loading file metadata within a partition directory.
@@ -53,6 +56,8 @@ public class FileMetadataLoader {
   private final boolean recursive_;
   private final ImmutableMap<String, FileDescriptor> oldFdsByRelPath_;
   private final ListMap<TNetworkAddress> hostIndex_;
+  @Nullable
+  private final ValidWriteIdList writeIds_;
 
   private boolean forceRefreshLocations = false;
 
@@ -65,13 +70,22 @@ public class FileMetadataLoader {
    * @param oldFds any pre-existing file descriptors loaded for this table, used
    *   to optimize refresh if available.
    * @param hostIndex the host index with which to associate the file descriptors
+   * @param writeIds if non-null, a write-id list which will filter the returned
+   *   file descriptors to only include those indicated to be valid.
+   *   TODO(todd) we also likely need to pass an open transaction list here to deal
+   *   with ignoring in-progress (not-yet-visible) compactions.
    */
   public FileMetadataLoader(Path partDir, boolean recursive, List<FileDescriptor> oldFds,
-      ListMap<TNetworkAddress> hostIndex) {
+      ListMap<TNetworkAddress> hostIndex, @Nullable ValidWriteIdList writeIds) {
     partDir_ = Preconditions.checkNotNull(partDir);
     recursive_ = recursive;
     hostIndex_ = Preconditions.checkNotNull(hostIndex);
     oldFdsByRelPath_ = Maps.uniqueIndex(oldFds, FileDescriptor::getRelativePath);
+    writeIds_ = writeIds;
+
+    if (writeIds_ != null) {
+      Preconditions.checkArgument(recursive_, "ACID tables must be listed recursively");
+    }
   }
 
   /**
@@ -147,13 +161,27 @@ public class FileMetadataLoader {
       if (fileStatuses == null) return;
 
       Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
+
+      List<FileStatus> stats = new ArrayList<>();
       while (fileStatuses.hasNext()) {
-        FileStatus fileStatus = fileStatuses.next();
+        stats.add(fileStatuses.next());
+      }
+
+      if (writeIds_ != null) {
+        stats = AcidUtils.filterFilesForAcidState(stats, partDir_, writeIds_,
+            loadStats_);
+      }
+
+      for (FileStatus fileStatus : stats) {
+        if (fileStatus.isDirectory()) {
+          continue;
+        }
+
         if (!FileSystemUtil.isValidDataFile(fileStatus)) {
           ++loadStats_.hiddenFiles;
           continue;
         }
-        String relPath = relativizePath(fileStatus.getPath());
+        String relPath = FileSystemUtil.relativizePath(fileStatus.getPath(), partDir_);
         FileDescriptor fd = oldFdsByRelPath_.get(relPath);
         if (listWithLocations || forceRefreshLocations ||
             hasFileChanged(fd, fileStatus)) {
@@ -172,19 +200,6 @@ public class FileMetadataLoader {
   }
 
   /**
-   * Return the path of 'path' relative to the partition dir being listed. This may
-   * differ from simply the file name in the case of recursive listings.
-   */
-  private String relativizePath(Path path) {
-    URI relUri = partDir_.toUri().relativize(path.toUri());
-    if (relUri.isAbsolute() || relUri.getPath().startsWith("/")) {
-      throw new RuntimeException("FileSystem returned an unexpected path " +
-          path + " for a file within " + partDir_);
-    }
-    return relUri.getPath();
-  }
-
-  /**
    * Create a FileDescriptor for the given FileStatus. If the FS supports block locations,
    * and FileStatus is a LocatedFileStatus (i.e. the location was prefetched) this uses
    * the already-loaded information; otherwise, this may have to remotely look up the
@@ -217,6 +232,15 @@ public class FileMetadataLoader {
 
   // File/Block metadata loading stats for a single HDFS path.
   public class LoadStats {
+    /** Number of files skipped because they pertain to an uncommitted ACID transaction */
+    public int uncommittedAcidFilesSkipped = 0;
+
+    /**
+     * Number of files skipped because they pertain to ACID directories superceded
+     * by later base data.
+     */
+    public int filesSupercededByNewerBase = 0;
+
     // Number of files for which the metadata was loaded.
     public int loadedFiles = 0;
 
@@ -232,12 +256,23 @@ public class FileMetadataLoader {
 
     // Number of unknown disk IDs encountered while loading block
     // metadata for this path.
-    public long unknownDiskIds = 0;
+    public int unknownDiskIds = 0;
 
     public String debugString() {
-      return String.format("Path: %s: Loaded files: %s Hidden files: %s " +
-          "Skipped files: %s Unknown diskIDs: %s", partDir_, loadedFiles, hiddenFiles,
-          skippedFiles, unknownDiskIds);
+      return Objects.toStringHelper("")
+        .add("path", partDir_)
+        .add("loaded files", loadedFiles)
+        .add("hidden files", nullIfZero(hiddenFiles))
+        .add("skipped files", nullIfZero(skippedFiles))
+        .add("uncommited files", nullIfZero(uncommittedAcidFilesSkipped))
+        .add("superceded files", nullIfZero(filesSupercededByNewerBase))
+        .add("unknown diskIds", nullIfZero(unknownDiskIds))
+        .omitNullValues()
+        .toString();
+    }
+
+    private Integer nullIfZero(int x) {
+      return x > 0 ? x : null;
     }
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index b4d2531..5dc0f03 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -54,6 +55,7 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.fb.FbFileBlock;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.CatalogLookupStatus;
@@ -560,13 +562,16 @@ public class HdfsTable extends Table implements FeFsTable {
           .add(p);
     }
 
+    ValidWriteIdList writeIds = validWriteIds_ != null
+        ? MetastoreShim.getValidWriteIdListFromString(validWriteIds_) : null;
+
     // Create a FileMetadataLoader for each path.
     Map<Path, FileMetadataLoader> loadersByPath = Maps.newHashMap();
     for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath.entrySet()) {
       List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
       FileMetadataLoader loader = new FileMetadataLoader(e.getKey(),
           Utils.shouldRecursivelyListPartitions(this),
-          oldFds, hostIndex_);
+          oldFds, hostIndex_, writeIds);
       // If there is a cached partition mapped to this path, we recompute the block
       // locations even if the underlying files have not changed.
       // This is done to keep the cached block metadata up to date.
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index bf65db0..2effb03 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -51,6 +51,7 @@ import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableInfoSelector;
 import org.apache.impala.thrift.TTableStats;
+import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.log4j.Logger;
 
@@ -118,7 +119,9 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
   // impalad.
   protected long lastUsedTime_;
 
-  // Valid write id list for this table
+  // Valid write id list for this table.
+  // null in the case that this table is not transactional.
+  // TODO(todd) this should probably be a ValidWriteIdList in memory instead of a String.
   protected String validWriteIds_ = null;
 
   // maximum number of catalog versions to store for in-flight events for this table
@@ -303,8 +306,12 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
    */
   protected void loadValidWriteIdList(IMetaStoreClient client)
       throws TableLoadingException {
-    if (MetastoreShim.getMajorVersion() > 2) {
+    Preconditions.checkState(msTable_ != null && msTable_.getParameters() != null);
+    if (MetastoreShim.getMajorVersion() > 2 &&
+        AcidUtils.isTransactionalTable(msTable_.getParameters())) {
       validWriteIds_ = fetchValidWriteIds(client);
+    } else {
+      validWriteIds_ = null;
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index ce75f1c..2aa3212 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -287,10 +287,13 @@ class DirectMetaProvider implements MetaProvider {
     Path partDir = new Path(msPartition.getSd().getLocation());
     // TODO(todd): The table property to disable recursive loading is not supported
     // by this code path. However, DirectMetaProvider is not yet a supported feature.
+    // TODO(todd) this code path would have to change to handle ACID tables -- we don't
+    // have the write ID list passed down at this point in the code.
     FileMetadataLoader fml = new FileMetadataLoader(partDir,
         /* recursive= */BackendConfig.INSTANCE.recursivelyListPartitions(),
         /* oldFds= */Collections.emptyList(),
-        hostIndex);
+        hostIndex,
+        /* writeIds=*/null);
 
     try {
       fml.load();
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 84a2674..4c3f65b 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -17,15 +17,8 @@
 
 package org.apache.impala.common;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Stack;
-import java.util.UUID;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -35,10 +28,10 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.adl.AdlFileSystem;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem;
-import org.apache.hadoop.fs.adl.AdlFileSystem;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
@@ -46,8 +39,14 @@ import org.apache.impala.catalog.HdfsCompression;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Stack;
+import java.util.UUID;
 
 /**
  * Common utility functions for operating on FileSystem objects.
@@ -540,6 +539,9 @@ public class FileSystemUtil {
    * the path does not exist. This helps simplify the caller code in cases where
    * the file does not exist and also saves an RPC as the caller need not do a separate
    * exists check for the path. Returns null if the path does not exist.
+   *
+   * If 'recursive' is true, all underlying files and directories will be yielded.
+   * Note that the order (breadth-first vs depth-first, sorted vs not) is undefined.
    */
   public static RemoteIterator<? extends FileStatus> listStatus(FileSystem fs, Path p,
       boolean recursive) throws IOException {
@@ -597,16 +599,29 @@ public class FileSystemUtil {
   }
 
   /**
+   * Return the path of 'path' relative to the startPath. This may
+   * differ from simply the file name in the case of recursive listings.
+   */
+  public static String relativizePath(Path path, Path startPath) {
+    URI relUri = startPath.toUri().relativize(path.toUri());
+    if (relUri.isAbsolute() || relUri.getPath().startsWith("/")) {
+      throw new RuntimeException("FileSystem returned an unexpected path " +
+          path + " for a file within " + startPath);
+    }
+    return relUri.getPath();
+  }
+
+  /**
    * Iterator which recursively visits directories on a FileSystem, yielding
-   * files in an unspecified order. Only files are yielded -- not directories.
+   * files in an unspecified order.
    */
-  private static class RecursingIterator implements RemoteIterator<FileStatus> {
+  static class RecursingIterator implements RemoteIterator<FileStatus> {
     private final FileSystem fs_;
     private final Stack<RemoteIterator<FileStatus>> iters_ = new Stack<>();
     private RemoteIterator<FileStatus> curIter_;
     private FileStatus curFile_;
 
-    private RecursingIterator(FileSystem fs, Path startPath) throws IOException {
+    RecursingIterator(FileSystem fs, Path startPath) throws IOException {
       this.fs_ = Preconditions.checkNotNull(fs);
       curIter_ = fs.listStatusIterator(Preconditions.checkNotNull(startPath));
     }
@@ -641,12 +656,13 @@ public class FileSystemUtil {
      * @throws IOException if any IO error occurs
      */
     private void handleFileStat(FileStatus fileStatus) throws IOException {
-      if (fileStatus.isFile()) { // file
+      if (fileStatus.isFile()) {
         curFile_ = fileStatus;
-      } else { // directory
-        iters_.push(curIter_);
-        curIter_ = fs_.listStatusIterator(fileStatus.getPath());
+        return;
       }
+      iters_.push(curIter_);
+      curIter_ = fs_.listStatusIterator(fileStatus.getPath());
+      curFile_ = fileStatus;
     }
 
     @Override
diff --git a/fe/src/main/java/org/apache/impala/util/AcidUtils.java b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
index 8bffca4..cb8b66f 100644
--- a/fe/src/main/java/org/apache/impala/util/AcidUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
@@ -16,9 +16,26 @@
 // under the License.
 package org.apache.impala.util;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import com.google.errorprone.annotations.Immutable;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.impala.catalog.FileMetadataLoader.LoadStats;
+import org.apache.impala.common.FileSystemUtil;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
 
 /**
  * Contains utility functions for working with Acid tables.
@@ -34,6 +51,21 @@ public class AcidUtils {
   public static final String TABLE_IS_TRANSACTIONAL = "transactional";
   public static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties";
 
+  private static final Pattern BASE_PATTERN = Pattern.compile(
+      "base_" +
+      "(?<writeId>\\d+)" +
+      "(?:_v(?<visibilityTxnId>\\d+))?" +
+      "(?:/.*)?");
+  private static final Pattern DELTA_PATTERN = Pattern.compile(
+      "delta_" +
+       "(?<minWriteId>\\d+)_" +
+       "(?<maxWriteId>\\d+)" +
+       "(?:_(?<optionalStatementId>\\d+))?" +
+       // Optional path suffix.
+       "(?:/.*)?");
+
+  @VisibleForTesting
+  static final long SENTINEL_BASE_WRITE_ID = Long.MIN_VALUE;
 
   // The code is same as what exists in AcidUtils.java in hive-exec.
   // Ideally we should move the AcidUtils code from hive-exec into
@@ -61,4 +93,139 @@ public class AcidUtils {
     return isTransactionalTable(props) && !isInsertOnlyTable(props);
   }
 
+  /**
+   * Predicate that checks if the file or directory is relevant for a given WriteId list.
+   * <p>
+   *  <b>Must be called only for ACID table.</b>
+   *  Checks that the path conforms to ACID table dir structure, and includes only
+   *  directories that correspond to valid committed transactions.
+   * </p>
+   */
+  private static class WriteListBasedPredicate implements Predicate<String> {
+
+    private final ValidWriteIdList writeIdList;
+
+    WriteListBasedPredicate(ValidWriteIdList writeIdList) {
+      this.writeIdList = Preconditions.checkNotNull(writeIdList);
+    }
+
+    public boolean test(String dirPath) {
+      long baseWriteId = getBaseWriteId(dirPath);
+      if (baseWriteId != SENTINEL_BASE_WRITE_ID) {
+        return writeIdList.isValidBase(baseWriteId);
+      } else {
+        ParsedDelta pd = parseDelta(dirPath);
+        if (pd != null) {
+          ValidWriteIdList.RangeResponse rr =
+              writeIdList.isWriteIdRangeValid(pd.minWriteId, pd.maxWriteId);
+          return rr.equals(ValidWriteIdList.RangeResponse.ALL);
+        }
+      }
+      // If it wasn't in a base or a delta directory, we should include it.
+      // This allows post-upgrade tables to be read.
+      // TODO(todd) add an e2e test for this.
+      return true;
+    }
+  }
+
+  @VisibleForTesting
+  static long getBaseWriteId(String relPath) {
+    Matcher baseMatcher = BASE_PATTERN.matcher(relPath);
+    if (baseMatcher.matches()) {
+      return Long.valueOf(baseMatcher.group("writeId"));
+    }
+    return SENTINEL_BASE_WRITE_ID;
+  }
+
+  @Immutable
+  private static final class ParsedDelta {
+    final long minWriteId;
+    final long maxWriteId;
+    /**
+     * Negative value indicates there was no statement id.
+     */
+    final long statementId;
+
+    ParsedDelta(long minWriteId, long maxWriteId, long statementId) {
+      this.minWriteId = minWriteId;
+      this.maxWriteId = maxWriteId;
+      this.statementId = statementId;
+    }
+  }
+
+  private static ParsedDelta parseDelta(String dirPath) {
+    Matcher deltaMatcher = DELTA_PATTERN.matcher(dirPath);
+    if (!deltaMatcher.matches()) {
+      return null;
+    }
+    long minWriteId = Long.valueOf(deltaMatcher.group("minWriteId"));
+    long maxWriteId = Long.valueOf(deltaMatcher.group("maxWriteId"));
+    String statementIdStr = deltaMatcher.group("optionalStatementId");
+    long statementId = statementIdStr != null ? Long.valueOf(statementIdStr) : -1;
+    return new ParsedDelta(minWriteId, maxWriteId, statementId);
+  }
+
+  /**
+   * Filters the files based on Acid state.
+   * @param stats the FileStatuses obtained from recursively listing the directory
+   * @param baseDir the base directory for the partition (or table, in the case of
+   *   unpartitioned tables)
+   * @param writeIds the valid write IDs for the table
+   * @param loadStats stats to add counts of skipped files to. May be null.
+   * @return the FileStatuses that is a subset of passed in descriptors that
+   *    must be used.
+   */
+  public static List<FileStatus> filterFilesForAcidState(List<FileStatus> stats,
+      Path baseDir, ValidWriteIdList writeIds, @Nullable LoadStats loadStats) {
+    List<FileStatus> validStats = new ArrayList<>(stats);
+
+    // First filter out any paths that are not considered valid write IDs.
+    // At the same time, calculate the max valid base write ID.
+    Predicate<String> pred = new WriteListBasedPredicate(writeIds);
+    long maxBaseWriteId = Long.MIN_VALUE;
+    for (Iterator<FileStatus> it = validStats.iterator(); it.hasNext(); ) {
+      FileStatus stat = it.next();
+      String relPath = FileSystemUtil.relativizePath(stat.getPath(), baseDir);
+      if (!pred.test(relPath)) {
+        it.remove();
+        if (loadStats != null) loadStats.uncommittedAcidFilesSkipped++;
+        continue;
+      }
+
+      maxBaseWriteId = Math.max(getBaseWriteId(relPath), maxBaseWriteId);
+    }
+
+    // Filter out any files that are superceded by the latest valid base,
+    // as well as any directories.
+    for (Iterator<FileStatus> it = validStats.iterator(); it.hasNext(); ) {
+      FileStatus stat = it.next();
+
+      if (stat.isDirectory()) {
+        it.remove();
+        continue;
+      }
+
+      String relPath = FileSystemUtil.relativizePath(stat.getPath(), baseDir);
+      long baseWriteId = getBaseWriteId(relPath);
+      if (baseWriteId != SENTINEL_BASE_WRITE_ID) {
+        if (baseWriteId < maxBaseWriteId) {
+          it.remove();
+          if (loadStats != null) loadStats.filesSupercededByNewerBase++;
+        }
+        continue;
+      }
+      ParsedDelta parsedDelta = parseDelta(relPath);
+      if (parsedDelta != null) {
+        if (parsedDelta.minWriteId <= maxBaseWriteId) {
+          it.remove();
+          if (loadStats != null) loadStats.filesSupercededByNewerBase++;
+        }
+        continue;
+      }
+
+      // Not in a base or a delta directory. In that case, it's probably a post-upgrade
+      // file and we should include it.
+    }
+    return validStats;
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java
index b429edc..1c8a853 100644
--- a/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java
@@ -30,6 +30,7 @@ import org.apache.impala.service.Frontend;
 import org.apache.impala.testutil.ImpaladTestCatalog;
 import org.apache.impala.util.EventSequence;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -92,8 +93,8 @@ public class StmtMetadataLoaderTest {
   }
 
   private void validateTablesWriteIds(StmtTableCache stmtTableCache) {
+    Assert.assertTrue(stmtTableCache.tables.size() > 0);
     for (FeTable t: stmtTableCache.tables.values()) {
-      //TODO may check if it is acid table in the future
       Assert.assertTrue(t.isLoaded());
       Assert.assertTrue(t.getValidWriteIds() != null);
       Assert.assertTrue(MetastoreShim.getValidWriteIdListFromString(t.getValidWriteIds())
@@ -227,15 +228,9 @@ public class StmtMetadataLoaderTest {
         new String[] {"default", "functional"}, new String[] {"functional.alltypes"});
   }
 
-  @Ignore
   @Test
   public void testTableWriteID() throws ImpalaException {
-    if (MetastoreShim.getMajorVersion() == 2)
-      return;
-    // ToDo this assumes the acid tables have been created.
-    // They will be available after IMPALA-8439 is checked in.
-    // Ignore the test for now.
-    testLoadAcidTables("select * from acid.insert_only_no_partitions");
-    testLoadAcidTables("select * from acid.insert_only_with_partitions");
+    Assume.assumeTrue(MetastoreShim.getMajorVersion() >= 3);
+    testLoadAcidTables("select * from functional.insert_only_transactional_table");
   }
 }
diff --git a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
index d2e974b..9ade422 100644
--- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
@@ -42,7 +42,7 @@ public class FileMetadataLoaderTest {
     ListMap<TNetworkAddress> hostIndex = new ListMap<>();
     Path tablePath = new Path("hdfs://localhost:20500/test-warehouse/alltypes/");
     FileMetadataLoader fml = new FileMetadataLoader(tablePath, /* recursive=*/true,
-        /* oldFds = */Collections.emptyList(), hostIndex);
+        /* oldFds = */Collections.emptyList(), hostIndex, null);
     fml.load();
     assertEquals(24, fml.getStats().loadedFiles);
     assertEquals(24, fml.getLoadedFds().size());
@@ -56,7 +56,7 @@ public class FileMetadataLoaderTest {
 
     // Test that refreshing is properly incremental if no files changed.
     FileMetadataLoader refreshFml = new FileMetadataLoader(tablePath, /* recursive=*/true,
-        /* oldFds = */fml.getLoadedFds(), hostIndex);
+        /* oldFds = */fml.getLoadedFds(), hostIndex, null);
     refreshFml.load();
     assertEquals(24, refreshFml.getStats().skippedFiles);
     assertEquals(0, refreshFml.getStats().loadedFiles);
@@ -69,7 +69,7 @@ public class FileMetadataLoaderTest {
     fs.setTimes(filePath, fd.getModificationTime() + 1, /* atime= */-1);
 
     refreshFml = new FileMetadataLoader(tablePath, /* recursive=*/true,
-        /* oldFds = */fml.getLoadedFds(), hostIndex);
+        /* oldFds = */fml.getLoadedFds(), hostIndex, null);
     refreshFml.load();
     assertEquals(1, refreshFml.getStats().loadedFiles);
   }
@@ -80,9 +80,12 @@ public class FileMetadataLoaderTest {
       ListMap<TNetworkAddress> hostIndex = new ListMap<>();
       Path tablePath = new Path("hdfs://localhost:20500/test-warehouse/does-not-exist/");
       FileMetadataLoader fml = new FileMetadataLoader(tablePath, recursive,
-          /* oldFds = */Collections.emptyList(), hostIndex);
+          /* oldFds = */Collections.emptyList(), hostIndex, null);
       fml.load();
       assertEquals(0, fml.getLoadedFds().size());
     }
   }
+
+  // TODO(todd) add unit tests for loading ACID tables once we have some ACID
+  // tables with data loaded in the functional test DBs.
 }
diff --git a/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java b/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
index 069d39e..51706c1 100644
--- a/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
@@ -151,7 +151,7 @@ public class HdfsPartitionTest {
     Path p = new Path("hdfs://localhost:20500/test-warehouse/schemas");
     ListMap<TNetworkAddress> origIndex = new ListMap<>();
     FileMetadataLoader fml = new FileMetadataLoader(p, /* recursive= */false,
-        Collections.emptyList(), origIndex);
+        Collections.emptyList(), origIndex, /*writeIds=*/null);
     fml.load();
     List<FileDescriptor> fileDescriptors = fml.getLoadedFds();
     assertTrue(!fileDescriptors.isEmpty());
diff --git a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
new file mode 100644
index 0000000..4fc72a9
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.impala.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.impala.compat.MetastoreShim;
+import org.hamcrest.Matchers;
+import org.junit.Assume;
+import org.junit.Test;
+
+public class AcidUtilsTest {
+
+  /** Fake base path to root all FileStatuses under. */
+  private static final Path BASE_PATH = new Path("file:///foo/bar/");
+
+  public AcidUtilsTest() {
+    Assume.assumeTrue("Tests require Hive 3 to parse and use WriteIdList",
+        MetastoreShim.getMajorVersion() == 3);
+  }
+
+  private static List<FileStatus> createMockStats(String... testStrings) {
+    return Arrays.asList(testStrings).stream().map(s -> {
+      return new FileStatus(
+          /* length=*/0, /*isdir=*/s.endsWith("/"), /*block_replication=*/1,
+          /*blocksize=*/0, /*mtime=*/1, /*path=*/new Path(BASE_PATH, s));
+    }).collect(Collectors.toList());
+  }
+
+  private void assertFiltering(String[] relPaths, String validWriteIdListStr,
+      String[] expectedRelPaths) {
+
+    ValidWriteIdList writeIds = MetastoreShim.getValidWriteIdListFromString(
+        validWriteIdListStr);
+    List<FileStatus> stats = createMockStats(relPaths);
+    List<FileStatus> expectedStats = createMockStats(expectedRelPaths);
+
+    assertThat(AcidUtils.filterFilesForAcidState(stats, BASE_PATH, writeIds, null),
+      Matchers.containsInAnyOrder(expectedStats.toArray()));
+  }
+
+  @Test
+  public void testParsingBaseWriteIds() {
+    assertEquals(AcidUtils.SENTINEL_BASE_WRITE_ID,
+        AcidUtils.getBaseWriteId("base_01.txt"));
+    assertEquals(AcidUtils.SENTINEL_BASE_WRITE_ID,
+        AcidUtils.getBaseWriteId("base_01_02"));
+    assertEquals(AcidUtils.SENTINEL_BASE_WRITE_ID,
+        AcidUtils.getBaseWriteId("abc/base_6"));
+    assertEquals(2, AcidUtils.getBaseWriteId("base_00002"));
+    assertEquals(2, AcidUtils.getBaseWriteId("base_00002/foo"));
+    assertEquals(2, AcidUtils.getBaseWriteId("base_00002_v12345"));
+    assertEquals(2, AcidUtils.getBaseWriteId("base_00002_v12345/foo"));
+  }
+
+  @Test
+  public void testBasePredicate() {
+    assertFiltering(new String[]{
+          "base_01.txt",
+          "post_upgrade.txt",
+          "base_0000005/",
+          "base_0000005/abc.txt",
+          "base_0000005/0000/",
+          "base_0000005/0000/abc.txt",
+          "abc/",
+          "abc/base_0000006/", // Not at root, so shouldn't be handled.
+          "base_00000100/def.txt"},
+        "default.test:10:1234:1,2,3",
+        new String[]{
+          "base_01.txt",
+          "post_upgrade.txt",
+          "base_0000005/abc.txt",
+          "base_0000005/0000/abc.txt"});
+  }
+
+  @Test
+  public void testPostCompactionBase() {
+    assertFiltering(new String[]{
+          "base_0000003_v0003217/",
+          "base_0000003_v0003217/000000_0",
+          "delta_0000001_0000001_0000/",
+          "delta_0000001_0000001_0000/000000_0",
+          "delta_0000002_0000002_0000/",
+          "delta_0000002_0000002_0000/000000_0",
+          "delta_0000003_0000003_0000/",
+          "delta_0000003_0000003_0000/000000_0"},
+      "test_acid_compaction_676f9a30.tt:3:9223372036854775807::",
+      new String[]{
+          "base_0000003_v0003217/000000_0"});
+  }
+
+  @Test
+  public void testDeltaPredicate() {
+    String[] paths = new String[]{
+        "delta_000005_0000005/",
+        "delta_000005_0000005/abc.txt",
+        "delta_000005_0000005_0000/",
+        "delta_000005_0000005_0000/abc.txt",
+        "delta_000006_0000020/",
+        "delta_000006_0000020/def.txt",
+        "delta_000005.txt"};
+
+    // Only committed up to transaction 10, so skip the 6-20 delta.
+    assertFiltering(paths,
+      "default.test:10:1234:1,2,3",
+      new String[]{
+          "delta_000005_0000005/abc.txt",
+          "delta_000005_0000005_0000/abc.txt",
+          "delta_000005.txt"});
+  }
+
+  @Test
+  public void testAcidStateFilter() {
+    assertFiltering(new String[]{
+          "base_0000009/",
+          "base_0000009/abc.txt",
+          "delta_000005_000005_0000/",
+          "delta_000005_000005_0000/lmn.txt",
+          "base_0000010/",
+          "base_0000010/00000_0",
+          "base_0000010/00001_0",
+          "delta_0000012_0000012_0000/",
+          "delta_0000012_0000012_0000/0000_0",
+          "delta_0000012_0000012_0000/0000_1"},
+      "", // Accept all,
+      new String[]{
+          "base_0000010/00000_0",
+          "base_0000010/00001_0",
+          "delta_0000012_0000012_0000/0000_0",
+          "delta_0000012_0000012_0000/0000_1"});
+  }
+
+  @Test
+  public void testAcidStateNoBase() {
+    assertFiltering(new String[]{
+        "delta_000005_000005_0000/",
+        "delta_000005_000005_0000/lmn.txt",
+        "base_000010/",
+        "delta_0000012_0000012_0000/",
+        "delta_0000012_0000012_0000/0000_0",
+        "delta_0000012_0000012_0000/0000_1"},
+    "", // writeIdList that accepts all transactions as valid
+    new String[]{
+        "delta_0000012_0000012_0000/0000_0",
+        "delta_0000012_0000012_0000/0000_1"});
+
+    // Same set of files, but no base directory.
+    assertFiltering(new String[]{
+        "delta_000005_000005_0000/",
+        "delta_000005_000005_0000/lmn.txt",
+        "delta_0000012_0000012_0000/",
+        "delta_0000012_0000012_0000/0000_0",
+        "delta_0000012_0000012_0000/0000_1"},
+    "", // writeIdList that accepts all transactions as valid
+    new String[]{
+        "delta_000005_000005_0000/lmn.txt",
+        "delta_0000012_0000012_0000/0000_0",
+        "delta_0000012_0000012_0000/0000_1"});
+  }
+}
diff --git a/fe/src/test/resources/hive-site.xml.py b/fe/src/test/resources/hive-site.xml.py
index ea299fd..24a00b6 100644
--- a/fe/src/test/resources/hive-site.xml.py
+++ b/fe/src/test/resources/hive-site.xml.py
@@ -106,6 +106,10 @@ if hive_major_version >= 3:
    # TODO(todd) re-evaluate whether this is necessary once TEZ-3310 is fixed
    # (see above).
    'hive.merge.tezfiles': 'true',
+
+   # Enable compaction workers. The compaction initiator is off by default
+   # but configuring a worker thread allows manual compaction.
+   'hive.compactor.worker.threads': 1
   })
 else:
   CONFIG.update({
diff --git a/testdata/bin/run-hive-server.sh b/testdata/bin/run-hive-server.sh
index a2f4bba..1718467 100755
--- a/testdata/bin/run-hive-server.sh
+++ b/testdata/bin/run-hive-server.sh
@@ -81,6 +81,25 @@ if [[ "$USE_CDP_HIVE" = "true" && -n "$SENTRY_HOME" ]]; then
   done
 fi
 
+# For Hive 3, we use Tez for execution. We have to add it to the classpath.
+# NOTE: it would seem like this would only be necessary on the HS2 classpath,
+# but compactions are initiated from the HMS in Hive 3. This may change at
+# some point in the future, in which case we can add this to only the
+# HS2 classpath.
+if ${USE_CDP_HIVE} ; then
+  export HADOOP_CLASSPATH=${HADOOP_CLASSPATH}:${TEZ_HOME}/*
+  # This is a little hacky, but Tez bundles a bunch of junk into lib/, such
+  # as extra copies of the hadoop libraries, etc, and we want to avoid conflicts.
+  # So, we'll be a bit choosy about what we add to the classpath here.
+  for jar in $TEZ_HOME/lib/* ; do
+    case $(basename $jar) in
+      commons-*|RoaringBitmap*)
+        export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$jar
+        ;;
+    esac
+  done
+fi
+
 # Starts a Hive Metastore Server on the specified port.
 # To debug log4j2 loading issues, add to HADOOP_CLIENT_OPTS:
 #   -Dorg.apache.logging.log4j.simplelog.StatusLogger.level=TRACE
@@ -91,21 +110,6 @@ HADOOP_CLIENT_OPTS="-Xmx2024m -Dhive.log.file=hive-metastore.log" hive \
 ${CLUSTER_BIN}/wait-for-metastore.py --transport=${METASTORE_TRANSPORT}
 
 if [ ${ONLY_METASTORE} -eq 0 ]; then
-  # For Hive 3, we use Tez for execution. We have to add it to the HS2 classpath.
-  if ${USE_CDP_HIVE} ; then
-    export HADOOP_CLASSPATH=${HADOOP_CLASSPATH}:${TEZ_HOME}/*
-    # This is a little hacky, but Tez bundles a bunch of junk into lib/, such
-    # as extra copies of the hadoop libraries, etc, and we want to avoid conflicts.
-    # So, we'll be a bit choosy about what we add to the classpath here.
-    for jar in $TEZ_HOME/lib/* ; do
-      case $(basename $jar) in
-        commons-*|RoaringBitmap*)
-          export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$jar
-          ;;
-      esac
-    done
-  fi
-
   # Starts a HiveServer2 instance on the port specified by the HIVE_SERVER2_THRIFT_PORT
   # environment variable. HADOOP_HEAPSIZE should be set to at least 2048 to avoid OOM
   # when loading ORC tables like widerow.
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid-compaction.test b/testdata/workloads/functional-query/queries/QueryTest/acid-compaction.test
new file mode 100644
index 0000000..62b6700
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid-compaction.test
@@ -0,0 +1,39 @@
+====
+---- HIVE_QUERY
+use $DATABASE;
+create table tt (x int) tblproperties (
+  'transactional'='true',
+  'transactional_properties'='insert_only');
+
+insert into tt values (1);
+insert into tt values (2);
+insert into tt values (3);
+====
+---- QUERY
+invalidate metadata tt;
+select * from tt;
+---- RESULTS
+1
+2
+3
+====
+---- HIVE_QUERY
+use $DATABASE;
+alter table tt compact 'major' and wait;
+====
+---- QUERY
+refresh tt;
+select * from tt
+---- RESULTS
+1
+2
+3
+====
+---- QUERY
+show files in tt;
+---- LABELS
+Path,Size,Partition
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/tt/base_0000003_v\d+/000000_0','\d+B',''
+---- TYPES
+STRING,STRING,STRING
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test b/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
new file mode 100644
index 0000000..8ac3812
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
@@ -0,0 +1,22 @@
+====
+---- QUERY
+insert into functional.insert_only_transactional_table values (1);
+---- CATCH
+AnalysisException: Table functional.insert_only_transactional_table not supported. Transactional (ACID) tables are only supported for read.
+====
+---- QUERY
+alter table functional.insert_only_transactional_table change column x y bigint;
+---- CATCH
+AnalysisException: Table functional.insert_only_transactional_table not supported. Transactional (ACID) tables are only supported for read.
+====
+---- QUERY
+compute stats functional.insert_only_transactional_table;
+---- CATCH
+AnalysisException: Table functional.insert_only_transactional_table not supported. Transactional (ACID) tables are only supported for read.
+====
+---- QUERY
+select * from functional_orc_def.full_transactional_table;
+---- CATCH
+AnalysisException: Table functional_orc_def.full_transactional_table not supported. Transactional (ACID) tables are only supported when they are configured as insert_only.
+====
+
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid.test b/testdata/workloads/functional-query/queries/QueryTest/acid.test
new file mode 100644
index 0000000..f185d2d
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid.test
@@ -0,0 +1,38 @@
+====
+---- HIVE_QUERY
+use $DATABASE;
+create table tt (x int) tblproperties (
+  'transactional'='true',
+  'transactional_properties'='insert_only');
+
+insert into tt values (1);
+====
+---- QUERY
+invalidate metadata tt;
+select * from tt
+---- RESULTS
+1
+====
+---- HIVE_QUERY
+use $DATABASE;
+insert into tt values (2);
+====
+---- QUERY
+refresh tt;
+select * from tt order by x;
+---- RESULTS
+1
+2
+====
+---- HIVE_QUERY
+use $DATABASE;
+insert overwrite table tt values (3);
+insert into tt values (4);
+====
+---- QUERY
+refresh tt;
+select * from tt order by x;
+---- RESULTS
+3
+4
+====
diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py
index 2f6f60c..2b10bd1 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -234,7 +234,7 @@ class ImpylaHS2Connection(ImpalaConnection):
   plus Impala-specific extensions, e.g. for fetching runtime profiles.
   TODO: implement support for kerberos, SSL, etc.
   """
-  def __init__(self, host_port, use_kerberos=False):
+  def __init__(self, host_port, use_kerberos=False, is_hive=False):
     self.__host_port = host_port
     if use_kerberos:
       raise NotImplementedError("Kerberos support not yet implemented")
@@ -246,6 +246,7 @@ class ImpylaHS2Connection(ImpalaConnection):
     self.__cursor = None
     # Query options to send along with each query.
     self.__query_options = {}
+    self._is_hive = is_hive
 
   def set_configuration_option(self, name, value):
     self.__query_options[name] = str(value)
@@ -261,15 +262,19 @@ class ImpylaHS2Connection(ImpalaConnection):
   def connect(self):
     LOG.info("-- connecting to {0} with impyla".format(self.__host_port))
     host, port = self.__host_port.split(":")
-    self.__impyla_conn = impyla.connect(host=host, port=int(port))
+    conn_kwargs = {}
+    if self._is_hive:
+      conn_kwargs['auth_mechanism'] = 'PLAIN'
+    self.__impyla_conn = impyla.connect(host=host, port=int(port), **conn_kwargs)
     # Get the default query options for the session before any modifications are made.
     self.__cursor = self.__impyla_conn.cursor()
-    self.__cursor.execute("set all")
     self.__default_query_options = {}
-    for name, val, _ in self.__cursor:
-      self.__default_query_options[name] = val
-    self.__cursor.close_operation()
-    LOG.debug("Default query options: {0}".format(self.__default_query_options))
+    if not self._is_hive:
+      self.__cursor.execute("set all")
+      for name, val, _ in self.__cursor:
+        self.__default_query_options[name] = val
+      self.__cursor.close_operation()
+      LOG.debug("Default query options: {0}".format(self.__default_query_options))
 
   def close(self):
     LOG.info("-- closing connection to: {0}".format(self.__host_port))
@@ -297,7 +302,8 @@ class ImpylaHS2Connection(ImpalaConnection):
         return r
 
   def execute_async(self, sql_stmt, user=None):
-    LOG.info("-- executing: {0}\n{1};\n".format(self.__host_port, sql_stmt))
+    LOG.info("-- executing against {0} at {1}\n{2};\n".format(
+        self._is_hive and 'Hive' or 'Impala', self.__host_port, sql_stmt))
     if user is not None:
       raise NotImplementedError("Not yet implemented for HS2 - authentication")
     try:
@@ -376,8 +382,17 @@ class ImpylaHS2Connection(ImpalaConnection):
         result_tuples = cursor.fetchall()
       else:
         result_tuples = cursor.fetchmany(max_rows)
-    log = self.get_log(handle)
-    profile = self.get_runtime_profile(handle, profile_format=profile_format)
+    elif self._is_hive:
+      # For Hive statements that have no result set (eg USE), they may still be
+      # running, and we need to wait for them to finish before we can proceed.
+      cursor._wait_to_finish()
+
+    if not self._is_hive:
+      log = self.get_log(handle)
+      profile = self.get_runtime_profile(handle, profile_format=profile_format)
+    else:
+      log = None
+      profile = None
     return ImpylaHS2ResultSet(success=True, result_tuples=result_tuples,
                               column_labels=column_labels, column_types=column_types,
                               query=handle.sql_stmt(), log=log, profile=profile)
@@ -416,12 +431,14 @@ class ImpylaHS2ResultSet(object):
       return str(val)
 
 
-def create_connection(host_port, use_kerberos=False, protocol='beeswax'):
+def create_connection(host_port, use_kerberos=False, protocol='beeswax',
+    is_hive=False):
   if protocol == 'beeswax':
     c = BeeswaxConnection(host_port=host_port, use_kerberos=use_kerberos)
   else:
     assert protocol == 'hs2'
-    c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos)
+    c = ImpylaHS2Connection(host_port=host_port, use_kerberos=use_kerberos,
+        is_hive=is_hive)
 
   # A hook in conftest sets tests.common.current_node.
   if hasattr(tests.common, "current_node"):
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index bdd14dd..1be55f7 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -35,7 +35,6 @@ from functools import wraps
 from getpass import getuser
 from random import choice
 from subprocess import check_call
-
 from tests.common.base_test_suite import BaseTestSuite
 from tests.common.errors import Timeout
 from tests.common.impala_connection import create_connection
@@ -119,6 +118,7 @@ SET_PATTERN = re.compile(
     COMMENT_LINES_REGEX + r'\s*set\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*=*', re.I)
 METRICS_URL = 'http://localhost:25000/metrics?json'
 
+GROUP_NAME = grp.getgrgid(pwd.getpwnam(getuser()).pw_gid).gr_name
 
 # Base class for Impala tests. All impala test cases should inherit from this class
 class ImpalaTestSuite(BaseTestSuite):
@@ -197,11 +197,13 @@ class ImpalaTestSuite(BaseTestSuite):
       cls.hs2_client.close()
 
   @classmethod
-  def create_impala_client(cls, host_port=None, protocol='beeswax'):
+  def create_impala_client(cls, host_port=None, protocol='beeswax',
+      is_hive=False):
     if host_port is None:
       host_port = cls.__get_default_host_port(protocol)
     client = create_connection(host_port=host_port,
-        use_kerberos=pytest.config.option.use_kerberos, protocol=protocol)
+        use_kerberos=pytest.config.option.use_kerberos, protocol=protocol,
+        is_hive=is_hive)
     client.connect()
     return client
 
@@ -329,6 +331,36 @@ class ImpalaTestSuite(BaseTestSuite):
         return int(m['value'])
     assert False, "Could not find metric: %s" % name
 
+  def __do_replacements(self, s, use_db=None, extra=None):
+    globs = globals()
+    repl = dict(('$' + k, globs[k]) for k in [
+        "FILESYSTEM_PREFIX",
+        "FILESYSTEM_NAME",
+        "GROUP_NAME",
+        "NAMENODE",
+        "IMPALA_HOME",
+        "INTERNAL_LISTEN_HOST",
+        "INTERNAL_LISTEN_IP"])
+    repl.update({
+        '$SECONDARY_FILESYSTEM': os.environ.get("SECONDARY_FILESYSTEM", ""),
+        '$USER': getuser()})
+
+    if use_db:
+      repl['$DATABASE'] = use_db
+    elif '$DATABASE' in s:
+      raise AssertionError("Query contains $DATABASE but no use_db specified")
+
+    if extra:
+      for k, v in extra.iteritems():
+        if k in repl:
+          raise RuntimeError("Key {0} is reserved".format(k))
+        repl[k] = v
+
+    for k, v in repl.iteritems():
+      s = s.replace(k, v)
+    return s
+
+
   def __verify_exceptions(self, expected_strs, actual_str, use_db):
     """
     Verifies that at least one of the strings in 'expected_str' is either:
@@ -339,14 +371,9 @@ class ImpalaTestSuite(BaseTestSuite):
     for expected_str in expected_strs:
       # In error messages, some paths are always qualified and some are not.
       # So, allow both $NAMENODE and $FILESYSTEM_PREFIX to be used in CATCH.
-      expected_str = expected_str.strip() \
-          .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX) \
-          .replace('$FILESYSTEM_NAME', FILESYSTEM_NAME) \
-          .replace('$NAMENODE', NAMENODE) \
-          .replace('$IMPALA_HOME', IMPALA_HOME) \
-          .replace('$INTERNAL_LISTEN_HOST', INTERNAL_LISTEN_HOST)\
-          .replace('$INTERNAL_LISTEN_IP', INTERNAL_LISTEN_IP)
-      if use_db: expected_str = expected_str.replace('$DATABASE', use_db)
+      expected_str = self.__do_replacements(expected_str.strip(), use_db=use_db)
+      # Remove comments
+      expected_str = re.sub(COMMENT_LINES_REGEX, '', expected_str)
       # Strip newlines so we can split error message into multiple lines
       expected_str = expected_str.replace('\n', '')
       expected_regex = try_compile_regex(expected_str)
@@ -421,10 +448,6 @@ class ImpalaTestSuite(BaseTestSuite):
     exec_options = vector.get_value('exec_option')
     protocol = vector.get_value('protocol')
 
-    # Resolve the current user's primary group name.
-    group_id = pwd.getpwnam(getuser()).pw_gid
-    group_name = grp.getgrgid(group_id).gr_name
-
     target_impalad_clients = list()
     if multiple_impalad:
       target_impalad_clients =\
@@ -444,94 +467,105 @@ class ImpalaTestSuite(BaseTestSuite):
           table_format_info, use_db, pytest.config.option.scale_factor)
       impalad_client.set_configuration(exec_options)
 
+    def __exec_in_impala(query, user=None):
+      """
+      Helper to execute a query block in Impala, restoring any custom
+      query options after the completion of the set of queries.
+      """
+      # Support running multiple queries within the same test section, only verifying the
+      # result of the final query. The main use case is to allow for 'USE database'
+      # statements before a query executes, but it is not limited to that.
+      # TODO: consider supporting result verification of all queries in the future
+      result = None
+      target_impalad_client = choice(target_impalad_clients)
+      if user:
+        # Create a new client so the session will use the new username.
+        target_impalad_client = self.create_impala_client(protocol=protocol)
+      query_options_changed = []
+      try:
+        for query in query.split(';'):
+          set_pattern_match = SET_PATTERN.match(query)
+          if set_pattern_match:
+            query_options_changed.append(set_pattern_match.groups()[0])
+            assert set_pattern_match.groups()[0] not in vector.get_value("exec_option"), \
+                "%s cannot be set in  the '.test' file since it is in the test vector. " \
+                "Consider deepcopy()-ing the vector and removing this option in the " \
+                "python test." % set_pattern_match.groups()[0]
+          result = self.__execute_query(target_impalad_client, query, user=user)
+      finally:
+        if len(query_options_changed) > 0:
+          self.__restore_query_options(query_options_changed, target_impalad_client)
+      return result
+
+    def __exec_in_hive(query, user=None):
+      """
+      Helper to execute a query block in Hive. No special handling of query
+      options is done, since we use a separate session for each block.
+      """
+      h = ImpalaTestSuite.create_impala_client(HIVE_HS2_HOST_PORT, protocol='hs2',
+              is_hive=True)
+      try:
+        result = None
+        for query in query.split(';'):
+          result = h.execute(query, user=user)
+        return result
+      finally:
+        h.close()
+
     sections = self.load_query_test_file(self.get_workload(), test_file_name,
         encoding=encoding)
     for test_section in sections:
       if 'SHELL' in test_section:
         assert len(test_section) == 1, \
-          "SHELL test sections can't contain other sections"
-        cmd = test_section['SHELL']\
-          .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX)\
-          .replace('$FILESYSTEM_NAME', FILESYSTEM_NAME)\
-          .replace('$IMPALA_HOME', IMPALA_HOME)
-        if use_db: cmd = cmd.replace('$DATABASE', use_db)
+            "SHELL test sections can't contain other sections"
+        cmd = self.__do_replacements(test_section['SHELL'], use_db=use_db,
+            extra=test_file_vars)
         LOG.info("Shell command: " + cmd)
         check_call(cmd, shell=True)
         continue
 
-      if 'QUERY' not in test_section:
-        assert 0, 'Error in test file %s. Test cases require a -- QUERY section.\n%s' %\
+      if 'QUERY' in test_section:
+        query_section = test_section['QUERY']
+        exec_fn = __exec_in_impala
+      elif 'HIVE_QUERY' in test_section:
+        query_section = test_section['HIVE_QUERY']
+        exec_fn = __exec_in_hive
+      else:
+        assert 0, ('Error in test file %s. Test cases require a ' +
+            '-- QUERY or -- HIVE_QUERY section.\n%s') %\
             (test_file_name, pprint.pformat(test_section))
 
       if 'SETUP' in test_section:
         self.execute_test_case_setup(test_section['SETUP'], table_format_info)
 
       # TODO: support running query tests against different scale factors
-      query = QueryTestSectionReader.build_query(test_section['QUERY']
-          .replace('$GROUP_NAME', group_name)
-          .replace('$IMPALA_HOME', IMPALA_HOME)
-          .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX)
-          .replace('$FILESYSTEM_NAME', FILESYSTEM_NAME)
-          .replace('$SECONDARY_FILESYSTEM', os.getenv("SECONDARY_FILESYSTEM") or str())
-          .replace('$USER', getuser())
-          .replace('$INTERNAL_LISTEN_HOST', INTERNAL_LISTEN_HOST)
-          .replace('$INTERNAL_LISTEN_IP', INTERNAL_LISTEN_IP))
-      if use_db: query = query.replace('$DATABASE', use_db)
-
-      reserved_keywords = ["$DATABASE", "$FILESYSTEM_PREFIX", "$FILESYSTEM_NAME",
-                           "$GROUP_NAME", "$IMPALA_HOME", "$NAMENODE", "$QUERY",
-                           "$SECONDARY_FILESYSTEM", "$USER"]
-
-      if test_file_vars:
-        for key, value in test_file_vars.iteritems():
-          if key in reserved_keywords:
-            raise RuntimeError("Key {0} is reserved".format(key))
-          query = query.replace(key, value)
+      query = QueryTestSectionReader.build_query(
+          self.__do_replacements(query_section, use_db=use_db, extra=test_file_vars))
 
       if 'QUERY_NAME' in test_section:
         LOG.info('Query Name: \n%s\n' % test_section['QUERY_NAME'])
 
-      # Support running multiple queries within the same test section, only verifying the
-      # result of the final query. The main use case is to allow for 'USE database'
-      # statements before a query executes, but it is not limited to that.
-      # TODO: consider supporting result verification of all queries in the future
       result = None
-      target_impalad_client = choice(target_impalad_clients)
-      query_options_changed = []
       try:
+        result = exec_fn(query, user=test_section.get('USER', '').strip() or None)
         user = None
         if 'USER' in test_section:
-          # Create a new client so the session will use the new username.
           user = test_section['USER'].strip()
-          target_impalad_client = self.create_impala_client(protocol=protocol)
-        for query in query.split(';'):
-          set_pattern_match = SET_PATTERN.match(query)
-          if set_pattern_match != None:
-            query_options_changed.append(set_pattern_match.groups()[0])
-            assert set_pattern_match.groups()[0] not in vector.get_value("exec_option"), \
-                "%s cannot be set in  the '.test' file since it is in the test vector. " \
-                "Consider deepcopy()-ing the vector and removing this option in the " \
-                "python test." % set_pattern_match.groups()[0]
-          result = self.__execute_query(target_impalad_client, query, user=user)
       except Exception as e:
         if 'CATCH' in test_section:
           self.__verify_exceptions(test_section['CATCH'], str(e), use_db)
           continue
         raise
-      finally:
-        if len(query_options_changed) > 0:
-          self.__restore_query_options(query_options_changed, target_impalad_client)
 
       if 'CATCH' in test_section and '__NO_ERROR__' not in test_section['CATCH']:
-        expected_str = " or ".join(test_section['CATCH']).strip() \
-          .replace('$FILESYSTEM_PREFIX', FILESYSTEM_PREFIX) \
-          .replace('$FILESYSTEM_NAME', FILESYSTEM_NAME) \
-          .replace('$NAMENODE', NAMENODE) \
-          .replace('$IMPALA_HOME', IMPALA_HOME)
-        assert False, "Expected exception: %s" % expected_str
+        expected_str = self.__do_replacements(" or ".join(test_section['CATCH']).strip(),
+              use_db=use_db,
+              extra=test_file_vars)
+        assert False, "Expected exception: {0}\n\nwhen running:\n\n{1}".format(
+            expected_str, query)
 
       assert result is not None
-      assert result.success
+      assert result.success, "Query failed: {0}".format(result.data)
 
       # Decode the results read back if the data is stored with a specific encoding.
       if encoding: result.data = [row.decode(encoding) for row in result.data]
@@ -548,10 +582,12 @@ class ImpalaTestSuite(BaseTestSuite):
         assert 'ERRORS' not in test_section,\
           "'ERRORS' sections must have accompanying 'RESULTS' sections"
       # If --update_results, then replace references to the namenode URI with $NAMENODE.
+      # TODO(todd) consider running do_replacements in reverse, though that may cause
+      # some false replacements for things like username.
       if pytest.config.option.update_results and 'RESULTS' in test_section:
         test_section['RESULTS'] = test_section['RESULTS'] \
             .replace(NAMENODE, '$NAMENODE') \
-            .replace('$IMPALA_HOME', IMPALA_HOME) \
+            .replace(IMPALA_HOME, '$IMPALA_HOME') \
             .replace(INTERNAL_LISTEN_HOST, '$INTERNAL_LISTEN_HOST') \
             .replace(INTERNAL_LISTEN_IP, '$INTERNAL_LISTEN_IP')
       rt_profile_info = None
@@ -575,7 +611,7 @@ class ImpalaTestSuite(BaseTestSuite):
         # test files that are checking the contents of tables larger than that anyways.
         dml_results_query = "select * from %s limit 1000" % \
             test_section['DML_RESULTS_TABLE']
-        dml_result = self.__execute_query(target_impalad_client, dml_results_query)
+        dml_result = exec_fn(dml_results_query)
         verify_raw_results(test_section, dml_result,
             vector.get_value('table_format').file_format, result_section='DML_RESULTS',
             update_section=pytest.config.option.update_results)
@@ -780,7 +816,7 @@ class ImpalaTestSuite(BaseTestSuite):
 
   # TODO(todd) make this use Thrift to connect to HS2 instead of shelling
   # out to beeline for better performance
-  def run_stmt_in_hive(self, stmt, username=getuser()):
+  def run_stmt_in_hive(self, stmt, username=None):
     """
     Run a statement in Hive, returning stdout if successful and throwing
     RuntimeError(stderr) if not.
@@ -814,7 +850,7 @@ class ImpalaTestSuite(BaseTestSuite):
           ['beeline',
            '--outputformat=csv2',
            '-u', 'jdbc:hive2://' + pytest.config.option.hive_server2,
-           '-n', username,
+           '-n', username or getuser(),
            '-e', stmt] + beeline_opts,
           stdout=subprocess.PIPE,
           stderr=subprocess.PIPE,
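
For reference, the new __exec_in_hive helper reuses the suite's HS2 client
wrapper to reach Hive over Thrift instead of shelling out to beeline (the
long-standing TODO on run_stmt_in_hive). A minimal sketch of that pattern,
assuming it runs inside the test suite module where ImpalaTestSuite and
HIVE_HS2_HOST_PORT are already in scope; the statement itself is arbitrary:

  # Sketch only: open an HS2 session against Hive, run one statement, and
  # always close the session so the Hive side does not leak sessions.
  hive_client = ImpalaTestSuite.create_impala_client(
      HIVE_HS2_HOST_PORT, protocol='hs2', is_hive=True)
  try:
    result = hive_client.execute("show tables")
  finally:
    hive_client.close()
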
diff --git a/tests/common/skip.py b/tests/common/skip.py
index f41bb02..fea3f24 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -204,6 +204,11 @@ class SkipIfHive3:
       reason="Sentry HMS follower does not work with HMS-3. See SENTRY-2518 for details")
 
 
+class SkipIfHive2:
+  acid = pytest.mark.skipif(HIVE_MAJOR_VERSION == 2,
+      reason="Acid tables are only supported with Hive 3.")
+
+
 class SkipIfCatalogV2:
   """Expose decorators as methods so that is_catalog_v2_cluster() can be evaluated lazily
   when needed, instead of whenever this module is imported."""
diff --git a/tests/query_test/test_acid.py b/tests/query_test/test_acid.py
new file mode 100644
index 0000000..3b64c54
--- /dev/null
+++ b/tests/query_test/test_acid.py
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Functional tests for ACID integration with Hive.
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.skip import SkipIfHive2
+from tests.common.test_dimensions import create_single_exec_option_dimension
+
+
+class TestAcid(ImpalaTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestAcid, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+
+    # TODO(todd) consider running on other formats
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format in ['text'])
+
+  @SkipIfHive2.acid
+  def test_acid(self, vector, unique_database):
+    self.run_test_case('QueryTest/acid', vector, use_db=unique_database)
+
+  @SkipIfHive2.acid
+  def test_acid_compaction(self, vector, unique_database):
+    self.run_test_case('QueryTest/acid-compaction', vector, use_db=unique_database)
+
+  @SkipIfHive2.acid
+  def test_acid_negative(self, vector, unique_database):
+    self.run_test_case('QueryTest/acid-negative', vector, use_db=unique_database)
+
+# TODO(todd): further tests to write:
+#  TRUNCATE, once HIVE-20137 is implemented.
+#  INSERT OVERWRITE with empty result set, once HIVE-21750 is fixed.
+#  Test for a post-upgrade Hive table which contains files not in ACID layout.
+#  Negative test for LOAD DATA INPATH and all other SQL that we don't support.
diff --git a/tests/util/test_file_parser.py b/tests/util/test_file_parser.py
index eeab5ad..789fb9a 100644
--- a/tests/util/test_file_parser.py
+++ b/tests/util/test_file_parser.py
@@ -94,8 +94,9 @@ def parse_query_test_file(file_name, valid_section_names=None, encoding=None):
   # (ex. planner, data error)
   section_names = valid_section_names
   if section_names is None:
-    section_names = ['QUERY', 'RESULTS', 'TYPES', 'LABELS', 'SETUP', 'CATCH', 'ERRORS',
-        'USER', 'RUNTIME_PROFILE', 'SHELL', 'DML_RESULTS', 'DBAPI_RESULTS', 'HS2_TYPES']
+    section_names = ['QUERY', 'HIVE_QUERY', 'RESULTS', 'TYPES', 'LABELS', 'SETUP',
+        'CATCH', 'ERRORS', 'USER', 'RUNTIME_PROFILE', 'SHELL', 'DML_RESULTS',
+        'DBAPI_RESULTS', 'HS2_TYPES']
   return parse_test_file(file_name, section_names, encoding=encoding,
       skip_unknown_sections=False)
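
With HIVE_QUERY accepted by the parser, a .test file can interleave statements
that run in Hive with verification done through Impala. A rough illustration of
what such a pair of sections might look like; the table name, values, and table
properties below are hypothetical, not copied from the test files added by this
change:

  ====
  ---- HIVE_QUERY
  use $DATABASE;
  create table tt (x int) tblproperties (
    'transactional'='true', 'transactional_properties'='insert_only');
  insert into tt values (1), (2)
  ====
  ---- QUERY
  select count(*) from tt
  ---- RESULTS
  2
  ---- TYPES
  BIGINT
  ====

Within either section type, statements are split on ';' and executed in order,
and only the last result is verified. QUERY sections additionally track any SET
statements so __exec_in_impala can restore those options afterwards, while each
HIVE_QUERY block gets its own Hive session, so nothing needs restoring.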