Posted to commits@impala.apache.org by bo...@apache.org on 2020/07/14 15:08:29 UTC

[impala] branch master updated: IMPALA-9859: Full ACID Milestone 4: Part 1 Reading modified tables (primitive types)

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new f602c3f  IMPALA-9859: Full ACID Milestone 4: Part 1 Reading modified tables (primitive types)
f602c3f is described below

commit f602c3f80f5f61ccaebdf1493ff7c89230b77410
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Sun Jun 14 16:49:03 2020 +0200

    IMPALA-9859: Full ACID Milestone 4: Part 1 Reading modified tables (primitive types)
    
    Hive ACID supports row-level DELETE and UPDATE operations on a table.
    It achieves this by assigning a unique row-id to each row and by
    maintaining two sets of files in a table. The first set lives in the
    base/delta directories and contains the INSERTed rows. The second set
    lives in the delete-delta directories and contains the DELETEd rows.
    
    (UPDATE operations are implemented via DELETE+INSERT.)
    
    In the filesystem the layout looks like, e.g.:
     * full_acid/delta_0000001_0000001_0000/0000_0
     * full_acid/delta_0000002_0000002_0000/0000_0
     * full_acid/delete_delta_0000003_0000003_0000/0000_0
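
    Whether a file belongs to the INSERT set or the DELETE set is decided
    from its parent directory's name; the real check in this patch is
    AcidUtils.isDeleteDeltaFd(). A minimal sketch of that classification,
    assuming only the directory prefixes shown above:

        // Hedged sketch; 'dirName' is the file's parent directory name.
        boolean isDeleteDelta = dirName.startsWith("delete_delta_");
        boolean isInsertDelta = dirName.startsWith("base_")
            || dirName.startsWith("delta_");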
    
    During scanning we need to return the INSERTed rows minus the DELETEd
    rows. This patch implements that by creating an ANTI JOIN between the
    INSERT and DELETE sets. It is a planner-only modification. Every HDFS
    SCAN that scans a full ACID table with deleted rows is converted to
    two HDFS SCANs, one for the INSERT deltas and one for the DELETE
    deltas. Then a LEFT ANTI HASH JOIN with BROADCAST distribution mode is
    created above them.
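
    Conceptually the resulting plan fragment looks like this (illustration
    only, not exact EXPLAIN output; the "DELETE EVENTS" prefix comes from
    the new join display name):

        DELETE EVENTS LEFT ANTI HASH JOIN (BROADCAST)
        |-- SCAN HDFS [insert deltas]
        +-- SCAN HDFS [delete deltas]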
    
    Later we can add support for other distribution modes if performance
    requires it. E.g., if there are too many deleted rows then we are
    probably better off with PARTITIONED distribution mode. We could
    estimate the number of deleted rows by sampling the delete delta
    files.
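
    A size-based variant of such an estimate (future-work sketch, not part
    of this patch; 'avgRowSize' would itself have to be estimated):

        long deleteBytes = 0;
        for (FileDescriptor fd : part.getDeleteFileDescriptors()) {
          deleteBytes += fd.getFileLength();
        }
        long estimatedDeletedRows = deleteBytes / avgRowSize;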
    
    The current patch only works for primitive types, i.e. we cannot
    select nested data if the table has deleted rows.
    
    Testing:
     * added planner test
     * added e2e tests
    
    Change-Id: I15c8feabf40be1658f3dd46883f5a1b2aa5d0659
    Reviewed-on: http://gerrit.cloudera.org:8080/16082
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 common/thrift/CatalogObjects.thrift                |   6 +
 common/thrift/CatalogService.thrift                |   6 +
 .../org/apache/impala/catalog/FeCatalogUtils.java  |  13 +-
 .../org/apache/impala/catalog/FeFsPartition.java   |  21 +-
 .../java/org/apache/impala/catalog/FeFsTable.java  |   2 +
 .../apache/impala/catalog/FileMetadataLoader.java  |  21 +
 .../org/apache/impala/catalog/HdfsPartition.java   | 133 +++-
 .../java/org/apache/impala/catalog/HdfsTable.java  |  44 +-
 .../impala/catalog/ParallelFileMetadataLoader.java |   9 +-
 .../impala/catalog/local/CatalogdMetaProvider.java |  78 +-
 .../impala/catalog/local/DirectMetaProvider.java   |  61 +-
 .../impala/catalog/local/LocalFsPartition.java     |  56 +-
 .../apache/impala/catalog/local/LocalFsTable.java  |  11 +-
 .../apache/impala/catalog/local/MetaProvider.java  |   2 +
 .../org/apache/impala/planner/HashJoinNode.java    |  14 +-
 .../java/org/apache/impala/planner/JoinNode.java   |   8 +
 .../apache/impala/planner/SingleNodePlanner.java   | 177 +++++
 .../java/org/apache/impala/util/AcidUtils.java     |  15 +-
 .../org/apache/impala/planner/PlannerTest.java     |   8 +
 .../java/org/apache/impala/util/AcidUtilsTest.java |   9 +-
 .../functional/functional_schema_template.sql      |  25 +
 .../datasets/functional/schema_constraints.csv     |   1 +
 .../queries/PlannerTest/acid-scans.test            | 829 +++++++++++++++++++++
 .../queries/QueryTest/acid-negative.test           |  84 +--
 .../queries/QueryTest/full-acid-scans.test         | 195 +++++
 tests/custom_cluster/test_local_catalog.py         |  16 +-
 tests/query_test/test_acid.py                      |   9 +
 27 files changed, 1706 insertions(+), 147 deletions(-)

diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 7a73270..9700a30 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -306,6 +306,12 @@ struct THdfsPartition {
 
   9: optional list<THdfsFileDesc> file_desc
 
+  // List of ACID insert delta file descriptors.
+  21: optional list<THdfsFileDesc> insert_file_desc
+
+  // List of ACID delete delta file descriptors.
+  22: optional list<THdfsFileDesc> delete_file_desc
+
   // The access level Impala has on this partition (READ_WRITE, READ_ONLY, etc).
   11: optional TAccessLevel access_level
 
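Since the new fields are optional, consumers must check for their presence. A
minimal sketch of the consuming pattern (mirroring the stats accounting in
HdfsTable further below; 'tHdfsPartition' is a THdfsPartition):

    int numFiles = tHdfsPartition.isSetFile_desc() ?
        tHdfsPartition.getFile_desc().size() : 0;
    if (tHdfsPartition.isSetInsert_file_desc()) {
      numFiles += tHdfsPartition.getInsert_file_desc().size();
    }
    if (tHdfsPartition.isSetDelete_file_desc()) {
      numFiles += tHdfsPartition.getDelete_file_desc().size();
    }
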
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index b099b1d..d14e0ff 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -348,6 +348,12 @@ struct TPartialPartitionInfo {
   // Set if 'want_partition_files' was set in TTableInfoSelector.
   4: optional list<CatalogObjects.THdfsFileDesc> file_descriptors
 
+  // Set if 'want_partition_files' was set in TTableInfoSelector.
+  8: optional list<CatalogObjects.THdfsFileDesc> insert_file_descriptors
+
+  // Set if 'want_partition_files' was set in TTableInfoSelector.
+  9: optional list<CatalogObjects.THdfsFileDesc> delete_file_descriptors
+
   // Deflate-compressed byte[] representation of TPartitionStats for this partition.
   // Set if 'want_partition_stats' was set in TTableInfoSelector. Not set if the
   // partition does not have stats.
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index ccaa238..4e2695e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -363,10 +363,21 @@ public abstract class FeCatalogUtils {
       long numBlocks = 0;
       long totalFileBytes = 0;
       for (FileDescriptor fd: part.getFileDescriptors()) {
-        thriftHdfsPart.addToFile_desc(fd.toThrift());
         numBlocks += fd.getNumFileBlocks();
         totalFileBytes += fd.getFileLength();
       }
+      if (!part.getInsertFileDescriptors().isEmpty()) {
+        for (FileDescriptor fd : part.getInsertFileDescriptors()) {
+          thriftHdfsPart.addToInsert_file_desc(fd.toThrift());
+        }
+        for (FileDescriptor fd : part.getDeleteFileDescriptors()) {
+          thriftHdfsPart.addToDelete_file_desc(fd.toThrift());
+        }
+      } else {
+        for (FileDescriptor fd: part.getFileDescriptors()) {
+          thriftHdfsPart.addToFile_desc(fd.toThrift());
+        }
+      }
       thriftHdfsPart.setNum_blocks(numBlocks);
       thriftHdfsPart.setTotal_file_size_bytes(totalFileBytes);
     }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
index 60cbb7a..4b66148 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
@@ -55,11 +55,21 @@ public interface FeFsPartition {
   FileSystemUtil.FsType getFsType();
 
   /**
-   * @return the files that this partition contains
+   * @return all the files that this partition contains, including delete delta files
    */
   List<FileDescriptor> getFileDescriptors();
 
   /**
+   * @return the insert delta files that this partition contains
+   */
+  List<FileDescriptor> getInsertFileDescriptors();
+
+  /**
+   * @return the delete delta files that this partition contains
+   */
+  List<FileDescriptor> getDeleteFileDescriptors();
+
+  /**
    * @return true if this partition contains any files
    */
   boolean hasFileDescriptors();
@@ -174,4 +184,13 @@ public interface FeFsPartition {
    */
   long getWriteId();
 
+  /**
+   * Returns new FeFsPartition that has the insert delta descriptors as file descriptors.
+   */
+  FeFsPartition genInsertDeltaPartition();
+
+  /**
+   * Returns new FeFsPartition that has the delete delta descriptors as file descriptors.
+   */
+  FeFsPartition genDeleteDeltaPartition();
 }
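
A hedged usage sketch of the new FeFsPartition methods (hypothetical caller
code; the actual call sites are in SingleNodePlanner further below):

    for (FeFsPartition part : partitions) {
      FeFsPartition inserts = part.genInsertDeltaPartition();
      // genDeleteDeltaPartition() returns null when there are no delete deltas.
      FeFsPartition deletes = part.genDeleteDeltaPartition();
      if (deletes != null) {
        // Plan two scans plus a LEFT ANTI HASH JOIN between them.
      }
    }
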
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 23839a9..3917dae 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -371,6 +371,8 @@ public interface FeFsTable extends FeTable {
      * Its implementation tries to minimize the constant factor and object generation.
      * The given 'randomSeed' is used for random number generation.
      * The 'percentBytes' parameter must be between 0 and 100.
+     *
+     * TODO(IMPALA-9883): Fix this for full ACID tables.
      */
     public static Map<HdfsScanNode.SampledPartitionMetadata, List<FileDescriptor>>
         getFilesSample(FeFsTable table, Collection<? extends FeFsPartition> inputParts,
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index d847a78..2daca11 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -70,6 +70,8 @@ public class FileMetadataLoader {
   private boolean forceRefreshLocations = false;
 
   private List<FileDescriptor> loadedFds_;
+  private List<FileDescriptor> loadedInsertDeltaFds_;
+  private List<FileDescriptor> loadedDeleteDeltaFds_;
   private LoadStats loadStats_;
 
   /**
@@ -128,6 +130,14 @@ public class FileMetadataLoader {
     return loadedFds_;
   }
 
+  public List<FileDescriptor> getLoadedInsertDeltaFds() {
+    return loadedInsertDeltaFds_;
+  }
+
+  public List<FileDescriptor> getLoadedDeleteDeltaFds() {
+    return loadedDeleteDeltaFds_;
+  }
+
   /**
    * @return statistics about the descriptor loading process, after an invocation of
    * load()
@@ -221,6 +231,17 @@ public class FileMetadataLoader {
         }
        loadedFds_.add(Preconditions.checkNotNull(fd));
       }
+      if (writeIds_ != null) {
+        loadedInsertDeltaFds_ = new ArrayList<>();
+        loadedDeleteDeltaFds_ = new ArrayList<>();
+        for (FileDescriptor fd : loadedFds_) {
+          if (AcidUtils.isDeleteDeltaFd(fd)) {
+            loadedDeleteDeltaFds_.add(fd);
+          } else {
+            loadedInsertDeltaFds_.add(fd);
+          }
+        }
+      }
       loadStats_.unknownDiskIds += numUnknownDiskIds.getRef();
       if (LOG.isTraceEnabled()) {
         LOG.trace(loadStats_.debugString());
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 1cb5c30..8dedc55 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -616,6 +616,8 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
    */
   @Nonnull
   private final ImmutableList<byte[]> encodedFileDescriptors_;
+  private final ImmutableList<byte[]> encodedInsertFileDescriptors_;
+  private final ImmutableList<byte[]> encodedDeleteFileDescriptors_;
   private final HdfsPartitionLocationCompressor.Location location_;
   // True if this partition is marked as cached. Does not necessarily mean the data is
   // cached.
@@ -644,6 +646,8 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
   private HdfsPartition(HdfsTable table, long id, List<LiteralExpr> partitionKeyValues,
       HdfsStorageDescriptor fileFormatDescriptor,
       @Nonnull ImmutableList<byte[]> encodedFileDescriptors,
+      ImmutableList<byte[]> encodedInsertFileDescriptors,
+      ImmutableList<byte[]> encodedDeleteFileDescriptors,
       HdfsPartitionLocationCompressor.Location location,
       boolean isMarkedCached, TAccessLevel accessLevel, Map<String, String> hmsParameters,
       CachedHmsPartitionDescriptor cachedMsPartitionDescriptor,
@@ -654,6 +658,8 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
     partitionKeyValues_ = ImmutableList.copyOf(partitionKeyValues);
     fileFormatDescriptor_ = fileFormatDescriptor;
     encodedFileDescriptors_ = encodedFileDescriptors;
+    encodedInsertFileDescriptors_ = encodedInsertFileDescriptors;
+    encodedDeleteFileDescriptors_ = encodedDeleteFileDescriptors;
     location_ = location;
     isMarkedCached_ = isMarkedCached;
     accessLevel_ = accessLevel;
@@ -825,7 +831,23 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
   @Override // FeFsPartition
   public List<HdfsPartition.FileDescriptor> getFileDescriptors() {
     // Return a lazily transformed list from our internal bytes storage.
-    return Lists.transform(encodedFileDescriptors_, FileDescriptor.FROM_BYTES);
+    List<HdfsPartition.FileDescriptor> ret = new ArrayList<>();
+    ret.addAll(Lists.transform(encodedFileDescriptors_, FileDescriptor.FROM_BYTES));
+    ret.addAll(Lists.transform(encodedInsertFileDescriptors_, FileDescriptor.FROM_BYTES));
+    ret.addAll(Lists.transform(encodedDeleteFileDescriptors_, FileDescriptor.FROM_BYTES));
+    return ret;
+  }
+
+  @Override // FeFsPartition
+  public List<HdfsPartition.FileDescriptor> getInsertFileDescriptors() {
+    // Return a lazily transformed list from our internal bytes storage.
+    return Lists.transform(encodedInsertFileDescriptors_, FileDescriptor.FROM_BYTES);
+  }
+
+  @Override // FeFsPartition
+  public List<HdfsPartition.FileDescriptor> getDeleteFileDescriptors() {
+    // Return a lazily transformed list from our internal bytes storage.
+    return Lists.transform(encodedDeleteFileDescriptors_, FileDescriptor.FROM_BYTES);
   }
 
   /**
@@ -844,11 +866,16 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
 
   @Override // FeFsPartition
   public int getNumFileDescriptors() {
-    return encodedFileDescriptors_.size();
+    return encodedFileDescriptors_.size() +
+           encodedInsertFileDescriptors_.size() +
+           encodedDeleteFileDescriptors_.size();
   }
 
   @Override
-  public boolean hasFileDescriptors() { return !encodedFileDescriptors_.isEmpty(); }
+  public boolean hasFileDescriptors() {
+    return !encodedFileDescriptors_.isEmpty() ||
+           !encodedInsertFileDescriptors_.isEmpty();
+  }
 
   public CachedHmsPartitionDescriptor getCachedMsPartitionDescriptor() {
     return cachedMsPartitionDescriptor_;
@@ -907,6 +934,30 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
   }
 
   @Override
+  public long getWriteId() {
+    return writeId_;
+  }
+
+  @Override
+  public HdfsPartition genInsertDeltaPartition() {
+    ImmutableList<byte[]> fileDescriptors = !encodedInsertFileDescriptors_.isEmpty() ?
+        encodedInsertFileDescriptors_ : encodedFileDescriptors_;
+    return new HdfsPartition.Builder(this)
+        .setId(id_)
+        .setFileDescriptors(fileDescriptors)
+        .build();
+  }
+
+  @Override
+  public HdfsPartition genDeleteDeltaPartition() {
+    if (encodedDeleteFileDescriptors_.isEmpty()) return null;
+    return new HdfsPartition.Builder(this)
+        .setId(id_)
+        .setFileDescriptors(encodedDeleteFileDescriptors_)
+        .build();
+  }
+
+  @Override
   public String toString() {
     return MoreObjects.toStringHelper(this)
       .add("fileDescriptors", getFileDescriptors())
@@ -920,6 +971,8 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
     private List<LiteralExpr> partitionKeyValues_;
     private HdfsStorageDescriptor fileFormatDescriptor_ = null;
     private ImmutableList<byte[]> encodedFileDescriptors_;
+    private ImmutableList<byte[]> encodedInsertFileDescriptors_;
+    private ImmutableList<byte[]> encodedDeleteFileDescriptors_;
     private HdfsPartitionLocationCompressor.Location location_ = null;
     private boolean isMarkedCached_ = false;
     private TAccessLevel accessLevel_ = TAccessLevel.READ_WRITE;
@@ -969,17 +1022,29 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
     public HdfsPartition build() {
       if (partitionKeyValues_ == null) partitionKeyValues_ = Collections.emptyList();
       if (encodedFileDescriptors_ == null) setFileDescriptors(Collections.emptyList());
+      if (encodedInsertFileDescriptors_ == null) {
+        setInsertFileDescriptors(Collections.emptyList());
+      }
+      if (encodedDeleteFileDescriptors_ == null) {
+        setDeleteFileDescriptors(Collections.emptyList());
+      }
       if (hmsParameters_ == null) hmsParameters_ = Collections.emptyMap();
       if (location_ == null) {
         // Only prototype partitions can have null locations.
         Preconditions.checkState(id_ == CatalogObjectsConstants.PROTOTYPE_PARTITION_ID);
       }
       return new HdfsPartition(table_, id_, partitionKeyValues_, fileFormatDescriptor_,
-          encodedFileDescriptors_, location_, isMarkedCached_, accessLevel_,
+          encodedFileDescriptors_, encodedInsertFileDescriptors_,
+          encodedDeleteFileDescriptors_, location_, isMarkedCached_, accessLevel_,
           hmsParameters_, cachedMsPartitionDescriptor_, partitionStats_,
           hasIncrementalStats_, numRows_, writeId_, inFlightEvents_);
     }
 
+    public Builder setId(long id) {
+      id_ = id;
+      return this;
+    }
+
     public Builder setMsPartition(
         org.apache.hadoop.hive.metastore.api.Partition msPartition)
         throws CatalogException {
@@ -1114,6 +1179,13 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
       return Lists.transform(encodedFileDescriptors_, FileDescriptor.FROM_BYTES);
     }
 
+    public Builder clearFileDescriptors() {
+      encodedFileDescriptors_ = ImmutableList.of();
+      encodedInsertFileDescriptors_ = ImmutableList.of();
+      encodedDeleteFileDescriptors_ = ImmutableList.of();
+      return this;
+    }
+
     public Builder setFileDescriptors(List<FileDescriptor> descriptors) {
       // Store an eagerly transformed-and-copied list so that we drop the memory usage
       // of the flatbuffer wrapper.
@@ -1122,6 +1194,34 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
       return this;
     }
 
+    public Builder setFileDescriptors(HdfsPartition partition) {
+      encodedFileDescriptors_ = partition.encodedFileDescriptors_;
+      encodedInsertFileDescriptors_ = partition.encodedInsertFileDescriptors_;
+      encodedDeleteFileDescriptors_ = partition.encodedDeleteFileDescriptors_;
+      return this;
+    }
+
+    public Builder setInsertFileDescriptors(List<FileDescriptor> descriptors) {
+      // Store an eagerly transformed-and-copied list so that we drop the memory usage
+      // of the flatbuffer wrapper.
+      encodedInsertFileDescriptors_ = ImmutableList.copyOf(Lists.transform(
+          descriptors, FileDescriptor.TO_BYTES));
+      return this;
+    }
+
+    public Builder setDeleteFileDescriptors(List<FileDescriptor> descriptors) {
+      // Store an eagerly transformed-and-copied list so that we drop the memory usage
+      // of the flatbuffer wrapper.
+      encodedDeleteFileDescriptors_ = ImmutableList.copyOf(Lists.transform(
+          descriptors, FileDescriptor.TO_BYTES));
+      return this;
+    }
+
+    public Builder setFileDescriptors(ImmutableList<byte[]> encodedDescriptors) {
+      encodedFileDescriptors_ = encodedDescriptors;
+      return this;
+    }
+
     public HdfsFileFormat getFileFormat() {
       return fileFormatDescriptor_.getFileFormat();
     }
@@ -1179,6 +1279,14 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
           hmsParameters_.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())));
     }
 
+    private List<FileDescriptor> fdsFromThrift(List<THdfsFileDesc> tFileDescs) {
+      List<FileDescriptor> ret = new ArrayList<>();
+      for (THdfsFileDesc desc : tFileDescs) {
+        ret.add(HdfsPartition.FileDescriptor.fromThrift(desc));
+      }
+      return ret;
+    }
+
     public List<LiteralExpr> getPartitionValues() {
       return partitionKeyValues_;
     }
@@ -1213,13 +1321,15 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
         }
       }
 
-      List<FileDescriptor> fileDescriptors = new ArrayList<>();
       if (thriftPartition.isSetFile_desc()) {
-        for (THdfsFileDesc desc : thriftPartition.getFile_desc()) {
-          fileDescriptors.add(HdfsPartition.FileDescriptor.fromThrift(desc));
-        }
+        setFileDescriptors(fdsFromThrift(thriftPartition.getFile_desc()));
+      }
+      if (thriftPartition.isSetInsert_file_desc()) {
+        setInsertFileDescriptors(fdsFromThrift(thriftPartition.getInsert_file_desc()));
+      }
+      if (thriftPartition.isSetDelete_file_desc()) {
+        setDeleteFileDescriptors(fdsFromThrift(thriftPartition.getDelete_file_desc()));
       }
-      setFileDescriptors(fileDescriptors);
 
       accessLevel_ = thriftPartition.isSetAccess_level() ?
           thriftPartition.getAccess_level() : TAccessLevel.READ_WRITE;
@@ -1286,9 +1396,4 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
     }
     return 0;
   }
-
-  @Override
-  public long getWriteId() {
-    return writeId_;
-  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index cadc269..7e311fb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -71,6 +71,7 @@ import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
+import org.apache.impala.thrift.THdfsFileDesc;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TNetworkAddress;
@@ -1177,8 +1178,8 @@ public class HdfsTable extends Table implements FeFsTable {
     // Copy over the FDs from the old partition to the new one, so that
     // 'refreshPartitionFileMetadata' below can compare modification times and
     // reload the locations only for those that changed.
-    partBuilder.setFileDescriptors(oldPartition.getFileDescriptors())
-        .setIsMarkedCached(isMarkedCached_);
+    partBuilder.setFileDescriptors(oldPartition);
+    partBuilder.setIsMarkedCached(isMarkedCached_);
     long fileMdLoadTime = loadFileMetadataForPartitions(client,
         ImmutableList.of(partBuilder), /*isRefresh=*/true);
     setUnpartitionedTableStats(partBuilder);
@@ -1631,14 +1632,21 @@ public class HdfsTable extends Table implements FeFsTable {
         }
 
         if (req.table_info_selector.want_partition_files) {
-          List<FileDescriptor> filteredFds = new ArrayList<>(part.getFileDescriptors());
           try {
-            numFilesFiltered += AcidUtils
-                .filterFdsForAcidState(filteredFds, reqWriteIdList);
-            partInfo.file_descriptors = Lists
-                .newArrayListWithCapacity(filteredFds.size());
-            for (FileDescriptor fd: filteredFds) {
-              partInfo.file_descriptors.add(fd.toThrift());
+            if (!part.getInsertFileDescriptors().isEmpty()) {
+              partInfo.file_descriptors = new ArrayList<>();
+              partInfo.insert_file_descriptors = new ArrayList<>();
+              numFilesFiltered += addFilteredFds(part.getInsertFileDescriptors(),
+                  partInfo.insert_file_descriptors, reqWriteIdList);
+              partInfo.delete_file_descriptors = new ArrayList<>();
+              numFilesFiltered += addFilteredFds(part.getDeleteFileDescriptors(),
+                  partInfo.delete_file_descriptors, reqWriteIdList);
+            } else {
+              partInfo.file_descriptors = new ArrayList<>();
+              numFilesFiltered += addFilteredFds(part.getFileDescriptors(),
+                  partInfo.file_descriptors, reqWriteIdList);
+              partInfo.insert_file_descriptors = new ArrayList<>();
+              partInfo.delete_file_descriptors = new ArrayList<>();
             }
             hits.inc();
           } catch (CatalogException ex) {
@@ -1685,6 +1693,16 @@ public class HdfsTable extends Table implements FeFsTable {
     return resp;
   }
 
+  private int addFilteredFds(List<FileDescriptor> fds, List<THdfsFileDesc> thriftFds,
+      ValidWriteIdList writeIdList) throws CatalogException {
+    List<FileDescriptor> filteredFds = new ArrayList<>(fds);
+    int numFilesFiltered = AcidUtils.filterFdsForAcidState(filteredFds, writeIdList);
+    for (FileDescriptor fd: filteredFds) {
+      thriftFds.add(fd.toThrift());
+    }
+    return numFilesFiltered;
+  }
+
   private double getFileMetadataCacheHitRate() {
     long hits = metrics_.getCounter(FILEMETADATA_CACHE_HIT_METRIC).getCount();
     long misses = metrics_.getCounter(FILEMETADATA_CACHE_MISS_METRIC).getCount();
@@ -1732,6 +1750,10 @@ public class HdfsTable extends Table implements FeFsTable {
           stats.numBlocks += tHdfsPartition.getNum_blocks();
           stats.numFiles +=
               tHdfsPartition.isSetFile_desc() ? tHdfsPartition.getFile_desc().size() : 0;
+          stats.numFiles += tHdfsPartition.isSetInsert_file_desc() ?
+              tHdfsPartition.getInsert_file_desc().size() : 0;
+          stats.numFiles += tHdfsPartition.isSetDelete_file_desc() ?
+              tHdfsPartition.getDelete_file_desc().size() : 0;
           stats.totalFileBytes += tHdfsPartition.getTotal_file_size_bytes();
         }
         idToPartition.put(id, tHdfsPartition);
@@ -1985,7 +2007,7 @@ public class HdfsTable extends Table implements FeFsTable {
     long totalBytes = 0L;
     long totalNumFiles = 0L;
     for (FeFsPartition p: orderedPartitions) {
-      int numFiles = p.getFileDescriptors().size();
+      int numFiles = p.getNumFileDescriptors();
       long size = p.getSize();
       totalNumFiles += numFiles;
       totalBytes += size;
@@ -2096,7 +2118,7 @@ public class HdfsTable extends Table implements FeFsTable {
         || HdfsPartition.comparePartitionKeyValues(
             oldPartition.getPartitionValues(), partBuilder.getPartitionValues()) == 0);
     if (oldPartition != null) {
-      partBuilder.setFileDescriptors(oldPartition.getFileDescriptors());
+      partBuilder.setFileDescriptors(oldPartition);
     }
     loadFileMetadataForPartitions(client, ImmutableList.of(partBuilder),
         /*isRefresh=*/true);
diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
index fa51cbf..f14bae0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
@@ -124,7 +124,14 @@ public class ParallelFileMetadataLoader {
       FileMetadataLoader loader = loaders_.get(p);
 
       for (HdfsPartition.Builder partBuilder : e.getValue()) {
-        partBuilder.setFileDescriptors(loader.getLoadedFds());
+        partBuilder.clearFileDescriptors();
+        List<FileDescriptor> deleteDescriptors = loader.getLoadedDeleteDeltaFds();
+        if (deleteDescriptors != null && !deleteDescriptors.isEmpty()) {
+          partBuilder.setInsertFileDescriptors(loader.getLoadedInsertDeltaFds());
+          partBuilder.setDeleteFileDescriptors(loader.getLoadedDeleteDeltaFds());
+        } else {
+          partBuilder.setFileDescriptors(loader.getLoadedFds());
+        }
       }
     }
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index b71af6c..bd01dab 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -961,19 +961,14 @@ public class CatalogdMetaProvider implements MetaProvider {
 
       // Transform the file descriptors to the caller's index.
       checkResponse(part.file_descriptors != null, req, "missing file descriptors");
-      List<FileDescriptor> fds = Lists.newArrayListWithCapacity(
-          part.file_descriptors.size());
-      for (THdfsFileDesc thriftFd: part.file_descriptors) {
-        FileDescriptor fd = FileDescriptor.fromThrift(thriftFd);
-        // The file descriptors returned via the RPC use host indexes that reference
-        // the 'network_addresses' list in the RPC. However, the caller may have already
-        // loaded some addresses into 'hostIndex'. So, the returned FDs need to be
-        // remapped to point to the caller's 'hostIndex' instead of the list in the
-        // RPC response.
-        fds.add(fd.cloneWithNewHostIndex(resp.table_info.network_addresses, hostIndex));
-      }
+      ImmutableList<FileDescriptor> fds = convertThriftFdList(part.file_descriptors,
+          resp.table_info.network_addresses, hostIndex);
+      ImmutableList<FileDescriptor> insertFds = convertThriftFdList(
+          part.insert_file_descriptors, resp.table_info.network_addresses, hostIndex);
+      ImmutableList<FileDescriptor> deleteFds = convertThriftFdList(
+          part.delete_file_descriptors, resp.table_info.network_addresses, hostIndex);
       PartitionMetadataImpl metaImpl = new PartitionMetadataImpl(msPart,
-          ImmutableList.copyOf(fds), part.getPartition_stats(),
+          fds, insertFds, deleteFds, part.getPartition_stats(),
           part.has_incremental_stats, part.is_marked_cached);
 
       checkResponse(partRef != null, req, "returned unexpected partition id %s", part.id);
@@ -987,6 +982,21 @@ public class CatalogdMetaProvider implements MetaProvider {
     return ret;
   }
 
+  private ImmutableList<FileDescriptor> convertThriftFdList(List<THdfsFileDesc> thriftFds,
+      List<TNetworkAddress> networkAddresses, ListMap<TNetworkAddress> hostIndex) {
+    List<FileDescriptor> fds = Lists.newArrayListWithCapacity(thriftFds.size());
+    for (THdfsFileDesc thriftFd: thriftFds) {
+      FileDescriptor fd = FileDescriptor.fromThrift(thriftFd);
+      // The file descriptors returned via the RPC use host indexes that reference
+      // the 'network_addresses' list in the RPC. However, the caller may have already
+      // loaded some addresses into 'hostIndex'. So, the returned FDs need to be
+      // remapped to point to the caller's 'hostIndex' instead of the list in the
+      // RPC response.
+      fds.add(fd.cloneWithNewHostIndex(networkAddresses, hostIndex));
+    }
+    return ImmutableList.copyOf(fds);
+  }
+
   /**
    * Load all partitions from 'partitionRefs' that are currently present in the cache.
    * Any partitions that miss the cache are left unset in the resulting map.
@@ -1465,14 +1475,19 @@ public class CatalogdMetaProvider implements MetaProvider {
   public static class PartitionMetadataImpl implements PartitionMetadata {
     private final Partition msPartition_;
     private final ImmutableList<FileDescriptor> fds_;
+    private final ImmutableList<FileDescriptor> insertFds_;
+    private final ImmutableList<FileDescriptor> deleteFds_;
     private final byte[] partitionStats_;
     private final boolean hasIncrementalStats_;
     private final boolean isMarkedCached_;
 
     public PartitionMetadataImpl(Partition msPartition, ImmutableList<FileDescriptor> fds,
+        ImmutableList<FileDescriptor> insertFds, ImmutableList<FileDescriptor> deleteFds,
         byte[] partitionStats, boolean hasIncrementalStats, boolean isMarkedCached) {
       this.msPartition_ = Preconditions.checkNotNull(msPartition);
       this.fds_ = fds;
+      this.insertFds_ = insertFds;
+      this.deleteFds_ = deleteFds;
       this.partitionStats_ = partitionStats;
       this.hasIncrementalStats_ = hasIncrementalStats;
       this.isMarkedCached_ = isMarkedCached;
@@ -1485,14 +1500,26 @@ public class CatalogdMetaProvider implements MetaProvider {
     public PartitionMetadataImpl cloneRelativeToHostIndex(
         ListMap<TNetworkAddress> origIndex,
         ListMap<TNetworkAddress> dstIndex) {
-      List<FileDescriptor> fds = Lists.newArrayListWithCapacity(fds_.size());
-      for (FileDescriptor fd: fds_) {
-        fds.add(fd.cloneWithNewHostIndex(origIndex.getList(), dstIndex));
-      }
-      return new PartitionMetadataImpl(msPartition_, ImmutableList.copyOf(fds),
+      ImmutableList<FileDescriptor> fds = cloneFdsRelativeToHostIndex(
+          fds_, origIndex, dstIndex);
+      ImmutableList<FileDescriptor> insertFds = cloneFdsRelativeToHostIndex(
+          insertFds_, origIndex, dstIndex);
+      ImmutableList<FileDescriptor> deleteFds = cloneFdsRelativeToHostIndex(
+          deleteFds_, origIndex, dstIndex);
+      return new PartitionMetadataImpl(msPartition_, fds, insertFds, deleteFds,
           partitionStats_, hasIncrementalStats_, isMarkedCached_);
     }
 
+    private static ImmutableList<FileDescriptor> cloneFdsRelativeToHostIndex(
+        ImmutableList<FileDescriptor> fds, ListMap<TNetworkAddress> origIndex,
+        ListMap<TNetworkAddress> dstIndex) {
+      List<FileDescriptor> ret = Lists.newArrayListWithCapacity(fds.size());
+      for (FileDescriptor fd: fds) {
+        ret.add(fd.cloneWithNewHostIndex(origIndex.getList(), dstIndex));
+      }
+      return ImmutableList.copyOf(ret);
+    }
+
     @Override
     public Partition getHmsPartition() {
       return msPartition_;
@@ -1500,7 +1527,22 @@ public class CatalogdMetaProvider implements MetaProvider {
 
     @Override
     public ImmutableList<FileDescriptor> getFileDescriptors() {
-      return fds_;
+      if (insertFds_.isEmpty()) return fds_;
+      List<FileDescriptor> ret = Lists.newArrayListWithCapacity(
+          insertFds_.size() + deleteFds_.size());
+      ret.addAll(insertFds_);
+      ret.addAll(deleteFds_);
+      return ImmutableList.copyOf(ret);
+    }
+
+    @Override
+    public ImmutableList<FileDescriptor> getInsertFileDescriptors() {
+      return insertFds_;
+    }
+
+    @Override
+    public ImmutableList<FileDescriptor> getDeleteFileDescriptors() {
+      return deleteFds_;
     }
 
     @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index aad788f..44a677c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -247,20 +247,35 @@ class DirectMetaProvider implements MetaProvider {
             " which was not requested. Requested: " + namesSet);
       }
 
-      ImmutableList<FileDescriptor> fds = loadFileMetadata(
-          fullTableName, partName, p, hostIndex);
-
-      PartitionMetadata existing = ret.put(partName, new PartitionMetadataImpl(p, fds));
+      FileMetadataLoader loader = loadFileMetadata(fullTableName, partName, p, hostIndex);
+      PartitionMetadata existing = ret.put(partName, createPartMetadataImpl(p, loader));
       if (existing != null) {
         throw new MetaException("HMS returned multiple partitions with name " +
             partName);
       }
     }
 
-
     return ret;
   }
 
+  private PartitionMetadataImpl createPartMetadataImpl(Partition p,
+      FileMetadataLoader loader) {
+    List<FileDescriptor> deleteDescriptors = loader.getLoadedDeleteDeltaFds();
+    ImmutableList<FileDescriptor> fds;
+    ImmutableList<FileDescriptor> insertFds;
+    ImmutableList<FileDescriptor> deleteFds;
+    if (deleteDescriptors != null && !deleteDescriptors.isEmpty()) {
+      fds = ImmutableList.copyOf(Collections.emptyList());
+      insertFds = ImmutableList.copyOf(loader.getLoadedInsertDeltaFds());
+      deleteFds = ImmutableList.copyOf(loader.getLoadedDeleteDeltaFds());
+    } else {
+      fds = ImmutableList.copyOf(loader.getLoadedFds());
+      insertFds = ImmutableList.copyOf(Collections.emptyList());
+      deleteFds = ImmutableList.copyOf(Collections.emptyList());
+    }
+    return new PartitionMetadataImpl(p, fds, insertFds, deleteFds);
+  }
+
   /**
    * We model partitions slightly differently to Hive. So, in the case of an
    * unpartitioned table, we have to create a fake Partition object which has the
@@ -277,10 +292,10 @@ class DirectMetaProvider implements MetaProvider {
         "Expected empty partition name for unpartitioned table");
     Partition msPartition = msTableToPartition(table.msTable_);
     String fullName = table.dbName_ + "." + table.tableName_;
-    ImmutableList<FileDescriptor> fds = loadFileMetadata(fullName,
+    FileMetadataLoader loader = loadFileMetadata(fullName,
         "default",  msPartition, hostIndex);
-    return ImmutableMap.of("", (PartitionMetadata)new PartitionMetadataImpl(
-        msPartition, fds));
+    return ImmutableMap.of("",
+        (PartitionMetadata)createPartMetadataImpl(msPartition, loader));
   }
 
   static Partition msTableToPartition(Table msTable) {
@@ -324,7 +339,7 @@ class DirectMetaProvider implements MetaProvider {
     }
   }
 
-  private ImmutableList<FileDescriptor> loadFileMetadata(String fullTableName,
+  private FileMetadataLoader loadFileMetadata(String fullTableName,
       String partName, Partition msPartition, ListMap<TNetworkAddress> hostIndex)
         throws CatalogException {
     //TODO(IMPALA-9042): Remove "throws MetaException"
@@ -345,14 +360,14 @@ class DirectMetaProvider implements MetaProvider {
     } catch (FileNotFoundException fnf) {
       // If the partition directory isn't found, this is treated as having no
       // files.
-      return ImmutableList.of();
+      return fml;
     } catch (IOException ioe) {
       throw new LocalCatalogException(String.format(
           "Could not load files for partition %s of table %s",
           partName, fullTableName), ioe);
     }
 
-    return ImmutableList.copyOf(fml.getLoadedFds());
+    return fml;
   }
 
   @Immutable
@@ -373,11 +388,16 @@ class DirectMetaProvider implements MetaProvider {
   private static class PartitionMetadataImpl implements PartitionMetadata {
     private final Partition msPartition_;
     private final ImmutableList<FileDescriptor> fds_;
+    private final ImmutableList<FileDescriptor> insertFds_;
+    private final ImmutableList<FileDescriptor> deleteFds_;
 
     public PartitionMetadataImpl(Partition msPartition,
-        ImmutableList<FileDescriptor> fds) {
+        ImmutableList<FileDescriptor> fds, ImmutableList<FileDescriptor> insertFds,
+        ImmutableList<FileDescriptor> deleteFds) {
       this.msPartition_ = msPartition;
       this.fds_ = fds;
+      this.insertFds_ = insertFds;
+      this.deleteFds_ = deleteFds;
     }
 
     @Override
@@ -387,7 +407,22 @@ class DirectMetaProvider implements MetaProvider {
 
     @Override
     public ImmutableList<FileDescriptor> getFileDescriptors() {
-      return fds_;
+      if (insertFds_.isEmpty()) return fds_;
+      List<FileDescriptor> ret = Lists.newArrayListWithCapacity(
+          insertFds_.size() + deleteFds_.size());
+      ret.addAll(insertFds_);
+      ret.addAll(deleteFds_);
+      return ImmutableList.copyOf(ret);
+    }
+
+    @Override
+    public ImmutableList<FileDescriptor> getInsertFileDescriptors() {
+      return insertFds_;
+    }
+
+    @Override
+    public ImmutableList<FileDescriptor> getDeleteFileDescriptors() {
+      return deleteFds_;
     }
 
     @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
index 3848ced..2905987 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.catalog.local;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -51,7 +52,13 @@ public class LocalFsPartition implements FeFsPartition {
    * Null in the case of a 'prototype partition'.
    */
   @Nullable
-  private final ImmutableList<FileDescriptor> fileDescriptors_;
+  private ImmutableList<FileDescriptor> fileDescriptors_;
+
+  @Nullable
+  private ImmutableList<FileDescriptor> insertFileDescriptors_;
+
+  @Nullable
+  private ImmutableList<FileDescriptor> deleteFileDescriptors_;
 
   @Nullable
   private final byte[] partitionStats_;
@@ -66,11 +73,15 @@ public class LocalFsPartition implements FeFsPartition {
 
   public LocalFsPartition(LocalFsTable table, LocalPartitionSpec spec,
       Partition msPartition, ImmutableList<FileDescriptor> fileDescriptors,
+      ImmutableList<FileDescriptor> insertFileDescriptors,
+      ImmutableList<FileDescriptor> deleteFileDescriptors,
       byte [] partitionStats, boolean hasIncrementalStats, boolean isMarkedCached) {
     table_ = Preconditions.checkNotNull(table);
     spec_ = Preconditions.checkNotNull(spec);
     msPartition_ = Preconditions.checkNotNull(msPartition);
     fileDescriptors_ = fileDescriptors;
+    insertFileDescriptors_ = insertFileDescriptors;
+    deleteFileDescriptors_ = deleteFileDescriptors;
     partitionStats_ = partitionStats;
     hasIncrementalStats_ = hasIncrementalStats;
     isMarkedCached_ = isMarkedCached;
@@ -100,17 +111,35 @@ public class LocalFsPartition implements FeFsPartition {
 
   @Override
   public List<FileDescriptor> getFileDescriptors() {
-    return fileDescriptors_;
+    if (!fileDescriptors_.isEmpty()) return fileDescriptors_;
+    List<FileDescriptor> ret = new ArrayList<>();
+    ret.addAll(insertFileDescriptors_);
+    ret.addAll(deleteFileDescriptors_);
+    return ret;
+  }
+
+  @Override
+  public List<FileDescriptor> getInsertFileDescriptors() {
+    return insertFileDescriptors_;
+  }
+
+  @Override
+  public List<FileDescriptor> getDeleteFileDescriptors() {
+    return deleteFileDescriptors_;
   }
 
   @Override
   public boolean hasFileDescriptors() {
-    return !fileDescriptors_.isEmpty();
+    return !fileDescriptors_.isEmpty() ||
+           !insertFileDescriptors_.isEmpty() ||
+           !deleteFileDescriptors_.isEmpty();
   }
 
   @Override
   public int getNumFileDescriptors() {
-    return fileDescriptors_.size();
+    return fileDescriptors_.size() +
+           insertFileDescriptors_.size() +
+           deleteFileDescriptors_.size();
   }
 
   @Override
@@ -188,7 +217,7 @@ public class LocalFsPartition implements FeFsPartition {
   @Override
   public long getSize() {
     long size = 0;
-    for (FileDescriptor fd : fileDescriptors_) {
+    for (FileDescriptor fd : getFileDescriptors()) {
       size += fd.getFileLength();
     }
     return size;
@@ -228,4 +257,21 @@ public class LocalFsPartition implements FeFsPartition {
   public long getWriteId() {
     return MetastoreShim.getWriteIdFromMSPartition(msPartition_);
   }
+
+  @Override
+  public LocalFsPartition genInsertDeltaPartition() {
+    ImmutableList<FileDescriptor> fds = insertFileDescriptors_.isEmpty() ?
+        fileDescriptors_ : insertFileDescriptors_;
+    return new LocalFsPartition(table_, spec_, msPartition_, fds,
+        ImmutableList.of(), ImmutableList.of(), partitionStats_,
+        hasIncrementalStats_, isMarkedCached_);
+  }
+
+  @Override
+  public LocalFsPartition genDeleteDeltaPartition() {
+    if (deleteFileDescriptors_.isEmpty()) return null;
+    return new LocalFsPartition(table_, spec_, msPartition_, deleteFileDescriptors_,
+        ImmutableList.of(), ImmutableList.of(), partitionStats_,
+        hasIncrementalStats_, isMarkedCached_);
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 4c4d6e8..a611fd9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -44,6 +44,7 @@ import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.PrunablePartition;
 import org.apache.impala.catalog.SqlConstraints;
@@ -382,8 +383,9 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     LocalPartitionSpec spec = new LocalPartitionSpec(
         this, CatalogObjectsConstants.PROTOTYPE_PARTITION_ID);
     LocalFsPartition prototypePartition = new LocalFsPartition(
-        this, spec, protoMsPartition, /*fileDescriptors=*/null, /*partitionStats=*/null,
-        /*hasIncrementalStats=*/false, /*isMarkedCached=*/false);
+        this, spec, protoMsPartition, /*fileDescriptors=*/null,
+        /*insertFileDescriptors=*/null, /*deleteFileDescriptors=*/null,
+        /*partitionStats=*/null, /*hasIncrementalStats=*/false, /*isMarkedCached=*/false);
     return prototypePartition;
   }
 
@@ -457,8 +459,11 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
             "' (perhaps it was concurrently dropped by another process)");
       }
 
+      ImmutableList<FileDescriptor> fds = p.getInsertFileDescriptors().isEmpty() ?
+          p.getFileDescriptors() : ImmutableList.of();
       LocalFsPartition part = new LocalFsPartition(this, spec, p.getHmsPartition(),
-          p.getFileDescriptors(), p.getPartitionStats(), p.hasIncrementalStats(),
+          fds, p.getInsertFileDescriptors(),
+          p.getDeleteFileDescriptors(), p.getPartitionStats(), p.hasIncrementalStats(),
           p.isMarkedCached());
       ret.add(part);
     }
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index a0d4419..3d9a402 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -147,6 +147,8 @@ public interface MetaProvider {
   interface PartitionMetadata {
     Partition getHmsPartition();
     ImmutableList<FileDescriptor> getFileDescriptors();
+    ImmutableList<FileDescriptor> getInsertFileDescriptors();
+    ImmutableList<FileDescriptor> getDeleteFileDescriptors();
     byte[] getPartitionStats();
     boolean hasIncrementalStats();
     boolean isMarkedCached();
diff --git a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
index 0bb77aa..cef15a6 100644
--- a/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
@@ -175,13 +175,15 @@ public class HashJoinNode extends JoinNode {
       }
     }
     if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
-      output.append(detailPrefix + "hash predicates: ");
-      for (int i = 0; i < eqJoinConjuncts_.size(); ++i) {
-        Expr eqConjunct = eqJoinConjuncts_.get(i);
-        output.append(eqConjunct.toSql());
-        if (i + 1 != eqJoinConjuncts_.size()) output.append(", ");
+      if (!isAcidJoin_ || detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
+        output.append(detailPrefix + "hash predicates: ");
+        for (int i = 0; i < eqJoinConjuncts_.size(); ++i) {
+          Expr eqConjunct = eqJoinConjuncts_.get(i);
+          output.append(eqConjunct.toSql());
+          if (i + 1 != eqJoinConjuncts_.size()) output.append(", ");
+        }
+        output.append("\n");
       }
-      output.append("\n");
 
       // Optionally print FK/PK equi-join conjuncts.
       if (joinOp_.isInnerJoin() || joinOp_.isOuterJoin()) {
diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
index 0a3a708..fc8d5a8 100644
--- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java
@@ -80,6 +80,14 @@ public abstract class JoinNode extends PlanNode {
   // joinTableId_
   protected JoinTableId joinTableId_ = JoinTableId.INVALID;
 
+  // True if this join is used to do the join between insert and delete delta files.
+  protected boolean isAcidJoin_ = false;
+
+  public void setIsAcidJoin() {
+    isAcidJoin_ = true;
+    displayName_ = "DELETE EVENTS " + displayName_;
+  }
+
   // List of equi-join conjuncts believed to be involved in a FK/PK relationship.
   // The conjuncts are grouped by the tuple ids of the joined base table refs. A conjunct
   // is only included in this list if it is of the form <SlotRef> = <SlotRef> and the
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index bbc7140..f4d73a6 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -43,6 +43,8 @@ import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.MultiAggregateInfo;
 import org.apache.impala.analysis.MultiAggregateInfo.AggPhase;
 import org.apache.impala.analysis.NullLiteral;
+import org.apache.impala.analysis.Path;
+import org.apache.impala.analysis.Path.PathType;
 import org.apache.impala.analysis.QueryStmt;
 import org.apache.impala.analysis.SelectStmt;
 import org.apache.impala.analysis.SingularRowSrcTableRef;
@@ -56,6 +58,7 @@ import org.apache.impala.analysis.TupleId;
 import org.apache.impala.analysis.TupleIsNullPredicate;
 import org.apache.impala.analysis.UnionStmt;
 import org.apache.impala.analysis.UnionStmt.UnionOperand;
+import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.FeDataSourceTable;
 import org.apache.impala.catalog.FeFsPartition;
@@ -64,13 +67,19 @@ import org.apache.impala.catalog.FeHBaseTable;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.ScalarType;
+import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.planner.JoinNode.DistributionMode;
 import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.util.AcidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1409,6 +1418,10 @@ public class SingleNodePlanner {
       }
       unionNode.init(analyzer);
       return unionNode;
+    } else if (addAcidSlotsIfNeeded(analyzer, hdfsTblRef, partitions)) {
+      // We are scanning a full ACID table that has delete delta files. Let's create
+      // a LEFT ANTI JOIN between the insert deltas and delete deltas.
+      return createAcidJoinNode(analyzer, hdfsTblRef, conjuncts, partitions, pair.second);
     } else {
       HdfsScanNode scanNode =
           new HdfsScanNode(ctx_.getNextNodeId(), tupleDesc, conjuncts, partitions,
@@ -1419,6 +1432,170 @@ public class SingleNodePlanner {
   }
 
   /**
+   * Adds the partitioning columns and the ACID columns to the table's tuple
+   * descriptor. This is needed to do a hash join between insert deltas and delete
+   * deltas.
+   * Returns true when the slot refs are needed and were added successfully.
+   * Returns false when this was a no-op because the slot refs are not needed, e.g.
+   * it's a non-ACID table, or there are no delete delta files.
+   * Throws an exception in case of errors.
+   */
+  private boolean addAcidSlotsIfNeeded(Analyzer analyzer, TableRef hdfsTblRef,
+      List<? extends FeFsPartition> partitions) throws AnalysisException {
+    FeTable feTable = hdfsTblRef.getTable();
+    if (!AcidUtils.isFullAcidTable(feTable.getMetaStoreTable().getParameters())) {
+      return false;
+    }
+    boolean areThereDeletedRows = false;
+    for (FeFsPartition partition: partitions) {
+      if (partition.genDeleteDeltaPartition() != null) {
+        areThereDeletedRows = true;
+        break;
+      }
+    }
+    if (!areThereDeletedRows) return false;
+    final String NOT_SUPPORTED_YET = "This query is not supported on full ACID tables " +
+        "that have deleted rows and complex types. As a workaround you can run a " +
+        "major compaction.";
+    if (hdfsTblRef instanceof CollectionTableRef) {
+      throw new AnalysisException(NOT_SUPPORTED_YET);
+    }
+    for (SlotDescriptor slotDesc : hdfsTblRef.getDesc().getSlots()) {
+      if (slotDesc.getItemTupleDesc() != null) {
+        throw new AnalysisException(NOT_SUPPORTED_YET);
+      }
+    }
+    addAcidSlots(analyzer, hdfsTblRef);
+    return true;
+  }
+
+  private void addAcidSlots(Analyzer analyzer, TableRef hdfsTblRef)
+      throws AnalysisException {
+    FeTable feTable = hdfsTblRef.getTable();
+    List<String> rawPath = new ArrayList<>();
+    rawPath.add(hdfsTblRef.getUniqueAlias());
+    // Add slot refs for the partitioning columns.
+    for (Column partCol : feTable.getClusteringColumns()) {
+      rawPath.add(partCol.getName());
+      addSlotRefToDesc(analyzer, rawPath);
+      rawPath.remove(rawPath.size() - 1);
+    }
+    // Add slot refs for the ACID fields that identify rows.
+    rawPath.add("row__id");
+    String[] acidFields = {"originaltransaction", "bucket", "rowid"};
+    for (String acidField : acidFields) {
+      rawPath.add(acidField);
+      addSlotRefToDesc(analyzer, rawPath);
+      rawPath.remove(rawPath.size() - 1);
+    }
+  }
+
+  /**
+   * Adds a new slot ref with path 'rawPath' to its tuple descriptor. This is a no-op if
+   * the tuple descriptor already has a slot ref with the given raw path.
+   */
+  private void addSlotRefToDesc(Analyzer analyzer, List<String> rawPath)
+      throws AnalysisException {
+    Path resolvedPath = null;
+    try {
+      resolvedPath = analyzer.resolvePath(rawPath, PathType.SLOT_REF);
+    } catch (TableLoadingException e) {
+      // Should never happen because we only check registered table aliases.
+      Preconditions.checkState(false);
+    }
+    Preconditions.checkNotNull(resolvedPath);
+    SlotDescriptor desc = analyzer.registerSlotRef(resolvedPath);
+    desc.setIsMaterialized(true);
+  }
+
+  /**
+   * Takes an 'hdfsTblRef' of an ACID table and creates two scan nodes for it: one for
+   * the insert delta files, and one for the delete delta files. On top of the two scans
+   * it adds a LEFT ANTI HASH JOIN with BROADCAST distribution mode, i.e. delete events
+   * are broadcast to the nodes that scan the insert files.
+   */
+  private PlanNode createAcidJoinNode(Analyzer analyzer, TableRef hdfsTblRef,
+      List<Expr> conjuncts, List<? extends FeFsPartition> partitions,
+      List<Expr> partConjuncts)
+      throws ImpalaException {
+    FeTable feTable = hdfsTblRef.getTable();
+    Preconditions.checkState(AcidUtils.isFullAcidTable(
+      feTable.getMetaStoreTable().getParameters()));
+
+    // Let's create separate partitions for inserts and deletes.
+    List<FeFsPartition> insertDeltaPartitions = new ArrayList<>();
+    List<FeFsPartition> deleteDeltaPartitions = new ArrayList<>();
+    for (FeFsPartition part : partitions) {
+      insertDeltaPartitions.add(part.genInsertDeltaPartition());
+      FeFsPartition deleteDeltaPartition = part.genDeleteDeltaPartition();
+      if (deleteDeltaPartition != null) deleteDeltaPartitions.add(deleteDeltaPartition);
+    }
+    // The following creates separate scan nodes for the insert deltas and the delete
+    // deltas, then adds a LEFT ANTI HASH JOIN above them.
+    TableRef deleteDeltaRef = new TableRef(hdfsTblRef.getPath(),
+        hdfsTblRef.getUniqueAlias() + "-delete-delta");
+    deleteDeltaRef = analyzer.resolveTableRef(deleteDeltaRef);
+    deleteDeltaRef.analyze(analyzer);
+    addAcidSlots(analyzer, deleteDeltaRef);
+    HdfsScanNode deltaScanNode = new HdfsScanNode(ctx_.getNextNodeId(),
+        hdfsTblRef.getDesc(), conjuncts, insertDeltaPartitions, hdfsTblRef,
+        /*aggInfo=*/null, partConjuncts, /*isPartitionKeyScan=*/false);
+    deltaScanNode.init(analyzer);
+    HdfsScanNode deleteDeltaScanNode = new HdfsScanNode(ctx_.getNextNodeId(),
+        deleteDeltaRef.getDesc(), Collections.emptyList(), deleteDeltaPartitions,
+        deleteDeltaRef, /*aggInfo=*/null, partConjuncts, /*isPartitionKeyScan=*/false);
+    deleteDeltaScanNode.init(analyzer);
+    // TODO: ACID join conjuncts currently contain predicates for all partitioning columns
+    // and the ACID fields. So all of those columns will be the inputs of the hash
+    // function in the HASH JOIN. Probably we could only include 'originalTransaction' and
+    // 'rowid' in the hash predicates, while passing the other conjuncts in
+    // 'otherJoinConjuncts'.
+    List<BinaryPredicate> acidJoinConjuncts = createAcidJoinConjuncts(analyzer,
+        hdfsTblRef.getDesc(), deleteDeltaRef.getDesc());
+    JoinNode acidJoin = new HashJoinNode(deltaScanNode, deleteDeltaScanNode,
+        /*straight_join=*/true, DistributionMode.BROADCAST, JoinOperator.LEFT_ANTI_JOIN,
+        acidJoinConjuncts, /*otherJoinConjuncts=*/Collections.emptyList());
+    acidJoin.setId(ctx_.getNextNodeId());
+    acidJoin.init(analyzer);
+    acidJoin.setIsAcidJoin();
+    return acidJoin;
+  }
+
+  /**
+   * Creates conjuncts used in the ANTI-JOIN between insert deltas and delete
+   * deltas. I.e. it adds equality predicates for the partitioning columns and
+   * ACID columns. E.g. [insertDelta.part = deleteDelta.part,
+   * insertDelta.row__id.rowid = deleteDelta.row__id.rowid, ...]
+   *
+   * @param insertTupleDesc Tuple descriptor of the insert delta scan node
+   * @param deleteTupleDesc Tuple descriptor of the delete delta scan node
+   */
+  List<BinaryPredicate> createAcidJoinConjuncts(Analyzer analyzer,
+      TupleDescriptor insertTupleDesc, TupleDescriptor deleteTupleDesc)
+      throws AnalysisException {
+    List<BinaryPredicate> ret = new ArrayList<>();
+    // 'deleteTupleDesc' only has slot descriptors for the slots needed in the JOIN, i.e.
+    // it only has slot refs for the partitioning columns and ACID columns. Therefore we
+    // can just iterate over it and find the corresponding slot refs in the insert tuple
+    // descriptor and create an equality predicate between the slot ref pairs.
+    for (SlotDescriptor deleteSlotDesc : deleteTupleDesc.getSlots()) {
+      boolean foundMatch = false;
+      for (SlotDescriptor insertSlotDesc : insertTupleDesc.getSlots()) {
+        if (deleteSlotDesc.getMaterializedPath().equals(
+            insertSlotDesc.getMaterializedPath())) {
+          foundMatch = true;
+          BinaryPredicate pred = new BinaryPredicate(
+              Operator.EQ, new SlotRef(insertSlotDesc), new SlotRef(deleteSlotDesc));
+          pred.analyze(analyzer);
+          ret.add(pred);
+          break;
+        }
+      }
+      Preconditions.checkState(foundMatch);
+    }
+    return ret;
+  }
+
+  /**
    * Looks for a filesystem-based partition in 'partitions' with no DATE support and
    * returns the first one it finds. Right now, scanning DATE values is supported for
    * TEXT, PARQUET, AVRO and ORC fileformats.
diff --git a/fe/src/main/java/org/apache/impala/util/AcidUtils.java b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
index 789c515..94cb9e2 100644
--- a/fe/src/main/java/org/apache/impala/util/AcidUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
@@ -366,6 +366,13 @@ public class AcidUtils {
   }
 
   /**
+   * Returns true if 'fd' refers to a delete delta file.
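+   * E.g. a descriptor whose relative path is "delete_delta_0000006_0000006/00000"
+   * matches, while "base_0000005/abc.txt" does not.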
+   */
+  public static boolean isDeleteDeltaFd(FileDescriptor fd) {
+    return fd.getRelativePath().startsWith("delete_delta_");
+  }
+
+  /**
    * This method is similar to {@link AcidUtils#filterFilesForAcidState} with the
    * difference that it expects input to be valid file descriptors from a loaded table.
    * This means that file descriptors are already pre-vetted and are consistent with
@@ -503,6 +510,7 @@ public class AcidUtils {
     for (Iterator<String> it = deltaDirNames.iterator(); it.hasNext();) {
       String dirname = it.next();
       ParsedDelta parsedDelta = parseDelta(dirname);
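+      // Delete delta directories are parsed here as well, so they flow through the
+      // same write-id filtering below as insert deltas instead of raising an error.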
+      if (parsedDelta == null) parsedDelta = parseDeleteDelta(dirname);
       if (parsedDelta != null) {
         if (parsedDelta.minWriteId <= baseWriteId) {
           Preconditions.checkState(parsedDelta.maxWriteId <= baseWriteId);
@@ -512,13 +520,6 @@ public class AcidUtils {
         deltas.add(new Pair<String, ParsedDelta>(dirname, parsedDelta));
         continue;
       }
-      ParsedDelta deleteDelta = parseDeleteDelta(dirname);
-      if (deleteDelta != null) {
-        if (deleteDelta.maxWriteId > baseWriteId) {
-          throw new CatalogException("Table has deleted rows. It's currently not "
-              + "supported by Impala. Run major compaction to resolve this.");
-        }
-      }
     }
 
     deltas.sort(new Comparator<Pair<String, ParsedDelta>>() {
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 7052721..587268b 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1050,4 +1050,12 @@ public class PlannerTest extends PlannerTestBase {
   public void testConvertToCNF() {
     runPlannerTestFile("convert-to-cnf", "tpch_parquet");
   }
+
+  /**
+   * Check that ACID table scans work as expected.
+   */
+  @Test
+  public void testAcidTableScans() {
+    runPlannerTestFile("acid-scans", "functional_orc_def");
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
index d4e9748..84b0131 100644
--- a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
+++ b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
@@ -494,8 +494,8 @@ public class AcidUtilsTest {
   }
 
   @Test
-  public void testDeleteDeltaFail() {
-    filteringError(new String[]{
+  public void testDeleteDelta() {
+    assertFiltering(new String[]{
             "base_0000005/",
             "base_0000005/abc.txt",
             "delete_delta_0000006_0000006/",
@@ -504,8 +504,9 @@ public class AcidUtilsTest {
         "",
         // <tbl>:<hwm>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>
         "default.test:10:1234:1,2,3",
-        "Table has deleted rows"
-        );
+        new String[]{
+          "base_0000005/abc.txt",
+          "delete_delta_0000006_0000006/00000"});
   }
 
   public void testHiveStreamingFail() {
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 807267f..fb7a0b4 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -527,6 +527,31 @@ TBLPROPERTIES("hbase.table.name" = "functional_hbase.hbasealltypeserrornonulls")
 ---- DATASET
 functional
 ---- BASE_TABLE_NAME
+alltypes_deleted_rows
+---- PARTITION_COLUMNS
+year int
+month int
+---- COLUMNS
+id int COMMENT 'Add a comment'
+bool_col boolean
+tinyint_col tinyint
+smallint_col smallint
+int_col int
+bigint_col bigint
+float_col float
+double_col double
+date_string_col string
+string_col string
+timestamp_col timestamp
+---- DEPENDENT_LOAD_ACID
+INSERT INTO TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}{db_suffix}.alltypes;
+DELETE FROM {db_name}{db_suffix}.{table_name} WHERE month % 2 = 0 and year % 2 = 0 and id % 10 = 0;
+---- TABLE_PROPERTIES
+transactional=true
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
 alltypesagg
 ---- PARTITION_COLUMNS
 year int
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 7d5b3d0..b8a8115 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -12,6 +12,7 @@ table_name:hbasealltypeserrornonulls, constraint:restrict_to, table_format:hbase
 
 table_name:alltypesinsert, constraint:restrict_to, table_format:text/none/none
 table_name:alltypes_promoted, constraint:restrict_to, table_format:orc/def/block
+table_name:alltypes_deleted_rows, constraint:restrict_to, table_format:orc/def/block
 table_name:stringpartitionkey, constraint:restrict_to, table_format:text/none/none
 table_name:alltypesnopart_insert, constraint:restrict_to, table_format:text/none/none
 table_name:insert_overwrite_nopart, constraint:restrict_to, table_format:text/none/none
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/acid-scans.test b/testdata/workloads/functional-planner/queries/PlannerTest/acid-scans.test
new file mode 100644
index 0000000..171c894
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/acid-scans.test
@@ -0,0 +1,829 @@
+# Select from a full ACID table that has deleted rows. The plan should
+# contain a DELETE EVENTS HASH JOIN between the insert deltas and delete deltas.
+select * from alltypes_deleted_rows;
+---- PLAN
+PLAN-ROOT SINK
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  row-size=100B cardinality=3.44K
+|
+|--01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows functional_orc_def.alltypes_deleted_rows-delete-delta]
+|     HDFS partitions=6/24 files=6 size=6.58KB
+|     row-size=28B cardinality=419
+|
+00:SCAN HDFS [functional_orc_def.alltypes_deleted_rows]
+   HDFS partitions=24/24 files=24 size=54.09KB
+   row-size=100B cardinality=3.44K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  row-size=100B cardinality=3.44K
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows functional_orc_def.alltypes_deleted_rows-delete-delta]
+|     HDFS partitions=6/24 files=6 size=6.58KB
+|     row-size=28B cardinality=419
+|
+00:SCAN HDFS [functional_orc_def.alltypes_deleted_rows]
+   HDFS partitions=24/24 files=24 size=54.09KB
+   row-size=100B cardinality=3.44K
+====
+# Select from a partition that has delete delta files.
+select * from alltypes_deleted_rows where month = 2 and year = 2010;
+---- PLAN
+PLAN-ROOT SINK
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  row-size=100B cardinality=143
+|
+|--01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows functional_orc_def.alltypes_deleted_rows-delete-delta]
+|     partition predicates: `month` = 2, `year` = 2010
+|     HDFS partitions=1/24 files=1 size=1.10KB
+|     row-size=28B cardinality=70
+|
+00:SCAN HDFS [functional_orc_def.alltypes_deleted_rows]
+   partition predicates: `month` = 2, `year` = 2010
+   HDFS partitions=1/24 files=1 size=2.24KB
+   row-size=100B cardinality=143
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+04:EXCHANGE [UNPARTITIONED]
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  row-size=100B cardinality=143
+|
+|--03:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows functional_orc_def.alltypes_deleted_rows-delete-delta]
+|     partition predicates: `month` = 2, `year` = 2010
+|     HDFS partitions=1/24 files=1 size=1.10KB
+|     row-size=28B cardinality=70
+|
+00:SCAN HDFS [functional_orc_def.alltypes_deleted_rows]
+   partition predicates: `month` = 2, `year` = 2010
+   HDFS partitions=1/24 files=1 size=2.24KB
+   row-size=100B cardinality=143
+====
+# Select from a partition that doesn't have delete delta files. It should
+# just do a simple SCAN.
+select * from alltypes_deleted_rows where month = 2 and year = 2009;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional_orc_def.alltypes_deleted_rows]
+   partition predicates: `month` = 2, `year` = 2009
+   HDFS partitions=1/24 files=1 size=2.25KB
+   row-size=80B cardinality=143
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_orc_def.alltypes_deleted_rows]
+   partition predicates: `month` = 2, `year` = 2009
+   HDFS partitions=1/24 files=1 size=2.25KB
+   row-size=80B cardinality=143
+====
+# Use an IN predicate to join the table with itself.
+select *
+from alltypes_deleted_rows dd
+where id in (select max(id) from alltypes_deleted_rows group by month);
+---- PLAN
+PLAN-ROOT SINK
+|
+07:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: id = max(id)
+|  runtime filters: RF000 <- max(id)
+|  row-size=100B cardinality=3.44K
+|
+|--06:AGGREGATE [FINALIZE]
+|  |  output: max(id)
+|  |  group by: `month`
+|  |  row-size=8B cardinality=12
+|  |
+|  05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  |  row-size=32B cardinality=3.44K
+|  |
+|  |--04:SCAN HDFS [functional_orc_def.alltypes_deleted_rows functional_orc_def.alltypes_deleted_rows-delete-delta]
+|  |     HDFS partitions=6/24 files=6 size=6.58KB
+|  |     row-size=28B cardinality=419
+|  |
+|  03:SCAN HDFS [functional_orc_def.alltypes_deleted_rows]
+|     HDFS partitions=24/24 files=24 size=54.09KB
+|     row-size=32B cardinality=3.44K
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  row-size=100B cardinality=3.44K
+|
+|--01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows dd-delete-delta]
+|     HDFS partitions=6/24 files=6 size=6.58KB
+|     row-size=28B cardinality=419
+|
+00:SCAN HDFS [functional_orc_def.alltypes_deleted_rows dd]
+   HDFS partitions=24/24 files=24 size=54.09KB
+   runtime filters: RF000 -> id
+   row-size=100B cardinality=3.44K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+13:EXCHANGE [UNPARTITIONED]
+|
+07:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+|  hash predicates: id = max(id)
+|  runtime filters: RF000 <- max(id)
+|  row-size=100B cardinality=3.44K
+|
+|--12:EXCHANGE [BROADCAST]
+|  |
+|  11:AGGREGATE [FINALIZE]
+|  |  output: max:merge(id)
+|  |  group by: `month`
+|  |  row-size=8B cardinality=12
+|  |
+|  10:EXCHANGE [HASH(`month`)]
+|  |
+|  06:AGGREGATE [STREAMING]
+|  |  output: max(id)
+|  |  group by: `month`
+|  |  row-size=8B cardinality=12
+|  |
+|  05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  |  row-size=32B cardinality=3.44K
+|  |
+|  |--09:EXCHANGE [BROADCAST]
+|  |  |
+|  |  04:SCAN HDFS [functional_orc_def.alltypes_deleted_rows functional_orc_def.alltypes_deleted_rows-delete-delta]
+|  |     HDFS partitions=6/24 files=6 size=6.58KB
+|  |     row-size=28B cardinality=419
+|  |
+|  03:SCAN HDFS [functional_orc_def.alltypes_deleted_rows]
+|     HDFS partitions=24/24 files=24 size=54.09KB
+|     row-size=32B cardinality=3.44K
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  row-size=100B cardinality=3.44K
+|
+|--08:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows dd-delete-delta]
+|     HDFS partitions=6/24 files=6 size=6.58KB
+|     row-size=28B cardinality=419
+|
+00:SCAN HDFS [functional_orc_def.alltypes_deleted_rows dd]
+   HDFS partitions=24/24 files=24 size=54.09KB
+   runtime filters: RF000 -> id
+   row-size=100B cardinality=3.44K
+====
+# Do an explicit join with itself. This creates a bushy plan.
+select t1.id, t2.month
+from alltypes_deleted_rows t1, alltypes_deleted_rows t2
+where t1.id % 12 = t2.month;
+---- PLAN
+PLAN-ROOT SINK
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: t1.id % 12 = t2.`month`
+|  runtime filters: RF000 <- t2.`month`
+|  row-size=60B cardinality=3.44K
+|
+|--05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  |  row-size=28B cardinality=3.44K
+|  |
+|  |--04:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t2-delete-delta]
+|  |     HDFS partitions=6/24 files=6 size=6.58KB
+|  |     row-size=28B cardinality=419
+|  |
+|  03:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t2]
+|     HDFS partitions=24/24 files=24 size=54.09KB
+|     row-size=28B cardinality=3.44K
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  row-size=32B cardinality=3.44K
+|
+|--01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t1-delete-delta]
+|     HDFS partitions=6/24 files=6 size=6.58KB
+|     row-size=28B cardinality=419
+|
+00:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t1]
+   HDFS partitions=24/24 files=24 size=54.09KB
+   runtime filters: RF000 -> t1.id % 12
+   row-size=32B cardinality=3.44K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+11:EXCHANGE [UNPARTITIONED]
+|
+06:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: t1.id % 12 = t2.`month`
+|  runtime filters: RF000 <- t2.`month`
+|  row-size=60B cardinality=3.44K
+|
+|--10:EXCHANGE [HASH(t2.`month`)]
+|  |
+|  05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  |  row-size=28B cardinality=3.44K
+|  |
+|  |--08:EXCHANGE [BROADCAST]
+|  |  |
+|  |  04:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t2-delete-delta]
+|  |     HDFS partitions=6/24 files=6 size=6.58KB
+|  |     row-size=28B cardinality=419
+|  |
+|  03:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t2]
+|     HDFS partitions=24/24 files=24 size=54.09KB
+|     row-size=28B cardinality=3.44K
+|
+09:EXCHANGE [HASH(t1.id % 12)]
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  row-size=32B cardinality=3.44K
+|
+|--07:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t1-delete-delta]
+|     HDFS partitions=6/24 files=6 size=6.58KB
+|     row-size=28B cardinality=419
+|
+00:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t1]
+   HDFS partitions=24/24 files=24 size=54.09KB
+   runtime filters: RF000 -> t1.id % 12
+   row-size=32B cardinality=3.44K
+====
+# Use an IN predicate to join the table with itself.
+# Use explain_level=2.
+select *
+from alltypes_deleted_rows dd
+where id in (select max(id) from alltypes_deleted_rows group by month);
+---- QUERYOPTIONS
+explain_level=2
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=228.88MB mem-reservation=6.98MB thread-reservation=5 runtime-filters-memory=1.00MB
+PLAN-ROOT SINK
+|  output exprs: dd.id, dd.bool_col, dd.tinyint_col, dd.smallint_col, dd.int_col, dd.bigint_col, dd.float_col, dd.double_col, dd.date_string_col, dd.string_col, dd.timestamp_col, dd.year, dd.month
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+07:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: id = max(id)
+|  runtime filters: RF000[bloom] <- max(id)
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0 row-size=100B cardinality=3.44K
+|  in pipelines: 00(GETNEXT), 06(OPEN)
+|
+|--06:AGGREGATE [FINALIZE]
+|  |  output: max(id)
+|  |  group by: `month`
+|  |  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  |  tuple-ids=2 row-size=8B cardinality=12
+|  |  in pipelines: 06(GETNEXT), 03(OPEN)
+|  |
+|  05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  |  hash predicates: functional_orc_def.alltypes_deleted_rows.month = functional_orc_def.alltypes_deleted_rows-delete-delta.month, functional_orc_def.alltypes_deleted_rows.row__id.bucket = functional_orc_def.alltypes_deleted_rows-delete-delta.row__id.bucket, functional_orc_def.alltypes_deleted_rows.row__id.originaltransaction = functional_orc_def.alltypes_deleted_rows-delete-delta.row__id.originaltransaction, functional_orc_def.alltypes_deleted_rows.row__id.rowid = functional_orc_def.a [...]
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  |  tuple-ids=1 row-size=32B cardinality=3.44K
+|  |  in pipelines: 03(GETNEXT), 04(OPEN)
+|  |
+|  |--04:SCAN HDFS [functional_orc_def.alltypes_deleted_rows functional_orc_def.alltypes_deleted_rows-delete-delta]
+|  |     HDFS partitions=6/24 files=6 size=6.58KB
+|  |     stored statistics:
+|  |       table: rows=unavailable size=unavailable
+|  |       partitions: 0/6 rows=unavailable
+|  |       columns: all
+|  |     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|  |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
+|  |     tuple-ids=5 row-size=28B cardinality=419
+|  |     in pipelines: 04(GETNEXT)
+|  |
+|  03:SCAN HDFS [functional_orc_def.alltypes_deleted_rows]
+|     HDFS partitions=24/24 files=24 size=54.09KB
+|     stored statistics:
+|       table: rows=unavailable size=unavailable
+|       partitions: 0/24 rows=unavailable
+|       columns missing stats: id
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
+|     tuple-ids=1 row-size=32B cardinality=3.44K
+|     in pipelines: 03(GETNEXT)
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  hash predicates: dd.month = dd-delete-delta.month, dd.row__id.bucket = dd-delete-delta.row__id.bucket, dd.row__id.originaltransaction = dd-delete-delta.row__id.originaltransaction, dd.row__id.rowid = dd-delete-delta.row__id.rowid, dd.year = dd-delete-delta.year
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0 row-size=100B cardinality=3.44K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows dd-delete-delta]
+|     HDFS partitions=6/24 files=6 size=6.58KB
+|     stored statistics:
+|       table: rows=unavailable size=unavailable
+|       partitions: 0/6 rows=unavailable
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
+|     tuple-ids=4 row-size=28B cardinality=419
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [functional_orc_def.alltypes_deleted_rows dd]
+   HDFS partitions=24/24 files=24 size=54.09KB
+   runtime filters: RF000[bloom] -> id
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=64.00MB mem-reservation=112.00KB thread-reservation=1
+   tuple-ids=0 row-size=100B cardinality=3.44K
+   in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=424.01KB mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: dd.id, dd.bool_col, dd.tinyint_col, dd.smallint_col, dd.int_col, dd.bigint_col, dd.float_col, dd.double_col, dd.date_string_col, dd.string_col, dd.timestamp_col, dd.year, dd.month
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+13:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=424.01KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=100B cardinality=3.44K
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=68.94MB mem-reservation=4.98MB thread-reservation=2 runtime-filters-memory=1.00MB
+07:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
+|  hash predicates: id = max(id)
+|  runtime filters: RF000[bloom] <- max(id)
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0 row-size=100B cardinality=3.44K
+|  in pipelines: 00(GETNEXT), 11(OPEN)
+|
+|--12:EXCHANGE [BROADCAST]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2 row-size=8B cardinality=12
+|  |  in pipelines: 11(GETNEXT)
+|  |
+|  F04:PLAN FRAGMENT [HASH(`month`)] hosts=3 instances=3
+|  Per-Host Resources: mem-estimate=10.02MB mem-reservation=1.94MB thread-reservation=1
+|  11:AGGREGATE [FINALIZE]
+|  |  output: max:merge(id)
+|  |  group by: `month`
+|  |  mem-estimate=10.00MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  |  tuple-ids=2 row-size=8B cardinality=12
+|  |  in pipelines: 11(GETNEXT), 03(OPEN)
+|  |
+|  10:EXCHANGE [HASH(`month`)]
+|  |  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=2 row-size=8B cardinality=12
+|  |  in pipelines: 03(GETNEXT)
+|  |
+|  F02:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  Per-Host Resources: mem-estimate=75.99MB mem-reservation=3.97MB thread-reservation=2
+|  06:AGGREGATE [STREAMING]
+|  |  output: max(id)
+|  |  group by: `month`
+|  |  mem-estimate=10.00MB mem-reservation=2.00MB spill-buffer=64.00KB thread-reservation=0
+|  |  tuple-ids=2 row-size=8B cardinality=12
+|  |  in pipelines: 03(GETNEXT)
+|  |
+|  05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  |  hash predicates: functional_orc_def.alltypes_deleted_rows.month = functional_orc_def.alltypes_deleted_rows-delete-delta.month, functional_orc_def.alltypes_deleted_rows.row__id.bucket = functional_orc_def.alltypes_deleted_rows-delete-delta.row__id.bucket, functional_orc_def.alltypes_deleted_rows.row__id.originaltransaction = functional_orc_def.alltypes_deleted_rows-delete-delta.row__id.originaltransaction, functional_orc_def.alltypes_deleted_rows.row__id.rowid = functional_orc_def.a [...]
+|  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  |  tuple-ids=1 row-size=32B cardinality=3.44K
+|  |  in pipelines: 03(GETNEXT), 04(OPEN)
+|  |
+|  |--09:EXCHANGE [BROADCAST]
+|  |  |  mem-estimate=50.74KB mem-reservation=0B thread-reservation=0
+|  |  |  tuple-ids=5 row-size=28B cardinality=419
+|  |  |  in pipelines: 04(GETNEXT)
+|  |  |
+|  |  F03:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  |  Per-Host Resources: mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=2
+|  |  04:SCAN HDFS [functional_orc_def.alltypes_deleted_rows functional_orc_def.alltypes_deleted_rows-delete-delta, RANDOM]
+|  |     HDFS partitions=6/24 files=6 size=6.58KB
+|  |     stored statistics:
+|  |       table: rows=unavailable size=unavailable
+|  |       partitions: 0/6 rows=unavailable
+|  |       columns: all
+|  |     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|  |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
+|  |     tuple-ids=5 row-size=28B cardinality=419
+|  |     in pipelines: 04(GETNEXT)
+|  |
+|  03:SCAN HDFS [functional_orc_def.alltypes_deleted_rows, RANDOM]
+|     HDFS partitions=24/24 files=24 size=54.09KB
+|     stored statistics:
+|       table: rows=unavailable size=unavailable
+|       partitions: 0/24 rows=unavailable
+|       columns missing stats: id
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
+|     tuple-ids=1 row-size=32B cardinality=3.44K
+|     in pipelines: 03(GETNEXT)
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  hash predicates: dd.month = dd-delete-delta.month, dd.row__id.bucket = dd-delete-delta.row__id.bucket, dd.row__id.originaltransaction = dd-delete-delta.row__id.originaltransaction, dd.row__id.rowid = dd-delete-delta.row__id.rowid, dd.year = dd-delete-delta.year
+|  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+|  tuple-ids=0 row-size=100B cardinality=3.44K
+|  in pipelines: 00(GETNEXT), 01(OPEN)
+|
+|--08:EXCHANGE [BROADCAST]
+|  |  mem-estimate=50.74KB mem-reservation=0B thread-reservation=0
+|  |  tuple-ids=4 row-size=28B cardinality=419
+|  |  in pipelines: 01(GETNEXT)
+|  |
+|  F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+|  Per-Host Resources: mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=2
+|  01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows dd-delete-delta, RANDOM]
+|     HDFS partitions=6/24 files=6 size=6.58KB
+|     stored statistics:
+|       table: rows=unavailable size=unavailable
+|       partitions: 0/6 rows=unavailable
+|       columns: all
+|     extrapolated-rows=disabled max-scan-range-rows=unavailable
+|     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
+|     tuple-ids=4 row-size=28B cardinality=419
+|     in pipelines: 01(GETNEXT)
+|
+00:SCAN HDFS [functional_orc_def.alltypes_deleted_rows dd, RANDOM]
+   HDFS partitions=24/24 files=24 size=54.09KB
+   runtime filters: RF000[bloom] -> id
+   stored statistics:
+     table: rows=unavailable size=unavailable
+     partitions: 0/24 rows=unavailable
+     columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
+   extrapolated-rows=disabled max-scan-range-rows=unavailable
+   mem-estimate=64.00MB mem-reservation=112.00KB thread-reservation=1
+   tuple-ids=0 row-size=100B cardinality=3.44K
+   in pipelines: 00(GETNEXT)
+====
+# Do an explicit join with itself. This creates a bushy plan.
+# Use explain_level=3.
+select t1.id, t2.month
+from alltypes_deleted_rows t1, alltypes_deleted_rows t2
+where t1.id % 12 = t2.month;
+---- QUERYOPTIONS
+explain_level=3
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=212.88MB mem-reservation=6.89MB thread-reservation=5 runtime-filters-memory=1.00MB
+  PLAN-ROOT SINK
+  |  output exprs: t1.id, t2.`month`
+  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+  |
+  06:HASH JOIN [INNER JOIN]
+  |  hash predicates: t1.id % 12 = t2.`month`
+  |  fk/pk conjuncts: assumed fk/pk
+  |  runtime filters: RF000[bloom] <- t2.`month`
+  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+  |  tuple-ids=0,1 row-size=60B cardinality=3.44K
+  |  in pipelines: 00(GETNEXT), 03(OPEN)
+  |
+  |--05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+  |  |  hash predicates: t2.month = t2-delete-delta.month, t2.row__id.bucket = t2-delete-delta.row__id.bucket, t2.row__id.originaltransaction = t2-delete-delta.row__id.originaltransaction, t2.row__id.rowid = t2-delete-delta.row__id.rowid, t2.year = t2-delete-delta.year
+  |  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+  |  |  tuple-ids=1 row-size=28B cardinality=3.44K
+  |  |  in pipelines: 03(GETNEXT), 04(OPEN)
+  |  |
+  |  |--04:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t2-delete-delta]
+  |  |     HDFS partitions=6/24 files=6 size=6.58KB
+  |  |     stored statistics:
+  |  |       table: rows=unavailable size=unavailable
+  |  |       partitions: 0/6 rows=unavailable
+  |  |       columns: all
+  |  |     extrapolated-rows=disabled max-scan-range-rows=unavailable
+  |  |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
+  |  |     tuple-ids=3 row-size=28B cardinality=419
+  |  |     in pipelines: 04(GETNEXT)
+  |  |
+  |  03:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t2]
+  |     HDFS partitions=24/24 files=24 size=54.09KB
+  |     stored statistics:
+  |       table: rows=unavailable size=unavailable
+  |       partitions: 0/24 rows=unavailable
+  |       columns: all
+  |     extrapolated-rows=disabled max-scan-range-rows=unavailable
+  |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
+  |     tuple-ids=1 row-size=28B cardinality=3.44K
+  |     in pipelines: 03(GETNEXT)
+  |
+  02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+  |  hash predicates: t1.month = t1-delete-delta.month, t1.row__id.bucket = t1-delete-delta.row__id.bucket, t1.row__id.originaltransaction = t1-delete-delta.row__id.originaltransaction, t1.row__id.rowid = t1-delete-delta.row__id.rowid, t1.year = t1-delete-delta.year
+  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+  |  tuple-ids=0 row-size=32B cardinality=3.44K
+  |  in pipelines: 00(GETNEXT), 01(OPEN)
+  |
+  |--01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t1-delete-delta]
+  |     HDFS partitions=6/24 files=6 size=6.58KB
+  |     stored statistics:
+  |       table: rows=unavailable size=unavailable
+  |       partitions: 0/6 rows=unavailable
+  |       columns: all
+  |     extrapolated-rows=disabled max-scan-range-rows=unavailable
+  |     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
+  |     tuple-ids=2 row-size=28B cardinality=419
+  |     in pipelines: 01(GETNEXT)
+  |
+  00:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t1]
+     HDFS partitions=24/24 files=24 size=54.09KB
+     runtime filters: RF000[bloom] -> t1.id % 12
+     stored statistics:
+       table: rows=unavailable size=unavailable
+       partitions: 0/24 rows=unavailable
+       columns missing stats: id
+     extrapolated-rows=disabled max-scan-range-rows=unavailable
+     mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
+     tuple-ids=0 row-size=32B cardinality=3.44K
+     in pipelines: 00(GETNEXT)
+---- DISTRIBUTEDPLAN
+F05:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+Per-Host Resources: mem-estimate=271.21KB mem-reservation=0B thread-reservation=1
+  PLAN-ROOT SINK
+  |  output exprs: t1.id, t2.`month`
+  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+  |
+  11:EXCHANGE [UNPARTITIONED]
+     mem-estimate=271.21KB mem-reservation=0B thread-reservation=0
+     tuple-ids=0,1 row-size=60B cardinality=3.44K
+     in pipelines: 00(GETNEXT)
+
+F04:PLAN FRAGMENT [HASH(t1.id % 12)] hosts=3 instances=3
+Per-Host Resources: mem-estimate=3.20MB mem-reservation=2.94MB thread-reservation=1 runtime-filters-memory=1.00MB
+  DATASTREAM SINK [FRAGMENT=F05, EXCHANGE=11, UNPARTITIONED]
+  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+  06:HASH JOIN [INNER JOIN, PARTITIONED]
+  |  hash predicates: t1.id % 12 = t2.`month`
+  |  fk/pk conjuncts: assumed fk/pk
+  |  runtime filters: RF000[bloom] <- t2.`month`
+  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+  |  tuple-ids=0,1 row-size=60B cardinality=3.44K
+  |  in pipelines: 00(GETNEXT), 03(OPEN)
+  |
+  |--10:EXCHANGE [HASH(t2.`month`)]
+  |     mem-estimate=127.36KB mem-reservation=0B thread-reservation=0
+  |     tuple-ids=1 row-size=28B cardinality=3.44K
+  |     in pipelines: 03(GETNEXT)
+  |
+  09:EXCHANGE [HASH(t1.id % 12)]
+     mem-estimate=143.84KB mem-reservation=0B thread-reservation=0
+     tuple-ids=0 row-size=32B cardinality=3.44K
+     in pipelines: 00(GETNEXT)
+
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=66.99MB mem-reservation=2.97MB thread-reservation=2 runtime-filters-memory=1.00MB
+  DATASTREAM SINK [FRAGMENT=F04, EXCHANGE=09, HASH(t1.id % 12)]
+  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+  02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+  |  hash predicates: t1.month = t1-delete-delta.month, t1.row__id.bucket = t1-delete-delta.row__id.bucket, t1.row__id.originaltransaction = t1-delete-delta.row__id.originaltransaction, t1.row__id.rowid = t1-delete-delta.row__id.rowid, t1.year = t1-delete-delta.year
+  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+  |  tuple-ids=0 row-size=32B cardinality=3.44K
+  |  in pipelines: 00(GETNEXT), 01(OPEN)
+  |
+  |--07:EXCHANGE [BROADCAST]
+  |     mem-estimate=50.74KB mem-reservation=0B thread-reservation=0
+  |     tuple-ids=2 row-size=28B cardinality=419
+  |     in pipelines: 01(GETNEXT)
+  |
+  00:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t1, RANDOM]
+     HDFS partitions=24/24 files=24 size=54.09KB
+     runtime filters: RF000[bloom] -> t1.id % 12
+     stored statistics:
+       table: rows=unavailable size=unavailable
+       partitions: 0/24 rows=unavailable
+       columns missing stats: id
+     extrapolated-rows=disabled max-scan-range-rows=unavailable
+     mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1
+     tuple-ids=0 row-size=32B cardinality=3.44K
+     in pipelines: 00(GETNEXT)
+
+F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=2
+  DATASTREAM SINK [FRAGMENT=F00, EXCHANGE=07, BROADCAST]
+  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+  01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t1-delete-delta, RANDOM]
+     HDFS partitions=6/24 files=6 size=6.58KB
+     stored statistics:
+       table: rows=unavailable size=unavailable
+       partitions: 0/6 rows=unavailable
+       columns: all
+     extrapolated-rows=disabled max-scan-range-rows=unavailable
+     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
+     tuple-ids=2 row-size=28B cardinality=419
+     in pipelines: 01(GETNEXT)
+
+F02:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=49.99MB mem-reservation=1.96MB thread-reservation=2
+  DATASTREAM SINK [FRAGMENT=F04, EXCHANGE=10, HASH(t2.`month`)]
+  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+  05:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+  |  hash predicates: t2.month = t2-delete-delta.month, t2.row__id.bucket = t2-delete-delta.row__id.bucket, t2.row__id.originaltransaction = t2-delete-delta.row__id.originaltransaction, t2.row__id.rowid = t2-delete-delta.row__id.rowid, t2.year = t2-delete-delta.year
+  |  mem-estimate=1.94MB mem-reservation=1.94MB spill-buffer=64.00KB thread-reservation=0
+  |  tuple-ids=1 row-size=28B cardinality=3.44K
+  |  in pipelines: 03(GETNEXT), 04(OPEN)
+  |
+  |--08:EXCHANGE [BROADCAST]
+  |     mem-estimate=50.74KB mem-reservation=0B thread-reservation=0
+  |     tuple-ids=3 row-size=28B cardinality=419
+  |     in pipelines: 04(GETNEXT)
+  |
+  03:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t2, RANDOM]
+     HDFS partitions=24/24 files=24 size=54.09KB
+     stored statistics:
+       table: rows=unavailable size=unavailable
+       partitions: 0/24 rows=unavailable
+       columns: all
+     extrapolated-rows=disabled max-scan-range-rows=unavailable
+     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
+     tuple-ids=1 row-size=28B cardinality=3.44K
+     in pipelines: 03(GETNEXT)
+
+F03:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=2
+  DATASTREAM SINK [FRAGMENT=F02, EXCHANGE=08, BROADCAST]
+  |  mem-estimate=0B mem-reservation=0B thread-reservation=0
+  04:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t2-delete-delta, RANDOM]
+     HDFS partitions=6/24 files=6 size=6.58KB
+     stored statistics:
+       table: rows=unavailable size=unavailable
+       partitions: 0/6 rows=unavailable
+       columns: all
+     extrapolated-rows=disabled max-scan-range-rows=unavailable
+     mem-estimate=48.00MB mem-reservation=24.00KB thread-reservation=1
+     tuple-ids=3 row-size=28B cardinality=419
+     in pipelines: 04(GETNEXT)
+====
+# Do a join with itself, but the scanned partitions of 't2' don't have delete delta files.
+select t1.id, t2.month
+from alltypes_deleted_rows t1, alltypes_deleted_rows t2
+where t1.id % 12 = t2.month and t2.year = 2009;
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: t1.id % 12 = t2.`month`
+|  runtime filters: RF000 <- t2.`month`
+|  row-size=36B cardinality=3.44K
+|
+|--03:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t2]
+|     partition predicates: t2.`year` = 2009
+|     HDFS partitions=12/24 files=12 size=27.02KB
+|     row-size=4B cardinality=1.72K
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  row-size=32B cardinality=3.44K
+|
+|--01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t1-delete-delta]
+|     HDFS partitions=6/24 files=6 size=6.58KB
+|     row-size=28B cardinality=419
+|
+00:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t1]
+   HDFS partitions=24/24 files=24 size=54.09KB
+   runtime filters: RF000 -> t1.id % 12
+   row-size=32B cardinality=3.44K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: t1.id % 12 = t2.`month`
+|  runtime filters: RF000 <- t2.`month`
+|  row-size=36B cardinality=3.44K
+|
+|--06:EXCHANGE [BROADCAST]
+|  |
+|  03:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t2]
+|     partition predicates: t2.`year` = 2009
+|     HDFS partitions=12/24 files=12 size=27.02KB
+|     row-size=4B cardinality=1.72K
+|
+02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  row-size=32B cardinality=3.44K
+|
+|--05:EXCHANGE [BROADCAST]
+|  |
+|  01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t1-delete-delta]
+|     HDFS partitions=6/24 files=6 size=6.58KB
+|     row-size=28B cardinality=419
+|
+00:SCAN HDFS [functional_orc_def.alltypes_deleted_rows t1]
+   HDFS partitions=24/24 files=24 size=54.09KB
+   runtime filters: RF000 -> t1.id % 12
+   row-size=32B cardinality=3.44K
+====
+# Query hints + using
+select straight_join a.id from functional.alltypes a
+  join /* +BROADCAST */ functional_orc_def.alltypes_deleted_rows b
+  using (id);
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  runtime filters: RF000 <- b.id
+|  row-size=36B cardinality=6.12K
+|
+|--03:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  |  row-size=32B cardinality=3.44K
+|  |
+|  |--02:SCAN HDFS [functional_orc_def.alltypes_deleted_rows b-delete-delta]
+|  |     HDFS partitions=6/24 files=6 size=6.58KB
+|  |     row-size=28B cardinality=419
+|  |
+|  01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows b]
+|     HDFS partitions=24/24 files=24 size=54.09KB
+|     row-size=32B cardinality=3.44K
+|
+00:SCAN HDFS [functional.alltypes a]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.id
+   row-size=4B cardinality=6.12K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+07:EXCHANGE [UNPARTITIONED]
+|
+04:HASH JOIN [INNER JOIN, BROADCAST]
+|  hash predicates: a.id = b.id
+|  runtime filters: RF000 <- b.id
+|  row-size=36B cardinality=6.12K
+|
+|--06:EXCHANGE [BROADCAST]
+|  |
+|  03:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  |  row-size=32B cardinality=3.44K
+|  |
+|  |--05:EXCHANGE [BROADCAST]
+|  |  |
+|  |  02:SCAN HDFS [functional_orc_def.alltypes_deleted_rows b-delete-delta]
+|  |     HDFS partitions=6/24 files=6 size=6.58KB
+|  |     row-size=28B cardinality=419
+|  |
+|  01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows b]
+|     HDFS partitions=24/24 files=24 size=54.09KB
+|     row-size=32B cardinality=3.44K
+|
+00:SCAN HDFS [functional.alltypes a]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.id
+   row-size=4B cardinality=6.12K
+====
+select straight_join a.id from functional.alltypes a
+ join /* +SHUFFLE */ functional_orc_def.alltypes_deleted_rows b
+ using (id);
+---- PLAN
+PLAN-ROOT SINK
+|
+04:HASH JOIN [INNER JOIN]
+|  hash predicates: a.id = b.id
+|  runtime filters: RF000 <- b.id
+|  row-size=36B cardinality=6.12K
+|
+|--03:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN]
+|  |  row-size=32B cardinality=3.44K
+|  |
+|  |--02:SCAN HDFS [functional_orc_def.alltypes_deleted_rows b-delete-delta]
+|  |     HDFS partitions=6/24 files=6 size=6.58KB
+|  |     row-size=28B cardinality=419
+|  |
+|  01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows b]
+|     HDFS partitions=24/24 files=24 size=54.09KB
+|     row-size=32B cardinality=3.44K
+|
+00:SCAN HDFS [functional.alltypes a]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.id
+   row-size=4B cardinality=6.12K
+---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
+08:EXCHANGE [UNPARTITIONED]
+|
+04:HASH JOIN [INNER JOIN, PARTITIONED]
+|  hash predicates: a.id = b.id
+|  runtime filters: RF000 <- b.id
+|  row-size=36B cardinality=6.12K
+|
+|--07:EXCHANGE [HASH(b.id)]
+|  |
+|  03:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST]
+|  |  row-size=32B cardinality=3.44K
+|  |
+|  |--05:EXCHANGE [BROADCAST]
+|  |  |
+|  |  02:SCAN HDFS [functional_orc_def.alltypes_deleted_rows b-delete-delta]
+|  |     HDFS partitions=6/24 files=6 size=6.58KB
+|  |     row-size=28B cardinality=419
+|  |
+|  01:SCAN HDFS [functional_orc_def.alltypes_deleted_rows b]
+|     HDFS partitions=24/24 files=24 size=54.09KB
+|     row-size=32B cardinality=3.44K
+|
+06:EXCHANGE [HASH(a.id)]
+|
+00:SCAN HDFS [functional.alltypes a]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> a.id
+   row-size=4B cardinality=6.12K
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test b/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
index 9cb7d32..59c16df 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test
@@ -20,88 +20,48 @@ truncate table functional_orc_def.full_transactional_table;
 AnalysisException: TRUNCATE not supported on full transactional (ACID) table: functional_orc_def.full_transactional_table
 ====
 ---- QUERY
-create table acid (i int) stored as orc tblproperties('transactional'='true');
+# Impala should reject tables that have multiple files in the same
+# bucket in the same directory.
+# Note: This table is clearly not bucketed, but for row ID
+# generation it has virtual buckets based on the file names.
+create table test_promotion_fail (i int) stored as orc;
 ====
 ---- HIVE_QUERY
 use $DATABASE;
-insert into acid values (1), (2), (3);
-delete from acid where i = 2;
+insert into test_promotion_fail values (1);
+insert into test_promotion_fail values (1);
+alter table test_promotion_fail
+set tblproperties('EXTERNAL'='false','transactional'='true');
 ====
 ---- QUERY
-refresh acid;
-select * from acid;
+refresh test_promotion_fail;
+select * from test_promotion_fail;
 ---- CATCH
-TableLoadingException
-====
----- HIVE_QUERY
-alter table $DATABASE.acid compact 'major' and wait;
+Found original file with unexpected name
 ====
 ---- QUERY
-invalidate metadata acid;
-select * from acid;
----- RESULTS
-1
-3
----- TYPES
-INT
+create table complex_copy like functional_orc_def.complextypestbl;
 ====
 ---- HIVE_QUERY
 use $DATABASE;
-insert into acid values (5);
-insert into acid values (5);
-insert into acid values (5);
-====
----- QUERY
-refresh acid;
-select * from acid;
----- RESULTS
-1
-3
-5
-5
-5
----- TYPES
-INT
-====
----- HIVE_QUERY
-alter table $DATABASE.acid compact 'major' and wait;
-====
----- QUERY
-refresh acid;
-show files in acid;
----- RESULTS
-row_regex:'$NAMENODE/$MANAGED_WAREHOUSE_DIR/$DATABASE.db/acid/base_0000005_v\d+/bucket_\d+','\d+K?B',''
----- TYPES
-STRING,STRING,STRING
+insert into complex_copy select * from functional_orc_def.complextypestbl;
+delete from complex_copy where id % 2 = 0;
 ====
 ---- QUERY
-select * from acid;
+select id from complex_copy;
 ---- RESULTS
 1
 3
 5
-5
-5
----- TYPES
-INT
+7
 ====
 ---- QUERY
-# Impala should reject tables that have multiple files in the same
-# bucket in the same directory.
-# Note: This table is clearly not bucketed, but for row ID
-# generation it has virtual buckets based on the file names.
-create table test_promotion_fail (i int) stored as orc;
-====
----- HIVE_QUERY
-use $DATABASE;
-insert into test_promotion_fail values (1);
-insert into test_promotion_fail values (1);
-alter table test_promotion_fail
-set tblproperties('EXTERNAL'='false','transactional'='true');
+select id, item from complex_copy c, c.int_array;
+---- CATCH
+This query is not supported on full ACID tables
 ====
 ---- QUERY
-refresh test_promotion_fail;
-select * from  test_promotion_fail;
+select item from complex_copy.int_array;
 ---- CATCH
-Found original file with unexpected name
+This query is not supported on full ACID tables
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/full-acid-scans.test b/testdata/workloads/functional-query/queries/QueryTest/full-acid-scans.test
new file mode 100644
index 0000000..fb2c34b
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/full-acid-scans.test
@@ -0,0 +1,195 @@
+====
+---- QUERY
+create table acid (i int) stored as orc tblproperties('transactional'='true');
+====
+---- HIVE_QUERY
+use $DATABASE;
+insert into acid values (1), (2), (3);
+delete from acid where i = 2;
+====
+---- QUERY
+refresh acid;
+select * from acid;
+---- RESULTS
+1
+3
+---- TYPES
+INT
+====
+---- HIVE_QUERY
+use $DATABASE;
+insert into acid values (5), (5), (5);
+====
+---- QUERY
+refresh acid;
+select * from acid;
+---- RESULTS
+1
+3
+5
+5
+5
+---- TYPES
+INT
+====
+---- HIVE_QUERY
+use $DATABASE;
+update acid set i = i + 1;
+====
+---- QUERY
+refresh acid;
+select * from acid;
+---- RESULTS
+2
+4
+6
+6
+6
+---- TYPES
+INT
+====
+---- HIVE_QUERY
+use $DATABASE;
+insert overwrite table acid select 1000;
+====
+---- QUERY
+refresh acid;
+select * from acid;
+---- RESULTS
+1000
+---- TYPES
+INT
+====
+---- QUERY
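+# 7119 = 7300 rows of functional.alltypes minus the 181 rows deleted during
+# data load (see alltypes_deleted_rows in functional_schema_template.sql).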
+select count(*) from functional_orc_def.alltypes_deleted_rows;
+---- RESULTS
+7119
+---- TYPES
+BIGINT
+====
+---- QUERY
+select count(*)
+from functional_orc_def.alltypes_deleted_rows
+where month % 2 = 0 and year % 2 = 0 and id % 10 = 0;
+---- RESULTS
+0
+---- TYPES
+BIGINT
+====
+---- QUERY
+select year, month, count(*)
+from functional_orc_def.alltypes_deleted_rows
+group by year, month
+order by year, month
+---- RESULTS
+2009,1,310
+2009,2,280
+2009,3,310
+2009,4,300
+2009,5,310
+2009,6,300
+2009,7,310
+2009,8,310
+2009,9,300
+2009,10,310
+2009,11,300
+2009,12,310
+2010,1,310
+2010,2,252
+2010,3,310
+2010,4,270
+2010,5,310
+2010,6,270
+2010,7,310
+2010,8,279
+2010,9,300
+2010,10,279
+2010,11,300
+2010,12,279
+---- TYPES
+INT,INT,BIGINT
+====
+---- QUERY
+select count(*) from (
+  select * from functional_orc_def.alltypes_deleted_rows where id % 2 = 0
+  union all
+  select * from functional_orc_def.alltypes_deleted_rows where id % 2 != 0
+) t;
+---- RESULTS
+7119
+---- TYPES
+BIGINT
+====
+---- QUERY
+select id from functional_orc_def.alltypes_deleted_rows
+where id % 2 = 0 order by id desc limit 10
+union all
+select max(id) from functional_orc_def.alltypes_deleted_rows;
+---- RESULTS
+7298
+7296
+7294
+7292
+7288
+7286
+7284
+7282
+7278
+7276
+7299
+---- TYPES
+INT
+====
+---- QUERY
+select id from functional_orc_def.alltypes_deleted_rows
+where year=2010 and month = 8 and id % 2 = 0 order by id desc limit 10
+union all
+select max(id) from functional_orc_def.alltypes_deleted_rows;
+---- RESULTS
+6078
+6076
+6074
+6072
+6068
+6066
+6064
+6062
+6058
+6056
+7299
+---- TYPES
+INT
+====
+---- QUERY
+create table acid_part_key_scan (id int)
+partitioned by (p int) stored as orc
+tblproperties('transactional'='true');
+====
+---- HIVE_QUERY
+use $DATABASE;
+insert into acid_part_key_scan partition(p=0) values (0), (1), (2);
+insert into acid_part_key_scan partition(p=1) values (0), (1), (2);
+delete from acid_part_key_scan where p = 1 and id = 0;
+====
+---- QUERY
+refresh acid_part_key_scan;
+select max(p) from acid_part_key_scan;
+---- RESULTS
+1
+---- TYPES
+INT
+====
+---- HIVE_QUERY
+use $DATABASE;
+delete from acid_part_key_scan where p = 1;
+====
+---- QUERY
+refresh acid_part_key_scan;
+show partitions acid_part_key_scan;
+---- RESULTS
+'0',-1,1,regex:.+,regex:.+,regex:.+,regex:.+,regex:.+,regex:.*
+'1',-1,3,regex:.+,regex:.+,regex:.+,regex:.+,regex:.+,regex:.*
+'Total',-1,4,regex:.+,regex:.+,'','','',''
+---- TYPES
+STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING
+====
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index 4e1caec..e0eed9f 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -27,7 +27,8 @@ import time
 from multiprocessing.pool import ThreadPool
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
-from tests.common.skip import SkipIfHive2
+from tests.common.skip import (SkipIfHive2, SkipIfS3, SkipIfABFS,
+                               SkipIfADLS, SkipIfIsilon, SkipIfLocal)
 from tests.util.filesystem_utils import WAREHOUSE
 
 RETRY_PROFILE_MSG = 'Retried query planning due to inconsistent metadata'
@@ -459,6 +460,10 @@ class TestObservability(CustomClusterTestSuite):
 
 
 class TestFullAcid(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
   @SkipIfHive2.acid
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -471,6 +476,15 @@ class TestFullAcid(CustomClusterTestSuite):
     res.data.sort()
     assert res.data == ['0', '1', '2', '3', '4', '5', '6', '7']
 
+  @SkipIfHive2.acid
+  @SkipIfS3.hive
+  @SkipIfABFS.hive
+  @SkipIfADLS.hive
+  @SkipIfIsilon.hive
+  @SkipIfLocal.hive
+  @pytest.mark.execute_serially
+  def test_full_acid_scans(self, vector, unique_database):
+    self.run_test_case('QueryTest/full-acid-scans', vector, use_db=unique_database)
 
 class TestReusePartitionMetadata(CustomClusterTestSuite):
   @pytest.mark.execute_serially
diff --git a/tests/query_test/test_acid.py b/tests/query_test/test_acid.py
index f851bb7..307db54 100644
--- a/tests/query_test/test_acid.py
+++ b/tests/query_test/test_acid.py
@@ -98,6 +98,15 @@ class TestAcid(ImpalaTestSuite):
   def test_acid_partitioned(self, vector, unique_database):
     self.run_test_case('QueryTest/acid-partitioned', vector, use_db=unique_database)
 
+  @SkipIfHive2.acid
+  @SkipIfS3.hive
+  @SkipIfABFS.hive
+  @SkipIfADLS.hive
+  @SkipIfIsilon.hive
+  @SkipIfLocal.hive
+  def test_full_acid_scans(self, vector, unique_database):
+    self.run_test_case('QueryTest/full-acid-scans', vector, use_db=unique_database)
+
   # When local CatalogV2 combines with hms_enent_polling enabled, it seems
   # that Catalog loads tables by itself, the query statement cannot trigger
   # loading tables. As the ValidWriteIdlists is part of table loading profile,