Posted to commits@impala.apache.org by jo...@apache.org on 2020/06/13 00:05:46 UTC

[impala] 01/03: IMPALA-9791: Support validWriteIdList in getPartialCatalogObject API

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0cb44242d20532945e5fb09f5bbef6c65415a753
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Fri May 22 14:56:43 2020 -0700

    IMPALA-9791: Support validWriteIdList in getPartialCatalogObject API
    
    This change enhances the Catalog-v2 API getPartialCatalogObject to
    support ValidWriteIdList as an optional field in the TableInfoSelector.
    When such a field is provided by clients, the catalog compares the
    provided ValidWriteIdList with the cached ValidWriteIdList of the
    table. The catalog reloads the table if it determines that the cached
    table is stale with respect to the provided ValidWriteIdList.
    If the cached table is already at or above the requested
    ValidWriteIdList, the catalog uses the cached table metadata to filter
    out file descriptors pertaining to the provided ValidWriteIdList.
    Note that in case of compactions it is possible that the requested
    ValidWriteIdList cannot be satisfied using the cached file metadata
    for some partitions. For such partitions, the catalog re-fetches the
    file metadata from the FileSystem.
    
    In order to implement the fall-back to fetching the file metadata from
    the filesystem, the patch refactors some of the file-metadata loading
    logic into ParallelFileMetadataLoader, which also helps simplify some
    methods in HdfsTable.java. Additionally, it modifies the
    WriteIdBasedPredicate to optionally do a strict check which throws an
    exception in certain scenarios.
    
    This is helpful for providing a snapshot view of the table metadata
    during query compilation with respect to other changes happening
    concurrently to the table. Note that this change does not implement
    the coordinator-side changes needed for catalog clients to use such a
    field. Those will be taken up in a separate change to keep this patch
    smaller.
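    
    As a rough, hedged sketch (not code from this patch; the variable
    names are illustrative), the catalogd-side decision added below
    looks roughly like this:
    
        Table tbl = getTable(dbName, tblName);
        if (tbl instanceof HdfsTable
            && AcidUtils.compare((HdfsTable) tbl, reqWriteIdList) >= 0) {
          // Cached metadata is at or above the requested snapshot: filter
          // the cached file descriptors down to the requested writeIds.
          // filterFdsForAcidState() performs a strict check and throws if
          // a partition's cached files cannot satisfy the request (e.g.
          // after a compaction); for such partitions the file metadata is
          // re-fetched from the FileSystem via
          // ParallelFileMetadataLoader.loadAndGet().
          AcidUtils.filterFdsForAcidState(cachedFds, reqWriteIdList);
        } else {
          // The cached table is stale for this write-id list: reload it.
        }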
    
    Testing:
    1. Ran existing filemetadata loader tests.
    2. Added a new test which exercises the various cases for
    ValidWriteIdList comparison.
    3. Ran core tests along with the dependent MetastoreClientPool
    patch (IMPALA-9824).
    
    Change-Id: Ied2c7c3cb2009c407e8fbc3af4722b0d34f57c4a
    Reviewed-on: http://gerrit.cloudera.org:8080/16008
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 common/thrift/CatalogService.thrift                |   7 +
 .../impala/catalog/CatalogServiceCatalog.java      | 110 +++-
 .../apache/impala/catalog/FileMetadataLoader.java  |  15 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  | 122 +++--
 .../impala/catalog/ParallelFileMetadataLoader.java | 101 +++-
 .../main/java/org/apache/impala/catalog/Table.java |   2 +-
 .../org/apache/impala/catalog/TableLoadingMgr.java |   2 +-
 .../impala/catalog/local/DirectMetaProvider.java   |   7 +-
 .../apache/impala/catalog/local/LocalFsTable.java  |   2 +-
 .../apache/impala/catalog/local/MetaProvider.java  |   3 +-
 .../apache/impala/service/CatalogOpExecutor.java   |   8 +-
 .../java/org/apache/impala/util/AcidUtils.java     | 224 +++++++-
 .../catalog/CatalogObjectToFromThriftTest.java     |  14 +-
 .../org/apache/impala/catalog/CatalogTest.java     | 107 ++--
 .../catalog/CatalogdTableInvalidatorTest.java      |   2 +-
 .../impala/catalog/FileMetadataLoaderTest.java     |  20 +-
 .../catalog/PartialCatalogInfoWriteIdTest.java     | 587 +++++++++++++++++++++
 .../events/MetastoreEventsProcessorTest.java       |   5 +-
 .../apache/impala/testutil/ImpalaJdbcClient.java   |   6 +
 .../apache/impala/testutil/ImpaladTestCatalog.java |   2 +-
 .../java/org/apache/impala/util/AcidUtilsTest.java |   3 +-
 shaded-deps/pom.xml                                |   1 +
 22 files changed, 1168 insertions(+), 182 deletions(-)

diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 0ab972d..8c42471 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -329,6 +329,10 @@ struct TTableInfoSelector {
   // The response should contain table constraints like primary keys
   // and foreign keys
   8: bool want_table_constraints
+
+  // If this is for an ACID table and this is set, the table info returned
+  // will be consistent with the provided valid_write_ids
+  9: optional CatalogObjects.TValidWriteIdList valid_write_ids
 }
 
 // Returned information about a particular partition.
@@ -488,6 +492,9 @@ struct TGetCatalogObjectResponse {
 struct TGetPartitionStatsRequest {
   1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V1
   2: required CatalogObjects.TTableName table_name
+  // If the table is transactional then this field represents the client's view
+  // of the table snapshot in terms of ValidWriteIdList.
+  3: optional CatalogObjects.TValidWriteIdList valid_write_ids
 }
 
 // Response for requesting partition statistics. All partition statistics
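
The coordinator-side plumbing for this field is not part of this patch, but as a
hedged illustration, a catalog client could populate it roughly as follows. Setter
names assume the standard Thrift-generated Java API; convertToTValidWriteIdList is a
hypothetical helper (the inverse of MetastoreShim.getValidWriteIdListFromThrift used
further below), and the object_desc identifying the target table is omitted:

    // Illustrative sketch only -- not part of this patch.
    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
    TTableInfoSelector selector = new TTableInfoSelector();
    selector.setWant_partition_files(true);
    // Hypothetical conversion from a Hive ValidWriteIdList to its Thrift form.
    TValidWriteIdList thriftWriteIds = convertToTValidWriteIdList(writeIdList);
    selector.setValid_write_ids(thriftWriteIds);
    req.setTable_info_selector(selector);
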
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 87465b6..994abff 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -17,6 +17,8 @@
 
 package org.apache.impala.catalog;
 
+import static org.apache.impala.thrift.TCatalogObjectType.TABLE;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -34,10 +36,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.commons.collections.MapUtils;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -46,6 +51,7 @@ import org.apache.impala.analysis.TableName;
 import org.apache.impala.authorization.AuthorizationDelta;
 import org.apache.impala.authorization.AuthorizationManager;
 import org.apache.impala.authorization.AuthorizationPolicy;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.events.ExternalEventsProcessor;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor;
@@ -59,6 +65,7 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.Reference;
 import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.CatalogLookupStatus;
@@ -77,7 +84,9 @@ import org.apache.impala.thrift.TGetOperationUsageResponse;
 import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
 import org.apache.impala.thrift.TGetPartitionStatsRequest;
+import org.apache.impala.thrift.THdfsFileDesc;
 import org.apache.impala.thrift.TPartialCatalogInfo;
+import org.apache.impala.thrift.TPartialPartitionInfo;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPartitionStats;
 import org.apache.impala.thrift.TPrincipalType;
@@ -88,6 +97,8 @@ import org.apache.impala.thrift.TTableUsage;
 import org.apache.impala.thrift.TTableUsageMetrics;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateTableUsageRequest;
+import org.apache.impala.thrift.TValidWriteIdList;
+import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.CatalogBlacklistUtils;
 import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.PatternMatcher;
@@ -105,7 +116,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Maps;
 
 /**
  * Specialized Catalog that implements the CatalogService specific Catalog
@@ -584,8 +596,14 @@ public class CatalogServiceCatalog extends Catalog {
     TTableName tableName = request.table_name;
     LOG.info("Fetching partition statistics for: " + tableName.getDb_name() + "."
         + tableName.getTable_name());
+    ValidWriteIdList writeIdList = null;
+    if (request.valid_write_ids != null) {
+      writeIdList = MetastoreShim.getValidWriteIdListFromThrift(
+        new TableName(request.table_name.db_name, request.table_name.table_name)
+          .toString(), request.valid_write_ids);
+    }
     Table table = getOrLoadTable(tableName.db_name, tableName.table_name,
-        "needed to fetch partition stats");
+        "needed to fetch partition stats", writeIdList);
 
     // Table could be null if it does not exist anymore.
     if (table == null) {
@@ -1203,7 +1221,7 @@ public class CatalogServiceCatalog extends Catalog {
   private void addTableToCatalogDeltaHelper(Table tbl, GetCatalogDeltaContext ctx)
       throws TException {
     TCatalogObject catalogTbl =
-        new TCatalogObject(TCatalogObjectType.TABLE, Catalog.INITIAL_CATALOG_VERSION);
+        new TCatalogObject(TABLE, Catalog.INITIAL_CATALOG_VERSION);
     tbl.getLock().lock();
     try {
       long tblVersion = tbl.getCatalogVersion();
@@ -1837,8 +1855,8 @@ public class CatalogServiceCatalog extends Catalog {
    * and the current cached value will be returned. This may mean that a missing table
    * (not yet loaded table) will be returned.
    */
-  public Table getOrLoadTable(String dbName, String tblName, String reason)
-      throws CatalogException {
+  public Table getOrLoadTable(String dbName, String tblName, String reason,
+      ValidWriteIdList validWriteIdList) throws CatalogException {
     TTableName tableName = new TTableName(dbName.toLowerCase(), tblName.toLowerCase());
     TableLoadingMgr.LoadRequest loadReq;
 
@@ -1847,7 +1865,17 @@ public class CatalogServiceCatalog extends Catalog {
     versionLock_.readLock().lock();
     try {
       Table tbl = getTable(dbName, tblName);
-      if (tbl == null || tbl.isLoaded()) return tbl;
+      // tbl doesn't exist in the catalog
+      if (tbl == null) return null;
+      // if no validWriteIdList is provided, we return the tbl if it is loaded
+      if (tbl.isLoaded() && validWriteIdList == null) return tbl;
+      // if a validWriteIdList is provided, we see if the cached table can provide a
+      // consistent view of the given validWriteIdList. If yes, we can return the
+      // table; otherwise we reload the table.
+      if (tbl instanceof HdfsTable
+          && AcidUtils.compare((HdfsTable) tbl, validWriteIdList) >= 0) {
+        return tbl;
+      }
       previousCatalogVersion = tbl.getCatalogVersion();
       loadReq = tableLoadingMgr_.loadAsync(tableName, reason);
     } finally {
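
As a hedged illustration of the new getOrLoadTable() signature (the database, table
and variable names here are purely illustrative), a caller that already has the
client's Thrift write-id list would invoke it along these lines, mirroring the call
sites changed in this patch:

    ValidWriteIdList writeIdList = MetastoreShim.getValidWriteIdListFromThrift(
        "mydb.mytable", thriftValidWriteIds);
    Table tbl = getOrLoadTable("mydb", "mytable",
        "needed by coordinator", writeIdList);
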
@@ -3001,10 +3029,19 @@ public class CatalogServiceCatalog extends Catalog {
     case TABLE:
     case VIEW: {
       Table table;
+      ValidWriteIdList writeIdList = null;
       try {
+        if (req.table_info_selector.valid_write_ids != null) {
+          Preconditions.checkState(objectDesc.type.equals(TABLE));
+          String dbName = objectDesc.getTable().db_name == null ? Catalog.DEFAULT_DB
+            : objectDesc.getTable().db_name;
+          String tblName = objectDesc.getTable().tbl_name;
+          writeIdList = MetastoreShim.getValidWriteIdListFromThrift(
+              dbName + "." + tblName, req.table_info_selector.valid_write_ids);
+        }
         table = getOrLoadTable(
             objectDesc.getTable().getDb_name(), objectDesc.getTable().getTbl_name(),
-            "needed by coordinator");
+            "needed by coordinator", writeIdList);
       } catch (DatabaseNotFoundException e) {
         return createGetPartialCatalogObjectError(CatalogLookupStatus.DB_NOT_FOUND);
       }
@@ -3015,10 +3052,23 @@ public class CatalogServiceCatalog extends Catalog {
         // invalidate request.
         return createGetPartialCatalogObjectError(CatalogLookupStatus.TABLE_NOT_LOADED);
       }
+      Map<HdfsPartition, TPartialPartitionInfo> missingPartialInfos;
+      TGetPartialCatalogObjectResponse resp;
       // TODO(todd): consider a read-write lock here.
       table.getLock().lock();
       try {
-        return table.getPartialInfo(req);
+        if (table instanceof HdfsTable) {
+          HdfsTable hdfsTable = (HdfsTable) table;
+          missingPartialInfos = Maps.newHashMap();
+          resp = hdfsTable.getPartialInfo(req, missingPartialInfos);
+          if (missingPartialInfos.isEmpty()) return resp;
+          // there were some partialPartitionInfos which don't have file-descriptors
+          // for the requested writeIdList
+          setFileMetadataFromFS(hdfsTable, writeIdList, missingPartialInfos);
+          return resp;
+        } else {
+          return table.getPartialInfo(req);
+        }
       } finally {
         table.getLock().unlock();
       }
@@ -3051,6 +3101,50 @@ public class CatalogServiceCatalog extends Catalog {
     }
   }
 
+  /**
+   * Helper method which fetches the file metadata for the given HdfsPartitions from
+   * the FileSystem with respect to the given ValidWriteIdList. Additionally, it sets
+   * the loaded file descriptors in the corresponding {@link TPartialPartitionInfo}
+   * objects.
+   */
+  private void setFileMetadataFromFS(HdfsTable table, ValidWriteIdList reqWriteIdList,
+      Map<HdfsPartition, TPartialPartitionInfo> partToPartialInfoMap)
+      throws CatalogException {
+    Preconditions.checkNotNull(reqWriteIdList);
+    Preconditions.checkState(MapUtils.isNotEmpty(partToPartialInfoMap));
+    Stopwatch timer = Stopwatch.createStarted();
+    try {
+      String logPrefix = String.format(
+          "Fetching file and block metadata for %s paths for table %s for "
+              + "validWriteIdList %s", partToPartialInfoMap.size(), table.getFullName(),
+          reqWriteIdList);
+      ValidTxnList validTxnList;
+      try (MetaStoreClient client = getMetaStoreClient()) {
+        validTxnList = MetastoreShim.getValidTxns(client.getHiveClient());
+      } catch (TException ex) {
+        throw new CatalogException(
+            "Unable to fetch valid transaction ids while loading file metadata for table "
+                + table.getFullName(), ex);
+      }
+      Map<HdfsPartition, List<FileDescriptor>> fdsByPart = new ParallelFileMetadataLoader(
+          table, partToPartialInfoMap.keySet(), reqWriteIdList, validTxnList, logPrefix)
+          .loadAndGet();
+      for (HdfsPartition partition : fdsByPart.keySet()) {
+        TPartialPartitionInfo partitionInfo = partToPartialInfoMap.get(partition);
+        List<FileDescriptor> fds = fdsByPart.get(partition);
+        List<THdfsFileDesc> fileDescs = Lists.newArrayListWithCapacity(fds.size());
+        for (FileDescriptor fd : fds) {
+          fileDescs.add(fd.toThrift());
+        }
+        partitionInfo.setFile_descriptors(fileDescs);
+      }
+    } finally {
+      LOG.info(
+          "Time taken to load file metadata for table {} from filesystem for writeIdList"
+              + " {}: {} msec.", table.getFullName(), reqWriteIdList,
+          timer.stop().elapsed(TimeUnit.MILLISECONDS));
+    }
+  }
+
   private static TGetPartialCatalogObjectResponse createGetPartialCatalogObjectError(
       CatalogLookupStatus status) {
     TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse();
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index 632bb78..d847a78 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -83,7 +83,7 @@ public class FileMetadataLoader {
    *   compactions.
    * @param writeIds if non-null, a write-id list which will filter the returned
    *   file descriptors to only include those indicated to be valid.
-   * @param HdfsFileFormat if non-null and equal to HdfsFileFormat.HUDI_PARQUET,
+   * @param fileFormat if non-null and equal to HdfsFileFormat.HUDI_PARQUET,
    *   this loader will filter files based on Hudi's HoodieROTablePathFilter method
    */
   public FileMetadataLoader(Path partDir, boolean recursive, List<FileDescriptor> oldFds,
@@ -148,11 +148,12 @@ public class FileMetadataLoader {
    * descriptors.
    *
    * @throws IOException if listing fails.
-   * @throws MetaException on ACID errors. TODO: remove this once IMPALA-9042 is resolved.
+   * @throws CatalogException on ACID errors. TODO: remove this once IMPALA-9042 is
+   * resolved.
    */
-  public void load() throws MetaException, IOException {
+  public void load() throws CatalogException, IOException {
     Preconditions.checkState(loadStats_ == null, "already loaded");
-    loadStats_ = new LoadStats();
+    loadStats_ = new LoadStats(partDir_);
     FileSystem fs = partDir_.getFileSystem(CONF);
 
     // If we don't have any prior FDs from which we could re-use old block location info,
@@ -259,7 +260,11 @@ public class FileMetadataLoader {
   }
 
   // File/Block metadata loading stats for a single HDFS path.
-  public class LoadStats {
+  public static class LoadStats {
+    private final Path partDir_;
+    LoadStats(Path partDir) {
+      this.partDir_ = Preconditions.checkNotNull(partDir);
+    }
     /** Number of files skipped because they pertain to an uncommitted ACID transaction */
     public int uncommittedAcidFilesSkipped = 0;
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index ce971b3..41f2a60 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -56,6 +56,7 @@ import org.apache.impala.analysis.NumericLiteral;
 import org.apache.impala.analysis.PartitionKeyValue;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
@@ -98,6 +99,7 @@ import org.slf4j.LoggerFactory;
 import com.codahale.metrics.Clock;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Timer;
+import com.codahale.metrics.Counter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -106,7 +108,6 @@ import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
 
@@ -160,6 +161,10 @@ public class HdfsTable extends Table implements FeFsTable {
   public static final String TOTAL_FILE_BYTES_METRIC = "total-file-size-bytes";
   public static final String MEMORY_ESTIMATE_METRIC = "memory-estimate-bytes";
   public static final String HAS_INCREMENTAL_STATS_METRIC = "has-incremental-stats";
+  // metrics used to find out the cache hit rate when file-metadata is requested
+  // for a given ValidWriteIdList
+  public static final String FILEMETADATA_CACHE_MISS_METRIC = "filemetadata-cache-miss";
+  public static final String FILEMETADATA_CACHE_HIT_METRIC = "filemetadata-cache-hit";
 
   // Load all partitions time, including fetching all partitions
   // from HMS and loading all partitions. The code path is
@@ -617,17 +622,9 @@ public class HdfsTable extends Table implements FeFsTable {
    * affects logging.
    */
   private void loadFileMetadataForPartitions(IMetaStoreClient client,
-      Iterable<HdfsPartition> parts, boolean isRefresh) throws CatalogException {
+      Collection<HdfsPartition> parts, boolean isRefresh) throws CatalogException {
     final Clock clock = Clock.defaultClock();
     long startTime = clock.getTick();
-    // Group the partitions by their path (multiple partitions may point to the same
-    // path).
-    Map<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
-    for (HdfsPartition p : parts) {
-      Path partPath = FileSystemUtil.createFullyQualifiedPath(new Path(p.getLocation()));
-      partsByPath.computeIfAbsent(partPath, (path) -> new ArrayList<HdfsPartition>())
-          .add(p);
-    }
 
     //TODO: maybe it'd be better to load the valid txn list in the context of a
     // transaction to have consistent valid write ids and valid transaction ids.
@@ -636,51 +633,18 @@ public class HdfsTable extends Table implements FeFsTable {
     // Impala doesn't notice when HMS's cleaner removes old transactional directories,
     // which might lead to FileNotFound exceptions.
     ValidTxnList validTxnList = validWriteIds_ != null ? loadValidTxns(client) : null;
-
-    // Create a FileMetadataLoader for each path.
-    Map<Path, FileMetadataLoader> loadersByPath = Maps.newHashMap();
-    for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath.entrySet()) {
-      List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
-      FileMetadataLoader loader = new FileMetadataLoader(e.getKey(),
-          Utils.shouldRecursivelyListPartitions(this), oldFds, hostIndex_, validTxnList,
-          validWriteIds_, e.getValue().get(0).getFileFormat());
-      // If there is a cached partition mapped to this path, we recompute the block
-      // locations even if the underlying files have not changed.
-      // This is done to keep the cached block metadata up to date.
-      boolean hasCachedPartition = Iterables.any(e.getValue(),
-          HdfsPartition::isMarkedCached);
-      loader.setForceRefreshBlockLocations(hasCachedPartition);
-      loadersByPath.put(e.getKey(), loader);
-    }
-
     String logPrefix = String.format(
         "%s file and block metadata for %s paths for table %s",
-        isRefresh ? "Refreshing" : "Loading", partsByPath.size(),
+        isRefresh ? "Refreshing" : "Loading", parts.size(),
         getFullName());
-    FileSystem tableFs;
-    try {
-      tableFs = (new Path(getLocation())).getFileSystem(CONF);
-    } catch (IOException e) {
-      throw new CatalogException("Invalid table path for table: " + getFullName(), e);
-    }
 
     // Actually load the partitions.
     // TODO(IMPALA-8406): if this fails to load files from one or more partitions, then
     // we'll throw an exception here and end up bailing out of whatever catalog operation
     // we're in the middle of. This could cause a partial metadata update -- eg we may
     // have refreshed the top-level table properties without refreshing the files.
-    new ParallelFileMetadataLoader(logPrefix, tableFs, loadersByPath.values())
-        .load();
-
-    // Store the loaded FDs into the partitions.
-    for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath.entrySet()) {
-      Path p = e.getKey();
-      FileMetadataLoader loader = loadersByPath.get(p);
-
-      for (HdfsPartition part : e.getValue()) {
-        part.setFileDescriptors(loader.getLoadedFds());
-      }
-    }
+    new ParallelFileMetadataLoader(this, parts, validWriteIds_, validTxnList, logPrefix)
+        .loadAndSet();
 
     // TODO(todd): would be good to log a summary of the loading process:
     // - how many block locations did we reuse/load individually/load via batch
@@ -688,7 +652,7 @@ public class HdfsTable extends Table implements FeFsTable {
     // - etc...
     String partNames = Joiner.on(", ").join(
         Iterables.limit(Iterables.transform(parts, HdfsPartition::getPartitionName), 3));
-    if (partsByPath.size() > 3) {
+    if (parts.size() > 3) {
       partNames += String.format(", and %s others",
           Iterables.size(parts) - 3);
     }
@@ -698,6 +662,14 @@ public class HdfsTable extends Table implements FeFsTable {
         getFullName(), partNames, PrintUtils.printTimeNs(duration));
   }
 
+  public FileSystem getFileSystem() throws CatalogException {
+    try {
+      return (new Path(getLocation())).getFileSystem(CONF);
+    } catch (IOException e) {
+      throw new CatalogException("Invalid table path for table: " + getFullName(), e);
+    }
+  }
+
   /**
    * Gets the AccessLevel that is available for Impala for this table based on the
    * permissions Impala has on the given path. If the path does not exist, recurses up
@@ -1513,11 +1485,12 @@ public class HdfsTable extends Table implements FeFsTable {
     return table;
   }
 
-  @Override
   public TGetPartialCatalogObjectResponse getPartialInfo(
-      TGetPartialCatalogObjectRequest req) throws TableLoadingException {
+      TGetPartialCatalogObjectRequest req,
+      Map<HdfsPartition, TPartialPartitionInfo> missingPartitionInfos)
+      throws CatalogException {
+    Preconditions.checkNotNull(missingPartitionInfos);
     TGetPartialCatalogObjectResponse resp = super.getPartialInfo(req);
-
     boolean wantPartitionInfo = req.table_info_selector.want_partition_files ||
         req.table_info_selector.want_partition_metadata ||
         req.table_info_selector.want_partition_names ||
@@ -1530,6 +1503,12 @@ public class HdfsTable extends Table implements FeFsTable {
       partIds = partitionMap_.keySet();
     }
 
+    ValidWriteIdList reqWriteIdList = req.table_info_selector.valid_write_ids == null ?
+        null : MetastoreShim.getValidWriteIdListFromThrift(getFullName(),
+        req.table_info_selector.valid_write_ids);
+    Counter misses = metrics_.getCounter(FILEMETADATA_CACHE_MISS_METRIC);
+    Counter hits = metrics_.getCounter(FILEMETADATA_CACHE_HIT_METRIC);
+    int numFilesFiltered = 0;
     if (partIds != null) {
       resp.table_info.partitions = Lists.newArrayListWithCapacity(partIds.size());
       for (long partId : partIds) {
@@ -1552,10 +1531,24 @@ public class HdfsTable extends Table implements FeFsTable {
         }
 
         if (req.table_info_selector.want_partition_files) {
-          List<FileDescriptor> fds = part.getFileDescriptors();
-          partInfo.file_descriptors = Lists.newArrayListWithCapacity(fds.size());
-          for (FileDescriptor fd: fds) {
-            partInfo.file_descriptors.add(fd.toThrift());
+          List<FileDescriptor> filteredFds = new ArrayList<>(part.getFileDescriptors());
+          try {
+            numFilesFiltered += AcidUtils
+                .filterFdsForAcidState(filteredFds, reqWriteIdList);
+            partInfo.file_descriptors = Lists
+                .newArrayListWithCapacity(filteredFds.size());
+            for (FileDescriptor fd: filteredFds) {
+              partInfo.file_descriptors.add(fd.toThrift());
+            }
+            hits.inc();
+          } catch (CatalogException ex) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Could not use cached file descriptors of partition {} of table"
+                      + " {} for writeIdList {}", part.getPartitionName(), getFullName(),
+                  reqWriteIdList, ex);
+            }
+            misses.inc();
+            missingPartitionInfos.put(part, partInfo);
           }
         }
 
@@ -1567,6 +1560,11 @@ public class HdfsTable extends Table implements FeFsTable {
       }
     }
 
+    if (reqWriteIdList != null) {
+      LOG.debug("{} files filtered out of table {} for {}. Hit rate : {}",
+          numFilesFiltered, getFullName(), reqWriteIdList, getFileMetadataCacheHitRate());
+    }
+
     if (req.table_info_selector.want_partition_files) {
       // TODO(todd) we are sending the whole host index even if we returned only
       // one file -- maybe not so efficient, but the alternative is to do a bunch
@@ -1583,6 +1581,12 @@ public class HdfsTable extends Table implements FeFsTable {
     return resp;
   }
 
+  private double getFileMetadataCacheHitRate() {
+    long hits = metrics_.getCounter(FILEMETADATA_CACHE_HIT_METRIC).getCount();
+    long misses = metrics_.getCounter(FILEMETADATA_CACHE_MISS_METRIC).getCount();
+    return ((double) hits) / (double) (hits+misses);
+  }
+
   /**
    * Create a THdfsTable corresponding to this HdfsTable. If serializing the "FULL"
    * information, then then all partitions and THdfsFileDescs of each partition should be
@@ -2029,6 +2033,8 @@ public class HdfsTable extends Table implements FeFsTable {
       public Boolean getValue() { return hasIncrementalStats_; }
     });
     metrics_.addTimer(CATALOG_UPDATE_DURATION_METRIC);
+    metrics_.addCounter(FILEMETADATA_CACHE_HIT_METRIC);
+    metrics_.addCounter(FILEMETADATA_CACHE_MISS_METRIC);
   }
 
   /**
@@ -2082,18 +2088,22 @@ public class HdfsTable extends Table implements FeFsTable {
    * Set ValistWriteIdList with stored writeId
    * @param client the client to access HMS
    */
-  protected void loadValidWriteIdList(IMetaStoreClient client)
+  protected boolean loadValidWriteIdList(IMetaStoreClient client)
       throws TableLoadingException {
     Stopwatch sw = Stopwatch.createStarted();
     Preconditions.checkState(msTable_ != null && msTable_.getParameters() != null);
+    boolean prevWriteIdChanged = false;
     if (MetastoreShim.getMajorVersion() > 2 &&
         AcidUtils.isTransactionalTable(msTable_.getParameters())) {
-      validWriteIds_ = fetchValidWriteIds(client);
+      ValidWriteIdList writeIdList = fetchValidWriteIds(client);
+      prevWriteIdChanged = writeIdList.toString().equals(validWriteIds_);
+      validWriteIds_ = writeIdList;
     } else {
       validWriteIds_ = null;
     }
     LOG.debug("Load Valid Write Id List Done. Time taken: " +
         PrintUtils.printTimeNs(sw.elapsed(TimeUnit.NANOSECONDS)));
+    return prevWriteIdChanged;
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
index e2f3881..c41b180 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
@@ -16,18 +16,30 @@
 // under the License.
 package org.apache.impala.catalog;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.impala.catalog.FeFsTable.Utils;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TValidWriteIdList;
 import org.apache.impala.util.ThreadNameAnnotator;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,24 +69,81 @@ public class ParallelFileMetadataLoader {
   private static final int MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG = 100;
 
   private final String logPrefix_;
-  private List<FileMetadataLoader> loaders_;
+  private final Map<Path, FileMetadataLoader> loaders_;
+  private final Map<Path, List<HdfsPartition>> partsByPath_;
   private final FileSystem fs_;
 
+  public ParallelFileMetadataLoader(HdfsTable table, Collection<HdfsPartition> parts,
+      ValidWriteIdList writeIdList, ValidTxnList validTxnList, String logPrefix)
+      throws CatalogException {
+    if (writeIdList != null || validTxnList != null) {
+      // make sure that either both writeIdList and validTxnList are set or neither
+      // of them is.
+      Preconditions.checkState(writeIdList != null && validTxnList != null);
+    }
+    // Group the partitions by their path (multiple partitions may point to the same
+    // path).
+    partsByPath_ = Maps.newHashMap();
+    for (HdfsPartition p : parts) {
+      Path partPath = FileSystemUtil.createFullyQualifiedPath(new Path(p.getLocation()));
+      partsByPath_.computeIfAbsent(partPath, (path) -> new ArrayList<>())
+          .add(p);
+    }
+    // Create a FileMetadataLoader for each path.
+    loaders_ = Maps.newHashMap();
+    for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath_.entrySet()) {
+      List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
+      FileMetadataLoader loader = new FileMetadataLoader(e.getKey(),
+          Utils.shouldRecursivelyListPartitions(table), oldFds, table.getHostIndex(),
+          validTxnList, writeIdList, e.getValue().get(0).getFileFormat());
+      // If there is a cached partition mapped to this path, we recompute the block
+      // locations even if the underlying files have not changed.
+      // This is done to keep the cached block metadata up to date.
+      boolean hasCachedPartition = Iterables.any(e.getValue(),
+          HdfsPartition::isMarkedCached);
+      loader.setForceRefreshBlockLocations(hasCachedPartition);
+      loaders_.put(e.getKey(), loader);
+    }
+    this.logPrefix_ = logPrefix;
+    this.fs_ = table.getFileSystem();
+  }
+
+  /**
+   * Loads the file metadata for the partitions given in the constructor. If the
+   * load is successful, also sets the file descriptors in the HdfsPartitions.
+   * @throws TableLoadingException
+   */
+  void loadAndSet() throws TableLoadingException {
+    load();
+
+    // Store the loaded FDs into the partitions.
+    for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath_.entrySet()) {
+      Path p = e.getKey();
+      FileMetadataLoader loader = loaders_.get(p);
+
+      for (HdfsPartition part : e.getValue()) {
+        part.setFileDescriptors(loader.getLoadedFds());
+      }
+    }
+  }
+
   /**
-   * @param logPrefix informational prefix for log messages
-   * @param fs the filesystem to load from (used to determine appropriate parallelism)
-   * @param loaders the metadata loaders to execute in parallel.
+   * Loads the file metadata from the FileSystem for the given list of HdfsPartitions.
+   * @return a mapping from each HdfsPartition to its list of FileDescriptors
+   * @throws TableLoadingException
    */
-  public ParallelFileMetadataLoader(String logPrefix, FileSystem fs,
-      Collection<FileMetadataLoader> loaders) {
-    logPrefix_ = logPrefix;
-    loaders_ = ImmutableList.copyOf(loaders);
-
-    // TODO(todd) in actuality, different partitions could be on different file systems.
-    // We probably should create one pool per filesystem type, and size each of those
-    // pools based on that particular filesystem, so if there's a mixed S3+HDFS table
-    // we do the right thing.
-    fs_ = fs;
+  Map<HdfsPartition, List<FileDescriptor>> loadAndGet() throws TableLoadingException {
+    load();
+    Map<HdfsPartition, List<FileDescriptor>> result = Maps.newHashMap();
+    for (Map.Entry<Path, List<HdfsPartition>> e : partsByPath_.entrySet()) {
+      Path p = e.getKey();
+      FileMetadataLoader loader = loaders_.get(p);
+
+      for (HdfsPartition part : e.getValue()) {
+        result.put(part, loader.getLoadedFds());
+      }
+    }
+    return result;
   }
 
   /**
@@ -82,14 +151,14 @@ public class ParallelFileMetadataLoader {
    * an exception. However, any successful loaders are guaranteed to complete
    * before any exception is thrown.
    */
-  void load() throws TableLoadingException {
+  private void load() throws TableLoadingException {
     if (loaders_.isEmpty()) return;
 
     int failedLoadTasks = 0;
     ExecutorService pool = createPool();
     try (ThreadNameAnnotator tna = new ThreadNameAnnotator(logPrefix_)) {
       List<Future<Void>> futures = new ArrayList<>(loaders_.size());
-      for (FileMetadataLoader loader : loaders_) {
+      for (FileMetadataLoader loader : loaders_.values()) {
         futures.add(pool.submit(() -> { loader.load(); return null; }));
       }
 
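For reference, a hedged sketch of how the refactored loader is driven after this patch
(mirroring the two call sites above; hdfsTable, partitions, writeIdList, validTxnList
and logPrefix are assumed to be in scope):

    // During a table load/refresh: write the loaded descriptors back into
    // the HdfsPartitions.
    new ParallelFileMetadataLoader(
        hdfsTable, partitions, writeIdList, validTxnList, logPrefix).loadAndSet();

    // For the filesystem fall-back in getPartialCatalogObject: get the
    // descriptors back keyed by partition (a separate loader instance).
    Map<HdfsPartition, List<FileDescriptor>> fdsByPart = new ParallelFileMetadataLoader(
        hdfsTable, partitions, writeIdList, validTxnList, logPrefix).loadAndGet();
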
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index faaeeed..2071f75 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -535,7 +535,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
    * service GetPartialCatalogObject RPCs.
    */
   public TGetPartialCatalogObjectResponse getPartialInfo(
-      TGetPartialCatalogObjectRequest req) throws TableLoadingException {
+      TGetPartialCatalogObjectRequest req) throws CatalogException {
     Preconditions.checkState(isLoaded(), "unloaded table: %s", getFullName());
     TTableInfoSelector selector = Preconditions.checkNotNull(req.table_info_selector,
         "no table_info_selector");
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java b/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java
index 0a90e56..c4cef8d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoadingMgr.java
@@ -305,7 +305,7 @@ public class TableLoadingMgr {
       // TODO: Instead of calling "getOrLoad" here we could call "loadAsync". We would
       // just need to add a mechanism for moving loaded tables into the Catalog.
       catalog_.getOrLoadTable(tblName.getDb_name(), tblName.getTable_name(),
-          "background load");
+          "background load", null);
     } catch (CatalogException e) {
       // Ignore.
     } finally {
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index ef4397d..e52f399 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TableMeta;
 import org.apache.impala.authorization.AuthorizationPolicy;
+import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.FileMetadataLoader;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
@@ -191,7 +192,7 @@ class DirectMetaProvider implements MetaProvider {
   public Map<String, PartitionMetadata> loadPartitionsByRefs(
       TableMetaRef table, List<String> partitionColumnNames,
       ListMap<TNetworkAddress> hostIndex,
-      List<PartitionRef> partitionRefs) throws MetaException, TException {
+      List<PartitionRef> partitionRefs) throws CatalogException, TException {
     Preconditions.checkNotNull(table);
     Preconditions.checkArgument(table instanceof TableMetaRefImpl);
     Preconditions.checkArgument(!partitionColumnNames.isEmpty());
@@ -260,7 +261,7 @@ class DirectMetaProvider implements MetaProvider {
    */
   private Map<String, PartitionMetadata> loadUnpartitionedPartition(
       TableMetaRefImpl table, List<PartitionRef> partitionRefs,
-      ListMap<TNetworkAddress> hostIndex) throws MetaException {
+      ListMap<TNetworkAddress> hostIndex) throws CatalogException {
     //TODO(IMPALA-9042): Remove "throws MetaException"
     Preconditions.checkArgument(partitionRefs.size() == 1,
         "Expected exactly one partition to load for unpartitioned table");
@@ -318,7 +319,7 @@ class DirectMetaProvider implements MetaProvider {
 
   private ImmutableList<FileDescriptor> loadFileMetadata(String fullTableName,
       String partName, Partition msPartition, ListMap<TNetworkAddress> hostIndex)
-        throws MetaException {
+        throws CatalogException {
     //TODO(IMPALA-9042): Remove "throws MetaException"
     Path partDir = new Path(msPartition.getSd().getLocation());
     // TODO(todd): The table property to disable recursive loading is not supported
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 036cce3..38d2fd1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -422,7 +422,7 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     try {
       partsByName = db_.getCatalog().getMetaProvider().loadPartitionsByRefs(
           ref_, getClusteringColumnNames(), hostIndex_, refs);
-    } catch (TException e) {
+    } catch (CatalogException | TException e) {
       throw new LocalCatalogException(
           "Could not load partitions for table " + getFullName(), e);
     }
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index 6d081c9..610d3e0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.impala.authorization.AuthorizationPolicy;
+import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.SqlConstraints;
@@ -109,7 +110,7 @@ public interface MetaProvider {
   Map<String, PartitionMetadata> loadPartitionsByRefs(TableMetaRef table,
       List<String> partitionColumnNames, ListMap<TNetworkAddress> hostIndex,
       List<PartitionRef> partitionRefs)
-      throws MetaException, TException;
+      throws MetaException, TException, CatalogException;
 
   /**
    * Load statistics for the given columns from the given table.
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 7591633..b2d6014 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1787,8 +1787,10 @@ public class CatalogOpExecutor {
     // be loaded because the planning phase on the impalad side triggered the loading.
     // In the LocalCatalog configuration, however, this is often necessary.
     try {
+      // We pass a null validWriteIdList here since we don't really care what version
+      // of the table is loaded; eventually it is going to be dropped below.
       catalog_.getOrLoadTable(params.getTable_name().db_name,
-          params.getTable_name().table_name, "Load for DROP TABLE/VIEW");
+          params.getTable_name().table_name, "Load for DROP TABLE/VIEW", null);
 
     } catch (CatalogException e) {
       // Ignore exceptions -- the above was just to trigger loading. Failure to load
@@ -4555,7 +4557,9 @@ public class CatalogOpExecutor {
    */
   private Table getExistingTable(String dbName, String tblName, String reason)
       throws CatalogException {
-    Table tbl = catalog_.getOrLoadTable(dbName, tblName, reason);
+    // passing null validWriteIdList makes sure that we return the table if it is
+    // already loaded.
+    Table tbl = catalog_.getOrLoadTable(dbName, tblName, reason, null);
     if (tbl == null) {
       throw new TableNotFoundException("Table not found: " + dbName + "." + tblName);
     }
diff --git a/fe/src/main/java/org/apache/impala/util/AcidUtils.java b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
index b3d637a..789c515 100644
--- a/fe/src/main/java/org/apache/impala/util/AcidUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
@@ -18,20 +18,29 @@ package org.apache.impala.util;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
 import com.google.errorprone.annotations.Immutable;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FileMetadataLoader.LoadStats;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.Reference;
+import org.apache.impala.thrift.THdfsFileDesc;
+import org.apache.impala.thrift.TPartialPartitionInfo;
 import org.apache.impala.thrift.TTransactionalType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -164,34 +173,86 @@ public class AcidUtils {
 
   /**
    * Predicate that checks if the file or directory is relevant for a given WriteId list.
+   * The class does not implement the Predicate interface since we want to support a
+   * strict mode which throws an exception in certain cases.
    * <p>
    *  <b>Must be called only for ACID table.</b>
    *  Checks that the path conforms to ACID table dir structure, and includes only
    *  directories that correspond to valid committed transactions.
    * </p>
    */
-  private static class WriteListBasedPredicate implements Predicate<String> {
+  private static class WriteListBasedPredicate {
 
+    @Nullable
     private final ValidTxnList validTxnList;
     private final ValidWriteIdList writeIdList;
+    // when strict mode is turned on, it throws exceptions when a given base file
+    // is invalid or a compacted delta file has some open writeIds.
+    private final boolean doStrictCheck;
+
+    /**
+     * Creates a Predicate just based on WriteIdList. This is used to filter out
+     * already cached filedescriptors where it is guaranteed that files related to
+     * invalid transactions are not loaded.
+     * @param writeIdList
+     */
+    WriteListBasedPredicate(ValidWriteIdList writeIdList, boolean strictMode) {
+      this.validTxnList = null;
+      this.writeIdList = Preconditions.checkNotNull(writeIdList);
+      this.doStrictCheck = strictMode;
+    }
 
+    /**
+     * Creates a predicate based on a ValidTxnList and a ValidWriteIdList. Useful when
+     * we are filtering a file listing obtained directly from the FileSystem, which may
+     * include compacted directories.
+     *
+     * @param validTxnList
+     * @param writeIdList
+     */
     WriteListBasedPredicate(ValidTxnList validTxnList, ValidWriteIdList writeIdList) {
       this.validTxnList = Preconditions.checkNotNull(validTxnList);
       this.writeIdList = Preconditions.checkNotNull(writeIdList);
+      this.doStrictCheck = false;
     }
 
-    public boolean test(String dirPath) {
+    public boolean check(String dirPath) throws CatalogException {
       ParsedBase parsedBase = parseBase(dirPath);
       if (parsedBase.writeId != SENTINEL_BASE_WRITE_ID) {
-        return writeIdList.isValidBase(parsedBase.writeId) &&
+        boolean isValid = writeIdList.isValidBase(parsedBase.writeId) &&
                isTxnValid(parsedBase.visibilityTxnId);
+        if (doStrictCheck && !isValid) {
+          throw new CatalogException("Invalid base file found " + dirPath);
+        }
+        return isValid;
       } else {
         ParsedDelta pd = parseDelta(dirPath);
         if (pd != null) {
           if (!isTxnValid(pd.visibilityTxnId)) return false;
           ValidWriteIdList.RangeResponse rr =
               writeIdList.isWriteIdRangeValid(pd.minWriteId, pd.maxWriteId);
-          return rr != ValidWriteIdList.RangeResponse.NONE;
+          if (rr == RangeResponse.ALL) return true;
+          if (rr == RangeResponse.NONE) return false;
+          // Either this is a compacted delta file whose visibility transaction id is
+          // valid, or a delta file generated by the Hive Streaming engine.
+          // We allow delta files from the streaming engine which have open writeIds,
+          // since the backend code handles such writeIds appropriately.
+          if (!pd.isCompactedDeltaFile()) return true;
+          // This is a compacted delta file which has some writeIds that are not valid.
+          // We allow only aborted and committed writeIds in compacted files (no open
+          // writeIds).
+          for (long writeId = pd.minWriteId; writeId<=pd.maxWriteId; writeId++) {
+            if (!writeIdList.isWriteIdValid(writeId) && !writeIdList
+                .isWriteIdAborted(writeId)) {
+              if (doStrictCheck) {
+                throw new CatalogException(
+                    "Open writeId " + writeId + " found in compacted delta file "
+                        + dirPath);
+              }
+              return false;
+            }
+          }
+          return true;
         }
       }
       // If it wasn't in a base or a delta directory, we should include it.
@@ -200,8 +261,23 @@ public class AcidUtils {
       return true;
     }
 
+    /**
+     * The ACID compactor process does not change the writeIds of the compacted files (eg.
+     * delta_0001 and delta_0002 will be compacted to delta_0001_0002_v0123 where v0123 is
+     * the visibility txn id for the compaction itself). While this compaction is in
+     * progress, we need to make sure that we ignore all such files. This is where
+     * the TxnList is useful. We use the validTxnList to determine if the compaction
+     * process is committed or not. If its not, we ignore the files which
+     * are being compacted.
+     *
+     * @param visibilityTxnId TransactionID derived from the directory name on the
+     *                        filesystem.
+     * @return true if the given visibilityTxnId is valid. In case either the
+     * visibilityTxnId is -1 or validTxnList is null, we return true. False otherwise.
+     */
     private boolean isTxnValid(long visibilityTxnId) {
-      return visibilityTxnId == -1 || validTxnList.isTxnValid(visibilityTxnId);
+      return validTxnList == null ||
+        visibilityTxnId == -1 || validTxnList.isTxnValid(visibilityTxnId);
     }
   }
 
@@ -252,6 +328,10 @@ public class AcidUtils {
       this.statementId = statementId;
       this.visibilityTxnId = visibilityTxnId;
     }
+
+    private boolean isCompactedDeltaFile() {
+      return visibilityTxnId != -1;
+    }
   }
 
   private static ParsedDelta matcherToParsedDelta(Matcher deltaMatcher) {
@@ -286,6 +366,39 @@ public class AcidUtils {
   }
 
   /**
+   * This method is similar to {@link AcidUtils#filterFilesForAcidState} with the
+   * difference that it expects input to be valid file descriptors from a loaded table.
+   * This means that file descriptors are already pre-vetted and are consistent with
+   * respect to some ValidWriteIdList and ValidTxnList. All this method does is to try to
+   * filter such file descriptors for a different ValidWriteIdList.
+   *
+   * @param fds Input list of File descriptors to be filtered in-place.
+   * @param validWriteIdList The ValidWriteIdList for which we filter the fds.
+   * @return The number of file descriptors which were filtered out.
+   * @throws CatalogException if any of the provided FileDescriptors can neither be
+   * safely included nor excluded because it contains writeIds which are invalid for
+   * the given ValidWriteIdList.
+   */
+  public static int filterFdsForAcidState(List<FileDescriptor> fds,
+      ValidWriteIdList validWriteIdList) throws CatalogException {
+    Preconditions.checkNotNull(fds);
+
+    if (validWriteIdList == null) return 0;
+
+    WriteListBasedPredicate writeListBasedPredicate = new WriteListBasedPredicate(
+        validWriteIdList, true);
+    Iterator<FileDescriptor> it = fds.iterator();
+    int numRemoved = 0;
+    while (it.hasNext()) {
+      if (!writeListBasedPredicate.check(it.next().getRelativePath())) {
+        it.remove();
+        numRemoved++;
+      }
+    }
+    return numRemoved;
+  }
+
+  /**
    * Filters the files based on Acid state.
    * @param stats the FileStatuses obtained from recursively listing the directory
    * @param baseDir the base directory for the partition (or table, in the case of
@@ -294,22 +407,22 @@ public class AcidUtils {
    * @param loadStats stats to add counts of skipped files to. May be null.
    * @return the FileStatuses that is a subset of passed in descriptors that
    *    must be used.
-   * @throws MetaException on ACID error. TODO: Remove throws clause once IMPALA-9042
+   * @throws CatalogException on ACID error. TODO: Remove throws clause once IMPALA-9042
    * is resolved.
    */
   public static List<FileStatus> filterFilesForAcidState(List<FileStatus> stats,
       Path baseDir, ValidTxnList validTxnList, ValidWriteIdList writeIds,
-      @Nullable LoadStats loadStats) throws MetaException {
+      @Nullable LoadStats loadStats) throws CatalogException {
     // First filter out any paths that are not considered valid write IDs.
     // At the same time, calculate the max valid base write ID and collect the names of
     // the delta directories.
-    Predicate<String> pred = new WriteListBasedPredicate(validTxnList, writeIds);
+    WriteListBasedPredicate pred = new WriteListBasedPredicate(validTxnList, writeIds);
     long maxBaseWriteId = Long.MIN_VALUE;
     Set<String> deltaDirNames = new HashSet<>();
     for (Iterator<FileStatus> it = stats.iterator(); it.hasNext();) {
       FileStatus stat = it.next();
       String relPath = FileSystemUtil.relativizePath(stat.getPath(), baseDir);
-      if (!pred.test(relPath)) {
+      if (!pred.check(relPath)) {
         it.remove();
         if (loadStats != null) loadStats.uncommittedAcidFilesSkipped++;
         continue;
@@ -323,7 +436,7 @@ public class AcidUtils {
     }
     // Get a list of all valid delta directories.
     List<Pair<String, ParsedDelta>> deltas =
-        getValidDeltaDirsOrdered(deltaDirNames, maxBaseWriteId, writeIds);
+        getValidDeltaDirsOrdered(deltaDirNames, maxBaseWriteId);
     // Filter out delta directories superceded by major/minor compactions.
     Set<String> filteredDeltaDirs =
         getFilteredDeltaDirs(deltas, maxBaseWriteId, writeIds);
@@ -335,7 +448,7 @@ public class AcidUtils {
 
   private static List<FileStatus> filterFilesForAcidState(List<FileStatus> stats,
       Path baseDir, long maxBaseWriteId, Set<String> deltaDirs,
-      @Nullable LoadStats loadStats) throws MetaException {
+      @Nullable LoadStats loadStats) throws CatalogException {
     List<FileStatus> validStats = new ArrayList<>(stats);
     for (Iterator<FileStatus> it = validStats.iterator(); it.hasNext();) {
       FileStatus stat = it.next();
@@ -354,7 +467,7 @@ public class AcidUtils {
           if (loadStats != null) loadStats.filesSupersededByAcidState++;
         }
         if (relPath.endsWith("_flush_length")) {
-          throw new MetaException("Found Hive Streaming side-file: " +
+          throw new CatalogException("Found Hive Streaming side-file: " +
               stat.getPath() + " It means that the contents of the directory are " +
               "currently being written, therefore Impala is not able to read it. " +
               "Please try to load the table again once Hive Streaming commits " +
@@ -384,8 +497,8 @@ public class AcidUtils {
   }
 
   private static List<Pair<String, ParsedDelta>> getValidDeltaDirsOrdered(
-      Set<String> deltaDirNames, long baseWriteId, ValidWriteIdList writeIds)
-      throws MetaException {
+      Set<String> deltaDirNames, long baseWriteId)
+      throws CatalogException {
     List <Pair<String, ParsedDelta>> deltas = new ArrayList<>();
     for (Iterator<String> it = deltaDirNames.iterator(); it.hasNext();) {
       String dirname = it.next();
@@ -402,7 +515,7 @@ public class AcidUtils {
       ParsedDelta deleteDelta = parseDeleteDelta(dirname);
       if (deleteDelta != null) {
         if (deleteDelta.maxWriteId > baseWriteId) {
-          throw new MetaException("Table has deleted rows. It's currently not "
+          throw new CatalogException("Table has deleted rows. It's currently not "
               + "supported by Impala. Run major compaction to resolve this.");
         }
       }
@@ -519,4 +632,85 @@ public class AcidUtils {
     }
     return filteredDeltaDirs;
   }
+
+  /**
+   * Compares the writeIdList of the given table (if it is loaded and transactional)
+   * with the given ValidWriteIdList. If the table metadata is a superset of the
+   * metadata view represented by the given validWriteIdList, this method returns a
+   * value greater than 0. If they are an exact match, it returns 0, and if the
+   * table's ValidWriteIdList is behind the provided validWriteIdList, it returns -1.
+   * This information is useful to determine whether the cached table can be used to
+   * construct a consistent snapshot corresponding to the given validWriteIdList
+   * (see the illustrative sketch after this file's diff).
+   */
+  public static int compare(HdfsTable tbl, ValidWriteIdList validWriteIdList) {
+    Preconditions.checkState(tbl != null && tbl.getMetaStoreTable() != null);
+    // if tbl is not transactional, there is nothing to compare against and we return 0
+    if (!isTransactionalTable(tbl.getMetaStoreTable().getParameters())) return 0;
+    Preconditions.checkNotNull(tbl.getValidWriteIds());
+    return compare(tbl.getValidWriteIds(), validWriteIdList);
+  }
+
+  /*** This method is mostly copied from {@link org.apache.hive.common.util.TxnIdUtils}
+   * (e649562) with the exception that the table names of both input writeIdLists
+   * must be the same to have a valid comparison.
+   * //TODO source this directly from hive-exec so that future changes to this are
+   * automatically imported.
+   *
+   * @param a
+   * @param b
+   * @return 0, if a and b are equivalent
+   * 1, if a is more recent
+   * -1, if b is more recent
+   ***/
+  private static int compare(ValidWriteIdList a, ValidWriteIdList b) {
+    Preconditions.checkState(a.getTableName().equalsIgnoreCase(b.getTableName()));
+    // The algorithm assumes invalidWriteIds are sorted and all values are less than
+    // or equal to the hwm. It works as follows:
+    // 1. Compare the two invalidWriteIds arrays until one of them ends; a difference
+    // means the mismatched writeId is committed in one ValidWriteIdList but not the
+    // other, and the comparison ends.
+    // 2. Every writeId from the last entry of the shorter invalidWriteIds array up to
+    // its hwm must be committed in the other ValidWriteIdList; otherwise the
+    // comparison ends.
+    // 3. Every writeId from the lower hwm to the higher hwm must be invalid;
+    // otherwise the comparison ends.
+    int minLen = Math.min(a.getInvalidWriteIds().length, b.getInvalidWriteIds().length);
+    for (int i = 0; i < minLen; i++) {
+      if (a.getInvalidWriteIds()[i] == b.getInvalidWriteIds()[i]) {
+        continue;
+      }
+      return a.getInvalidWriteIds()[i] > b.getInvalidWriteIds()[i] ? 1 : -1;
+    }
+    if (a.getInvalidWriteIds().length == b.getInvalidWriteIds().length) {
+      return Long.signum(a.getHighWatermark() - b.getHighWatermark());
+    }
+    if (a.getInvalidWriteIds().length == minLen) {
+      if (a.getHighWatermark() != b.getInvalidWriteIds()[minLen] - 1) {
+        return Long.signum(a.getHighWatermark() - (b.getInvalidWriteIds()[minLen] - 1));
+      }
+      if (allInvalidFrom(b.getInvalidWriteIds(), minLen, b.getHighWatermark())) {
+        return 0;
+      } else {
+        return -1;
+      }
+    } else {
+      if (b.getHighWatermark() != a.getInvalidWriteIds()[minLen] - 1) {
+        return Long.signum(b.getHighWatermark() - (a.getInvalidWriteIds()[minLen] - 1));
+      }
+      if (allInvalidFrom(a.getInvalidWriteIds(), minLen, a.getHighWatermark())) {
+        return 0;
+      } else {
+        return 1;
+      }
+    }
+  }
+
+  private static boolean allInvalidFrom(long[] invalidIds, int start, long hwm) {
+    for (int i = start + 1; i < invalidIds.length; i++) {
+      if (invalidIds[i] != (invalidIds[i - 1] + 1)) {
+        return false;
+      }
+    }
+    return invalidIds[invalidIds.length - 1] == hwm;
+  }
 }
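
To make the write-id comparison above easier to follow, here is a minimal, self-contained
sketch (not part of the patch) of the semantics AcidUtils.compare() implements. It assumes
Hive's ValidReaderWriteIdList(tableName, exceptions, abortedBits, highWatermark)
constructor and only covers the simple case where both snapshots have identical
invalid-write-id arrays, so recency reduces to the high-water mark; the table name db.tbl
and the helper compareByHighWatermark are illustrative.

    // Illustrative sketch: mirrors the recency check described in AcidUtils.compare()
    // for the case of identical invalidWriteIds arrays.
    import java.util.BitSet;

    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidWriteIdList;

    public class WriteIdCompareSketch {
      // With matching invalidWriteIds, recency is decided by the high-water mark alone.
      static int compareByHighWatermark(ValidWriteIdList a, ValidWriteIdList b) {
        return Long.signum(a.getHighWatermark() - b.getHighWatermark());
      }

      public static void main(String[] args) {
        // Cached snapshot: all write ids up to 10 are committed, none invalid.
        ValidWriteIdList cached =
            new ValidReaderWriteIdList("db.tbl", new long[0], new BitSet(), 10);
        // Requested snapshot: writes 11 and 12 have committed since the cache was loaded.
        ValidWriteIdList requested =
            new ValidReaderWriteIdList("db.tbl", new long[0], new BitSet(), 12);

        System.out.println(compareByHighWatermark(cached, requested)); // -1: reload needed
        System.out.println(compareByHighWatermark(cached, cached));    //  0: cache hit
        System.out.println(compareByHighWatermark(requested, cached)); //  1: filter cached files
      }
    }
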
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
index 2da922e..838983d 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
@@ -63,7 +63,7 @@ public class CatalogObjectToFromThriftTest {
     String[] dbNames = {"functional", "functional_avro", "functional_parquet",
                         "functional_seq"};
     for (String dbName: dbNames) {
-      Table table = catalog_.getOrLoadTable(dbName, "alltypes", "test");
+      Table table = catalog_.getOrLoadTable(dbName, "alltypes", "test", null);
       Assert.assertEquals(24, ((HdfsTable)table).getPartitions().size());
       Assert.assertEquals(24, ((HdfsTable)table).getPartitionIds().size());
 
@@ -131,7 +131,7 @@ public class CatalogObjectToFromThriftTest {
   @Test
   public void TestMismatchedAvroAndTableSchemas() throws CatalogException {
     Table table = catalog_.getOrLoadTable("functional_avro_snap",
-        "schema_resolution_test", "test");
+        "schema_resolution_test", "test", null);
     TTable thriftTable = getThriftTable(table);
     Assert.assertEquals(thriftTable.tbl_name, "schema_resolution_test");
     Assert.assertTrue(thriftTable.isSetTable_type());
@@ -151,7 +151,7 @@ public class CatalogObjectToFromThriftTest {
   @Test
   public void TestHBaseTables() throws CatalogException {
     String dbName = "functional_hbase";
-    Table table = catalog_.getOrLoadTable(dbName, "alltypes", "test");
+    Table table = catalog_.getOrLoadTable(dbName, "alltypes", "test", null);
     TTable thriftTable = getThriftTable(table);
     Assert.assertEquals(thriftTable.tbl_name, "alltypes");
     Assert.assertEquals(thriftTable.db_name, dbName);
@@ -180,7 +180,7 @@ public class CatalogObjectToFromThriftTest {
   public void TestHBaseTableWithBinaryEncodedCols()
       throws CatalogException {
     String dbName = "functional_hbase";
-    Table table = catalog_.getOrLoadTable(dbName, "alltypessmallbinary", "test");
+    Table table = catalog_.getOrLoadTable(dbName, "alltypessmallbinary", "test", null);
     TTable thriftTable = getThriftTable(table);
     Assert.assertEquals(thriftTable.tbl_name, "alltypessmallbinary");
     Assert.assertEquals(thriftTable.db_name, dbName);
@@ -217,7 +217,7 @@ public class CatalogObjectToFromThriftTest {
     Assume.assumeTrue(
         "Skipping this test since it is only supported when running against Hive-2",
         TestUtils.getHiveMajorVersion() == 2);
-    Table table = catalog_.getOrLoadTable("functional", "hive_index_tbl", "test");
+    Table table = catalog_.getOrLoadTable("functional", "hive_index_tbl", "test", null);
     Assert.assertNotNull(table);
     TTable thriftTable = getThriftTable(table);
     Assert.assertEquals(thriftTable.tbl_name, "hive_index_tbl");
@@ -226,7 +226,7 @@ public class CatalogObjectToFromThriftTest {
 
   @Test
   public void TestTableLoadingErrors() throws ImpalaException {
-    Table table = catalog_.getOrLoadTable("functional", "alltypes", "test");
+    Table table = catalog_.getOrLoadTable("functional", "alltypes", "test", null);
     HdfsTable hdfsTable = (HdfsTable) table;
     // Get any partition with valid HMS parameters to create a
     // dummy partition.
@@ -253,7 +253,7 @@ public class CatalogObjectToFromThriftTest {
 
   @Test
   public void TestView() throws CatalogException {
-    Table table = catalog_.getOrLoadTable("functional", "view_view", "test");
+    Table table = catalog_.getOrLoadTable("functional", "view_view", "test", null);
     TTable thriftTable = getThriftTable(table);
     Assert.assertEquals(thriftTable.tbl_name, "view_view");
     Assert.assertEquals(thriftTable.db_name, "functional");
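
The tests above pass null for the new fourth argument of getOrLoadTable(). As a hedged
sketch of what a caller with a concrete snapshot might do instead (assuming, per the
commit description, that the new parameter is the requested ValidWriteIdList), the helper
below is illustrative and not part of the patch:

    // Hedged usage sketch: load a table at a specific write-id snapshot instead of null.
    import org.apache.hadoop.hive.common.ValidWriteIdList;
    import org.apache.impala.catalog.CatalogServiceCatalog;
    import org.apache.impala.catalog.Table;
    import org.apache.impala.compat.MetastoreShim;

    class GetOrLoadTableAtSnapshotSketch {
      static Table loadAtSnapshot(CatalogServiceCatalog catalog) throws Exception {
        // Snapshot string format: "<db>.<table>:<highWatermark>:<minOpenWriteId>::"
        ValidWriteIdList writeIds = MetastoreShim.getValidWriteIdListFromString(
            "functional.insert_only_transactional_table:5:5::");
        // The catalog reloads only if its cached snapshot is behind writeIds; otherwise
        // the cached table is reused and file descriptors are filtered to the snapshot.
        return catalog.getOrLoadTable(
            "functional", "insert_only_transactional_table", "test", writeIds);
      }
    }
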
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
index 37f4664..a73e9a1 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
@@ -132,37 +132,41 @@ public class CatalogTest {
     Db functionalDb = catalog_.getDb("functional");
     assertNotNull(functionalDb);
     assertEquals(functionalDb.getName(), "functional");
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypes", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypes_view", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypes_view_sub", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypessmall", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypeserror", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypeserrornonulls", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypesagg", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypesaggnonulls", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypesnopart", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "alltypesinsert", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "complex_view", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "testtbl", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "dimtbl", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "jointbl", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "liketbl", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "greptiny", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "rankingssmall", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "uservisitssmall", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "view_view", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional", "date_tbl", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypes", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypes_view", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypes_view_sub", "test",
+      null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypessmall", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypeserror", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypeserrornonulls", "test",
+      null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypesagg", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypesaggnonulls", "test",
+      null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypesnopart", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "alltypesinsert", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "complex_view", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "testtbl", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "dimtbl", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "jointbl", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "liketbl", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "greptiny", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "rankingssmall", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "uservisitssmall", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "view_view", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional", "date_tbl", "test", null));
     // IMP-163 - table with string partition column does not load if there are partitions
-    assertNotNull(catalog_.getOrLoadTable("functional", "StringPartitionKey", "test"));
+    assertNotNull(
+        catalog_.getOrLoadTable("functional", "StringPartitionKey", "test", null));
     // Test non-existent table
-    assertNull(catalog_.getOrLoadTable("functional", "nonexistenttable", "test"));
+    assertNull(catalog_.getOrLoadTable("functional", "nonexistenttable", "test", null));
 
     // functional_seq contains the same tables as functional
     Db testDb = catalog_.getDb("functional_seq");
     assertNotNull(testDb);
     assertEquals(testDb.getName(), "functional_seq");
-    assertNotNull(catalog_.getOrLoadTable("functional_seq", "alltypes", "test"));
-    assertNotNull(catalog_.getOrLoadTable("functional_seq", "testtbl", "test"));
+    assertNotNull(catalog_.getOrLoadTable("functional_seq", "alltypes", "test", null));
+    assertNotNull(catalog_.getOrLoadTable("functional_seq", "testtbl", "test", null));
 
     Db hbaseDb = catalog_.getDb("functional_hbase");
     assertNotNull(hbaseDb);
@@ -170,14 +174,16 @@ public class CatalogTest {
     // Loading succeeds for an HBase table that has binary columns and an implicit key
     // column mapping
     assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "alltypessmallbinary",
-        "test"));
-    assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "alltypessmall", "test"));
+        "test", null));
+    assertNotNull(
+        catalog_.getOrLoadTable(hbaseDb.getName(), "alltypessmall", "test", null));
     assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "hbasealltypeserror",
-        "test"));
+        "test", null));
     assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(),
-        "hbasealltypeserrornonulls", "test"));
-    assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "alltypesagg", "test"));
-    assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "stringids", "test"));
+        "hbasealltypeserrornonulls", "test", null));
+    assertNotNull(
+        catalog_.getOrLoadTable(hbaseDb.getName(), "alltypesagg", "test", null));
+    assertNotNull(catalog_.getOrLoadTable(hbaseDb.getName(), "stringids", "test", null));
 
     checkTableCols(functionalDb, "alltypes", 2,
         new String[]
@@ -327,8 +333,8 @@ public class CatalogTest {
         new Type[] {Type.DATE, Type.INT, Type.DATE});
 
     // case-insensitive lookup
-    assertEquals(catalog_.getOrLoadTable("functional", "alltypes", "test"),
-        catalog_.getOrLoadTable("functional", "AllTypes", "test"));
+    assertEquals(catalog_.getOrLoadTable("functional", "alltypes", "test", null),
+        catalog_.getOrLoadTable("functional", "AllTypes", "test", null));
   }
 
   // Count of listFiles (list status + blocks) calls
@@ -357,7 +363,7 @@ public class CatalogTest {
         /*dbWasAdded=*/new Reference<Boolean>());
 
     HdfsTable table = (HdfsTable)catalog_.getOrLoadTable("functional", "AllTypes",
-        "test");
+        "test", null);
     StorageStatistics opsCounts = stats.get(DFSOpsCountStatistics.NAME);
 
     // We expect:
@@ -405,7 +411,7 @@ public class CatalogTest {
     // an RPC per file.
     stats.reset();
     HdfsTable unpartTable = (HdfsTable)catalog_.getOrLoadTable(
-        "functional", "alltypesaggmultifilesnopart", "test");
+        "functional", "alltypesaggmultifilesnopart", "test", null);
     assertEquals(0L, (long)opsCounts.getLong(GET_FILE_BLOCK_LOCS));
     stats.reset();
     catalog_.reloadTable(unpartTable, "test");
@@ -432,7 +438,7 @@ public class CatalogTest {
   @Test
   public void TestPartitions() throws CatalogException {
     HdfsTable table =
-        (HdfsTable) catalog_.getOrLoadTable("functional", "AllTypes", "test");
+        (HdfsTable) catalog_.getOrLoadTable("functional", "AllTypes", "test", null);
     checkAllTypesPartitioning(table, true);
   }
 
@@ -442,7 +448,7 @@ public class CatalogTest {
   @Test
   public void testGetSqlConstraints() throws Exception {
     FeFsTable t = (FeFsTable) catalog_.getOrLoadTable("functional", "parent_table",
-        "test");
+        "test", null);
     assertNotNull(t);
     assertTrue(t instanceof FeFsTable);
     List<SQLPrimaryKey> primaryKeys = t.getSqlConstraints().getPrimaryKeys();
@@ -457,9 +463,9 @@ public class CatalogTest {
     assertEquals("year", primaryKeys.get(1).getColumn_name());
 
     // Force load parent_table_2. Required for fetching foreign keys from child_table.
-    catalog_.getOrLoadTable("functional", "parent_table_2", "test");
+    catalog_.getOrLoadTable("functional", "parent_table_2", "test", null);
 
-    t = (FeFsTable) catalog_.getOrLoadTable("functional", "child_table", "test");
+    t = (FeFsTable) catalog_.getOrLoadTable("functional", "child_table", "test", null);
     assertNotNull(t);
     assertTrue(t instanceof FeFsTable);
     primaryKeys = t.getSqlConstraints().getPrimaryKeys();
@@ -484,7 +490,7 @@ public class CatalogTest {
 
     // Check tables without constraints.
     t = (FeFsTable) catalog_.getOrLoadTable("functional", "alltypes",
-        "test");
+        "test", null);
     assertNotNull(t);
     assertTrue(t instanceof FeFsTable);
     primaryKeys = t.getSqlConstraints().getPrimaryKeys();
@@ -542,7 +548,7 @@ public class CatalogTest {
   public void testStats() throws CatalogException {
     // make sure the stats for functional.alltypesagg look correct
     HdfsTable table = (HdfsTable) catalog_.getOrLoadTable("functional", "AllTypesAgg",
-        "test");
+        "test", null);
 
     Column idCol = table.getColumn("id");
     assertEquals(idCol.getStats().getAvgSerializedSize(),
@@ -616,7 +622,7 @@ public class CatalogTest {
     // First load a table that has column stats.
     //catalog_.refreshTable("functional", "alltypesagg", false);
     HdfsTable table = (HdfsTable) catalog_.getOrLoadTable("functional", "alltypesagg",
-        "test");
+        "test", null);
 
     // Now attempt to update a column's stats with mismatched stats data and ensure
     // we get the expected results.
@@ -703,7 +709,7 @@ public class CatalogTest {
   @Test
   public void testPullIncrementalStats() throws CatalogException {
     // Partitioned table with stats. Load the table prior to fetching.
-    catalog_.getOrLoadTable("functional", "alltypesagg", "test");
+    catalog_.getOrLoadTable("functional", "alltypesagg", "test", null);
     expectStatistics("functional", "alltypesagg", 11);
 
     // Partitioned table with stats. Invalidate the table prior to fetching.
@@ -732,7 +738,7 @@ public class CatalogTest {
   public void testInternalHBaseTable() throws CatalogException {
     // Cast will fail if table not an HBaseTable
    HBaseTable table = (HBaseTable)
-       catalog_.getOrLoadTable("functional_hbase", "internal_hbase_table", "test");
+       catalog_.getOrLoadTable("functional_hbase", "internal_hbase_table", "test", null);
     assertNotNull("functional_hbase.internal_hbase_table was not found", table);
   }
 
@@ -744,7 +750,7 @@ public class CatalogTest {
 
   @Test
   public void testCreateTableMetadata() throws CatalogException {
-    Table table = catalog_.getOrLoadTable("functional", "alltypes", "test");
+    Table table = catalog_.getOrLoadTable("functional", "alltypes", "test", null);
     // Tables are created via Impala so the metadata should have been populated properly.
     // alltypes is an external table.
     assertEquals(System.getProperty("user.name"), table.getMetaStoreTable().getOwner());
@@ -762,13 +768,13 @@ public class CatalogTest {
   public void testCreateTableMetadataHive3() throws CatalogException {
     Assume.assumeTrue(TestUtils.getHiveMajorVersion() > 2);
     // alltypesinsert is created using CREATE TABLE LIKE and is a MANAGED table
-    Table table = catalog_.getOrLoadTable("functional", "alltypesinsert", "test");
+    Table table = catalog_.getOrLoadTable("functional", "alltypesinsert", "test", null);
     assertEquals(System.getProperty("user.name"), table.getMetaStoreTable().getOwner());
     assertEquals(TableType.EXTERNAL_TABLE.toString(),
         table.getMetaStoreTable().getTableType());
     // ACID tables should be loaded as MANAGED tables
     table = catalog_.getOrLoadTable("functional", "insert_only_transactional_table",
-        "test");
+        "test", null);
     assertEquals(System.getProperty("user.name"), table.getMetaStoreTable().getOwner());
     assertEquals(TableType.MANAGED_TABLE.toString(),
         table.getMetaStoreTable().getTableType());
@@ -784,7 +790,7 @@ public class CatalogTest {
   public void testCreateTableMetadataHive2() throws CatalogException {
     Assume.assumeTrue(TestUtils.getHiveMajorVersion() <= 2);
     // alltypesinsert is created using CREATE TABLE LIKE and is a MANAGED table
-    Table table = catalog_.getOrLoadTable("functional", "alltypesinsert", "test");
+    Table table = catalog_.getOrLoadTable("functional", "alltypesinsert", "test", null);
     assertEquals(System.getProperty("user.name"), table.getMetaStoreTable().getOwner());
     assertEquals(TableType.MANAGED_TABLE.toString(),
         table.getMetaStoreTable().getTableType());
@@ -797,7 +803,7 @@ public class CatalogTest {
     Assume.assumeTrue(
         "Skipping this test since it is only supported when running against Hive-2",
         TestUtils.getHiveMajorVersion() == 2);
-    Table table = catalog_.getOrLoadTable("functional", "hive_index_tbl", "test");
+    Table table = catalog_.getOrLoadTable("functional", "hive_index_tbl", "test", null);
     assertTrue(table instanceof IncompleteTable);
     IncompleteTable incompleteTable = (IncompleteTable) table;
     assertTrue(incompleteTable.getCause() instanceof TableLoadingException);
@@ -808,7 +814,7 @@ public class CatalogTest {
   @Test
   public void testLoadingUnsupportedTableTypes() throws CatalogException {
     // Table with unsupported SerDe library.
-    Table table = catalog_.getOrLoadTable("functional", "bad_serde", "test");
+    Table table = catalog_.getOrLoadTable("functional", "bad_serde", "test", null);
     assertTrue(table instanceof IncompleteTable);
     IncompleteTable incompleteTable = (IncompleteTable) table;
     assertTrue(incompleteTable.getCause() instanceof TableLoadingException);
@@ -818,7 +824,8 @@ public class CatalogTest {
 
     // Impala does not yet support Hive's LazyBinaryColumnarSerDe which can be
     // used for RCFILE tables.
-    table = catalog_.getOrLoadTable("functional_rc", "rcfile_lazy_binary_serde", "test");
+    table = catalog_.getOrLoadTable("functional_rc", "rcfile_lazy_binary_serde", "test"
+        , null);
     assertTrue(table instanceof IncompleteTable);
     incompleteTable = (IncompleteTable) table;
     assertTrue(incompleteTable.getCause() instanceof TableLoadingException);
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogdTableInvalidatorTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogdTableInvalidatorTest.java
index e20566e..e6fd055 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogdTableInvalidatorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogdTableInvalidatorTest.java
@@ -64,7 +64,7 @@ public class CatalogdTableInvalidatorTest {
             2, /*invalidateTablesOnMemoryPressure=*/false, /*oldGenFullThreshold=*/
             0.6, /*gcInvalidationFraction=*/0.1));
     Assert.assertFalse(catalog_.getDb(dbName).getTable(tblName).isLoaded());
-    Table table = catalog_.getOrLoadTable(dbName, tblName, "test");
+    Table table = catalog_.getOrLoadTable(dbName, tblName, "test", null);
     Assert.assertTrue(table.isLoaded());
     Assert.assertEquals(ticker.now_, table.getLastUsedTime());
     long previousTriggerCount = catalog_.getCatalogdTableInvalidator().scanCount_.get();
diff --git a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
index b6fe650..8498a7b 100644
--- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
@@ -42,8 +42,8 @@ import com.google.common.collect.ImmutableList;
 public class FileMetadataLoaderTest {
 
   @Test
-  public void testRecursiveLoading() throws IOException, MetaException {
-    //TODO(IMPALA-9042): Remove "throws MetaException"
+  public void testRecursiveLoading() throws IOException, CatalogException {
+    //TODO(IMPALA-9042): Remove "throws CatalogException"
     ListMap<TNetworkAddress> hostIndex = new ListMap<>();
     Path tablePath = new Path("hdfs://localhost:20500/test-warehouse/alltypes/");
     FileMetadataLoader fml = new FileMetadataLoader(tablePath, /* recursive=*/true,
@@ -80,8 +80,8 @@ public class FileMetadataLoaderTest {
   }
 
   @Test
-  public void testHudiParquetLoading() throws IOException, MetaException {
-    //TODO(IMPALA-9042): Remove "throws MetaException"
+  public void testHudiParquetLoading() throws IOException, CatalogException {
+    //TODO(IMPALA-9042): Remove "throws CatalogException"
     ListMap<TNetworkAddress> hostIndex = new ListMap<>();
     Path tablePath = new Path("hdfs://localhost:20500/test-warehouse/hudi_parquet/");
     FileMetadataLoader fml = new FileMetadataLoader(tablePath, /* recursive=*/true,
@@ -112,8 +112,8 @@ public class FileMetadataLoaderTest {
   }
 
   @Test
-  public void testAcidMinorCompactionLoading() throws IOException, MetaException {
-    //TODO(IMPALA-9042): Remove "throws MetaException"
+  public void testAcidMinorCompactionLoading() throws IOException, CatalogException {
+    //TODO(IMPALA-9042): Remove "throws CatalogException"
     ListMap<TNetworkAddress> hostIndex = new ListMap<>();
     ValidWriteIdList writeIds = MetastoreShim.getValidWriteIdListFromString(
         "functional_orc_def.complextypestbl_minor_compacted:10:10::");
@@ -130,8 +130,8 @@ public class FileMetadataLoaderTest {
   }
 
   @Test
-  public void testLoadMissingDirectory() throws IOException, MetaException {
-    //TODO(IMPALA-9042): Remove "throws MetaException"
+  public void testLoadMissingDirectory() throws IOException, CatalogException {
+    //TODO(IMPALA-9042): Remove "throws CatalogException"
     for (boolean recursive : ImmutableList.of(false, true)) {
       ListMap<TNetworkAddress> hostIndex = new ListMap<>();
       Path tablePath = new Path("hdfs://localhost:20500/test-warehouse/does-not-exist/");
@@ -142,9 +142,9 @@ public class FileMetadataLoaderTest {
     }
   }
 
-  //TODO(IMPALA-9042): Remove 'throws MetaException'
+  //TODO(IMPALA-9042): Remove 'throws CatalogException'
   @Test
-  public void testSkipHiddenDirectories() throws IOException, MetaException {
+  public void testSkipHiddenDirectories() throws IOException, CatalogException {
     Path sourcePath = new Path("hdfs://localhost:20500/test-warehouse/alltypes/");
     Path tmpTestPath = new Path("hdfs://localhost:20500/tmp/test-filemetadata-loader");
     Configuration conf = new Configuration();
diff --git a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoWriteIdTest.java b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoWriteIdTest.java
new file mode 100644
index 0000000..fdb8855
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoWriteIdTest.java
@@ -0,0 +1,587 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.common.Reference;
+import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.testutil.HiveJdbcClientPool;
+import org.apache.impala.testutil.HiveJdbcClientPool.HiveJdbcClient;
+import org.apache.impala.testutil.ImpalaJdbcClient;
+import org.apache.impala.thrift.TCatalogObject;
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
+import org.apache.impala.thrift.THdfsTable;
+import org.apache.impala.thrift.TPartialPartitionInfo;
+import org.apache.impala.thrift.TTable;
+import org.apache.impala.thrift.TTableInfoSelector;
+import org.apache.impala.thrift.TTableName;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterables;
+
+/**
+ * Tests related to getPartialCatalogObject API support for ValidWriteIdList. The tests
+ * exercise various scenarios where the catalog cache state is either behind or ahead of
+ * the client-provided ValidWriteIdList. TODO: These tests are better suited to an e2e
+ * test in the pytest framework, since compactions can sometimes take a long time.
+ * However, that would need frontend support for sending the ValidWriteIdList, which is
+ * more complex and needs to be done as a separate change (IMPALA-8788).
+ */
+public class PartialCatalogInfoWriteIdTest {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(PartialCatalogInfoWriteIdTest.class);
+  private static CatalogServiceCatalog catalog_;
+  private static HiveJdbcClientPool hiveClientPool_;
+  private static final String testDbName = "partial_catalog_info_test";
+  private static final String testTblName = "insert_only";
+  private static final String testPartitionedTbl = "insert_only_partitioned";
+
+  @BeforeClass
+  public static void setupTestEnv() throws SQLException, ClassNotFoundException {
+    catalog_ = CatalogServiceTestCatalog.create();
+    hiveClientPool_ = HiveJdbcClientPool.create(1);
+  }
+
+  @AfterClass
+  public static void shutdown() {
+    if (catalog_ != null) {
+      catalog_.close();
+    }
+    if (hiveClientPool_ != null) {
+      hiveClientPool_.close();
+    }
+  }
+
+  @Before
+  public void createTestTbls() throws Exception {
+    Stopwatch st = Stopwatch.createStarted();
+    ImpalaJdbcClient client = ImpalaJdbcClient
+        .createClientUsingHiveJdbcDriver();
+    client.connect();
+    try {
+      client.execStatement("drop database if exists " + testDbName + " cascade");
+      client.execStatement("create database " + testDbName);
+      client.execStatement("create table " + getTestTblName() + " like "
+          + "functional.insert_only_transactional_table stored as parquet");
+      client.execStatement("insert into " + getTestTblName() + " values (1)");
+      client.execStatement("create table " + getPartitionedTblName() + " (c1 int) "
+              + "partitioned by (part int) stored as parquet " + getTblProperties());
+      client.execStatement("insert into " + getPartitionedTblName() +
+          " partition (part=1) values (1)");
+    } finally {
+      LOG.info("Time taken for createTestTbls {} msec",
+          st.stop().elapsed(TimeUnit.MILLISECONDS));
+      client.close();
+    }
+    catalog_.reset();
+  }
+
+  private static String getTblProperties() {
+    return "tblproperties ('transactional'='true', 'transactional_properties' = "
+      + "'insert_only')";
+  }
+
+  @After
+  public void dropTestTbls() throws Exception {
+    ImpalaJdbcClient client = ImpalaJdbcClient
+        .createClientUsingHiveJdbcDriver();
+    client.connect();
+    try {
+      client.execStatement("drop database if exists " + testDbName + " cascade");
+    } finally {
+      client.close();
+    }
+  }
+
+  /**
+   * Catalog does not have the table loaded. This is the base case when the table is
+   * first loaded. It makes sure that the returned writeIdList is consistent with the
+   * client's writeIdList.
+   */
+  @Test
+  public void testCatalogLoadWithWriteIds()
+    throws CatalogException, InternalException, TException {
+    Assume.assumeTrue(MetastoreShim.getMajorVersion() >= 3);
+    // invalidate the ACID table if it already exists
+    invalidateTbl(testDbName, testTblName);
+    long prevVersion =
+      catalog_.getOrLoadTable(testDbName, testTblName, "test", null).getCatalogVersion();
+    ValidWriteIdList validWriteIdList = getValidWriteIdList(testDbName, testTblName);
+    TGetPartialCatalogObjectRequest req = new RequestBuilder()
+      .db(testDbName)
+      .tbl(testTblName)
+      .writeId(validWriteIdList)
+      .wantFiles()
+      .build();
+    TGetPartialCatalogObjectResponse response = sendRequest(req);
+    Assert.assertEquals(MetastoreShim.convertToTValidWriteIdList(validWriteIdList),
+      response.table_info.valid_write_ids);
+    // make sure the table was not loaded in the cache hit scenario
+    Assert.assertTrue(
+      catalog_.getTable(testDbName, testTblName).getCatalogVersion() == prevVersion);
+  }
+
+  /**
+   * Test exercises the code path where the catalog has a stale transactional table in
+   * its cache compared to the client-provided ValidWriteIdList. It makes sure that
+   * the table is reloaded and the returned writeId is consistent with the requested
+   * writeIdList of the table.
+   */
+  @Test
+  public void testCatalogBehindClientWriteIds() throws Exception {
+    Assume.assumeTrue(MetastoreShim.getMajorVersion() >= 3);
+    Table tbl = catalog_.getOrLoadTable(testDbName, testTblName, "test", null);
+    Assert.assertFalse("Table must be loaded",
+      tbl instanceof IncompleteTable);
+    long previousVersion = tbl.getCatalogVersion();
+    // do some hive operations to advance the writeIds in HMS
+    executeHiveSql("insert into " + getTestTblName() + " values (2)");
+    // get the latest validWriteIdList
+    ValidWriteIdList validWriteIdList = getValidWriteIdList(testDbName, testTblName);
+    TGetPartialCatalogObjectRequest req = new RequestBuilder()
+      .db(testDbName)
+      .tbl(testTblName)
+      .writeId(validWriteIdList)
+      .wantFiles()
+      .build();
+    TGetPartialCatalogObjectResponse response = sendRequest(req);
+    Assert.assertEquals(MetastoreShim.convertToTValidWriteIdList(validWriteIdList),
+      response.table_info.valid_write_ids);
+    // this should trigger a load of the table and hence the version should be higher
+    Assert.assertTrue(
+      catalog_.getTable(testDbName, testTblName).getCatalogVersion() > previousVersion);
+  }
+
+  /**
+   * Test exercises the code path where the catalog has a more recent version of the
+   * transactional table in its cache compared to the client-provided ValidWriteIdList.
+   * It makes sure that the table loaded on the impalad has a writeId that is
+   * consistent with the requested writeIdList of the table.
+   */
+  @Test
+  public void testCatalogAheadOfClientWriteIds() throws Exception {
+    Assume.assumeTrue(MetastoreShim.getMajorVersion() >= 3);
+    Table tbl = catalog_.getOrLoadTable(testDbName, testTblName, "test", null);
+    Assert.assertFalse("Table must be loaded",
+      tbl instanceof IncompleteTable);
+    ValidWriteIdList validWriteIdList = getValidWriteIdList(testDbName, testTblName);
+    // now insert into the table to advance the writeId
+    executeHiveSql("insert into " + getTestTblName() + " values (2)");
+    catalog_.invalidateTable(new TTableName(testDbName, testTblName),
+        new Reference<>(), new Reference<>());
+    Table tblAfterReload = catalog_.getOrLoadTable(testDbName, testTblName, "test", null);
+    long tblVersion = tblAfterReload.getCatalogVersion();
+    // issue a request which is older than what we have in catalog
+    TGetPartialCatalogObjectRequest req = new RequestBuilder()
+      .db(testDbName)
+      .tbl(testTblName)
+      .writeId(validWriteIdList)
+      .wantFiles()
+      .build();
+    TGetPartialCatalogObjectResponse response = sendRequest(req);
+    TPartialPartitionInfo partialPartitionInfo =
+      Iterables.getOnlyElement(response.table_info.partitions);
+    // since the client's snapshot was taken before the second file was added, the
+    // number of files should be only 1
+    Assert.assertEquals(1, partialPartitionInfo.file_descriptors.size());
+    // we don't expect catalog to load the table since catalog is already ahead of client.
+    Assert.assertEquals(tblVersion,
+      catalog_.getOrLoadTable(testDbName, testTblName, "test", null).getCatalogVersion());
+  }
+
+  /**
+   * ValidWriteId support only applies to the file-metadata. If the cached
+   * ValidWriteIdList of the transactional table is ahead of the requested one, catalog
+   * should still return all the partitions. However, the partitions which are returned
+   * should always have files that are consistent with the requested writeIds.
+   */
+  @Test
+  public void testFetchGranularityWithWriteIds() throws Exception {
+    Assume.assumeTrue(MetastoreShim.getMajorVersion() >= 3);
+    Table tbl = catalog_.getOrLoadTable(testDbName, testPartitionedTbl, "test", null);
+    long olderVersion = tbl.getCatalogVersion();
+    Assert.assertFalse("Table must be loaded",
+      tbl instanceof IncompleteTable);
+    ValidWriteIdList olderWriteIdList = getValidWriteIdList(testDbName,
+      testPartitionedTbl);
+    executeHiveSql("insert into " + getPartitionedTblName() + " partition (part=2) "
+      + "values (2)");
+    ValidWriteIdList currentWriteIdList = getValidWriteIdList(testDbName,
+      testPartitionedTbl);
+    // the client requests olderWriteIdList, which is not loaded in the catalog; this
+    // is still a cache-hit scenario since catalog can satisfy it without reloading
+    TGetPartialCatalogObjectRequest request = new RequestBuilder()
+      .db(testDbName)
+      .tbl(testPartitionedTbl)
+      .writeId(olderWriteIdList)
+      .wantFiles()
+      .build();
+    TGetPartialCatalogObjectResponse response = sendRequest(request);
+    Assert.assertEquals(1, response.getTable_info().getPartitionsSize());
+    Assert.assertNotNull(
+      response.getTable_info().getPartitions().get(0).getFile_descriptors());
+    Assert.assertNotNull(
+      response.getTable_info().getPartitions().get(0).getHms_partition());
+
+    // skipping request for file-metadata should not affect the result
+    request = new RequestBuilder()
+      .db(testDbName)
+      .tbl(testPartitionedTbl)
+      .writeId(olderWriteIdList)
+      .wantPartitionNames()
+      .build();
+    response = sendRequest(request);
+    Assert.assertEquals(1, response.getTable_info().getPartitionsSize());
+    for (TPartialPartitionInfo partInfo : response.getTable_info().getPartitions()) {
+      Assert.assertNull(partInfo.getFile_descriptors());
+      Assert.assertNull(partInfo.getHms_partition());
+    }
+
+    // we request a newer WriteIdList now, and catalog needs to reload
+    request = new RequestBuilder()
+      .db(testDbName)
+      .tbl(testPartitionedTbl)
+      .writeId(currentWriteIdList)
+      .wantFiles()
+      .build();
+    response = sendRequest(request);
+    Assert.assertEquals(2, response.getTable_info().getPartitionsSize());
+    // we expect both the partitions to have the file-metadata in the response
+    for (TPartialPartitionInfo partInfo : response.getTable_info().getPartitions()) {
+      Assert.assertNotNull(partInfo.getFile_descriptors());
+      Assert.assertNotNull(partInfo.getHms_partition());
+    }
+    // table must be reloaded now
+    long newerVersion = catalog_.getTable(testDbName, testPartitionedTbl)
+      .getCatalogVersion();
+    Assert.assertTrue(newerVersion > olderVersion);
+
+    request = new RequestBuilder()
+      .db(testDbName)
+      .tbl(testPartitionedTbl)
+      .writeId(olderWriteIdList)
+      .wantFiles()
+      .build();
+    response = sendRequest(request);
+    // HMS metadata provides a read-committed isolation level, so it is possible to see
+    // partitions from a writeId that is ahead of the requested writeId. However, we
+    // should not see files pertaining to such partitions.
+    Assert.assertEquals(2, response.getTable_info().getPartitionsSize());
+    // since we requested with an older writeIdList, we expect the second partition to
+    // be empty
+    for (TPartialPartitionInfo partitionInfo : response.getTable_info().getPartitions()) {
+      if (partitionInfo.getName().equalsIgnoreCase("part=2")) {
+        Assert.assertTrue(partitionInfo.getFile_descriptors().isEmpty());
+      } else {
+        Assert.assertFalse(partitionInfo.getFile_descriptors().isEmpty());
+      }
+    }
+  }
+
+  private long getMetricCount(String db, String tbl, String name)
+      throws CatalogException {
+    return catalog_.getTable(db, tbl).getMetrics().getCounter(name).getCount();
+  }
+
+  /**
+   * Test makes sure that the metadata which is requested after a table has been major
+   * compacted is consistent with the validWriteId provided.
+   * @throws Exception
+   */
+  @Test
+  public void fetchAfterMajorCompaction() throws Exception {
+    Assume.assumeTrue(MetastoreShim.getMajorVersion() >= 3);
+    Table tbl = catalog_.getOrLoadTable(testDbName, testPartitionedTbl, "test", null);
+    Assert.assertFalse("Table must be loaded",
+        tbl instanceof IncompleteTable);
+    // row 2
+    executeHiveSql("insert into " + getPartitionedTblName() + " partition (part=1) "
+        + "values (2)");
+    ValidWriteIdList olderWriteIdList = getValidWriteIdList(testDbName,
+        testPartitionedTbl);
+    // row 3
+    executeHiveSql("insert into " + getPartitionedTblName() + " partition (part=2) "
+        + "values (2)");
+    executeHiveSql(
+        "alter table " + getPartitionedTblName()
+            + " partition(part=1) compact 'major' and wait");
+    long numMisses = getMetricCount(testDbName, testPartitionedTbl,
+        HdfsTable.FILEMETADATA_CACHE_MISS_METRIC);
+    long numHits = getMetricCount(testDbName, testPartitionedTbl,
+        HdfsTable.FILEMETADATA_CACHE_HIT_METRIC);
+    ValidWriteIdList currentWriteIdList = getValidWriteIdList(testDbName,
+        testPartitionedTbl);
+    // issue a get request at latest writeIdList to trigger a load
+    TGetPartialCatalogObjectRequest request = new RequestBuilder()
+        .db(testDbName)
+        .tbl(testPartitionedTbl)
+        .writeId(currentWriteIdList)
+        .wantFiles()
+        .build();
+    TGetPartialCatalogObjectResponse response = sendRequest(request);
+    Assert.assertEquals(2, response.getTable_info().getPartitionsSize());
+    for (TPartialPartitionInfo partitionInfo : response.getTable_info().getPartitions()) {
+      Assert.assertEquals(1, partitionInfo.getFile_descriptors().size());
+    }
+    long numMissesAfter = getMetricCount(testDbName, testPartitionedTbl,
+        HdfsTable.FILEMETADATA_CACHE_MISS_METRIC);
+    long numHitsAfter = getMetricCount(testDbName, testPartitionedTbl,
+        HdfsTable.FILEMETADATA_CACHE_HIT_METRIC);
+    // the hit count increases by 2, one for each partition
+    Assert.assertEquals(numHits + 2, numHitsAfter);
+    Assert.assertEquals(numMisses, numMissesAfter);
+    // now issue a request with older writeId
+    request = new RequestBuilder()
+        .db(testDbName)
+        .tbl(testPartitionedTbl)
+        .writeId(olderWriteIdList)
+        .wantFiles()
+        .build();
+    response = sendRequest(request);
+    // older writeIds should see both the partitions but only one of the partitions should
+    // have file-metadata (2 files)
+    Assert.assertEquals(2, response.getTable_info().getPartitionsSize());
+    for (TPartialPartitionInfo partitionInfo : response.getTable_info().getPartitions()) {
+      if (partitionInfo.getName().equals("part=1")) {
+        Assert.assertEquals(2, partitionInfo.getFile_descriptors().size());
+      } else {
+        Assert.assertTrue(partitionInfo.getFile_descriptors().isEmpty());
+      }
+    }
+
+    numMisses = getMetricCount(testDbName, testPartitionedTbl,
+        HdfsTable.FILEMETADATA_CACHE_MISS_METRIC);
+    numHits = getMetricCount(testDbName, testPartitionedTbl,
+        HdfsTable.FILEMETADATA_CACHE_HIT_METRIC);
+    // hit count increases by 1 since for part=2 we can ignore all the files and
+    // there was no need to reload
+    Assert.assertEquals(numHitsAfter + 1, numHits);
+    // Catalog reloads the filemetadata for one partition and hence the number of misses
+    // should be 1 higher
+    Assert.assertEquals(numMissesAfter + 1, numMisses);
+    // issue a request with the current writeId to make sure we didn't mess up the
+    // table's metadata
+    request = new RequestBuilder()
+        .db(testDbName)
+        .tbl(testPartitionedTbl)
+        .writeId(currentWriteIdList)
+        .wantFiles()
+        .build();
+    response = sendRequest(request);
+    Assert.assertEquals(2, response.getTable_info().getPartitionsSize());
+    for (TPartialPartitionInfo partitionInfo : response.getTable_info().getPartitions()) {
+      Assert.assertEquals(1, partitionInfo.getFile_descriptors().size());
+    }
+  }
+
+  /**
+   * Similar to fetchAfterMajorCompaction but does a minor compaction instead.
+   * @throws Exception
+   */
+  @Test
+  public void testFetchAfterMinorCompaction() throws Exception {
+    Assume.assumeTrue(MetastoreShim.getMajorVersion() >= 3);
+    Table tbl = catalog_.getOrLoadTable(testDbName, testTblName, "test", null);
+    Assert.assertFalse("Table must be loaded",
+        tbl instanceof IncompleteTable);
+    // row 2, first row is in the setup method
+    executeHiveSql("insert into " + getTestTblName() + " values (2)");
+    ValidWriteIdList olderWriteIdList = getValidWriteIdList(testDbName,
+        testTblName);
+    // row 3
+    executeHiveSql("insert into " + getTestTblName() + " values (3)");
+    executeHiveSql(
+        "alter table " + getTestTblName()+ " compact 'minor' and wait");
+    ValidWriteIdList currentWriteIdList = getValidWriteIdList(testDbName,
+        testTblName);
+    long numMisses = getMetricCount(testDbName, testTblName,
+        HdfsTable.FILEMETADATA_CACHE_MISS_METRIC);
+    long numHits = getMetricCount(testDbName, testTblName,
+        HdfsTable.FILEMETADATA_CACHE_HIT_METRIC);
+    // issue a get request at latest writeIdList to trigger a load
+    TGetPartialCatalogObjectRequest request = new RequestBuilder()
+        .db(testDbName)
+        .tbl(testTblName)
+        .writeId(currentWriteIdList)
+        .wantFiles()
+        .build();
+    TGetPartialCatalogObjectResponse response = sendRequest(request);
+    Assert.assertEquals(1, response.getTable_info().getPartitionsSize());
+    for (TPartialPartitionInfo partitionInfo : response.getTable_info().getPartitions()) {
+      Assert.assertEquals(1, partitionInfo.getFile_descriptors().size());
+    }
+    long numMissesAfter = getMetricCount(testDbName, testTblName,
+        HdfsTable.FILEMETADATA_CACHE_MISS_METRIC);
+    long numHitsAfter = getMetricCount(testDbName, testTblName,
+        HdfsTable.FILEMETADATA_CACHE_HIT_METRIC);
+    // we triggered a reload of the table. We expect that filemetadata should be a cache
+    // hit
+    Assert.assertEquals(numHits + 1, numHitsAfter);
+    Assert.assertEquals(numMisses, numMissesAfter);
+    // issue a request with writeId before the minor compaction
+    request = new RequestBuilder()
+        .db(testDbName)
+        .tbl(testTblName)
+        .writeId(olderWriteIdList)
+        .wantFiles()
+        .build();
+    response = sendRequest(request);
+    Assert.assertEquals(1, response.getTable_info().getPartitionsSize());
+    for (TPartialPartitionInfo partitionInfo : response.getTable_info().getPartitions()) {
+      // we expect that catalog will load the files from FileSystem for this case so
+      // the number of delta files will be 2 (files before minor compaction)
+      Assert.assertEquals(2, partitionInfo.getFile_descriptors().size());
+    }
+    long numMisses1 = getMetricCount(testDbName, testTblName,
+        HdfsTable.FILEMETADATA_CACHE_MISS_METRIC);
+    long numHits1 = getMetricCount(testDbName, testTblName,
+        HdfsTable.FILEMETADATA_CACHE_HIT_METRIC);
+    // we expect the miss count to increase by 1 for the only partition
+    Assert.assertEquals(numMissesAfter + 1, numMisses1);
+    Assert.assertEquals(numHitsAfter, numHits1);
+  }
+
+  private void executeHiveSql(String query) throws Exception {
+    try (HiveJdbcClient hiveClient = hiveClientPool_.getClient()) {
+      hiveClient.executeSql(query);
+    }
+  }
+
+  /**
+   * Simple request builder class. Assumes all the metadata at higher granularity is
+   * required if a specific level is requested. For example, if files are requested, it
+   * assumes that partition names and partition metadata are also requested.
+   * A condensed, standalone version of build() follows this test class.
+   */
+  private static class RequestBuilder {
+
+    boolean wantFileMetadata;
+    boolean wantPartitionMeta;
+    boolean wantPartitionNames;
+    String tblName, dbName;
+    ValidWriteIdList writeIdList;
+
+    RequestBuilder db(String db) {
+      this.dbName = db;
+      return this;
+    }
+
+    RequestBuilder tbl(String tbl) {
+      this.tblName = tbl;
+      return this;
+    }
+
+    RequestBuilder writeId(ValidWriteIdList validWriteIdList) {
+      this.writeIdList = validWriteIdList;
+      return this;
+    }
+
+    RequestBuilder wantFiles() {
+      wantFileMetadata = true;
+      wantPartitionMeta = true;
+      wantPartitionNames = true;
+      return this;
+    }
+
+    RequestBuilder wantPartitions() {
+      wantPartitionMeta = true;
+      wantFileMetadata = true;
+      return this;
+    }
+
+    RequestBuilder wantPartitionNames() {
+      wantPartitionNames = true;
+      return this;
+    }
+
+    TGetPartialCatalogObjectRequest build() {
+      TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+      req.object_desc = new TCatalogObject();
+      req.object_desc.setType(TCatalogObjectType.TABLE);
+      req.object_desc.table = new TTable(dbName, tblName);
+      req.object_desc.table.hdfs_table = new THdfsTable();
+      req.table_info_selector = new TTableInfoSelector();
+      req.table_info_selector.valid_write_ids =
+        MetastoreShim.convertToTValidWriteIdList(writeIdList);
+      req.table_info_selector.want_hms_table = true;
+      if (wantPartitionNames) {
+        req.table_info_selector.want_partition_names = true;
+      }
+      if (wantPartitionMeta) {
+        req.table_info_selector.want_partition_metadata = true;
+      }
+      if (wantFileMetadata) {
+        req.table_info_selector.want_partition_files = true;
+      }
+      return req;
+    }
+  }
+
+  private ValidWriteIdList getValidWriteIdList(String db, String tbl) throws TException {
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      return client.getHiveClient().getValidWriteIds(db + "." + tbl);
+    }
+  }
+
+  private TGetPartialCatalogObjectResponse sendRequest(
+    TGetPartialCatalogObjectRequest req)
+    throws CatalogException, InternalException, TException {
+    TGetPartialCatalogObjectResponse resp;
+    resp = catalog_.getPartialCatalogObject(req);
+    // Round-trip the response through serialization, so if we accidentally forgot to
+    // set the "isset" flag for any fields, we'll catch that bug.
+    byte[] respBytes = new TSerializer().serialize(resp);
+    resp.clear();
+    new TDeserializer().deserialize(resp, respBytes);
+    return resp;
+  }
+
+  private static String getTestTblName() {
+    return testDbName + "." + testTblName;
+  }
+
+  private static String getPartitionedTblName() {
+    return testDbName + "." + testPartitionedTbl;
+  }
+
+  private void invalidateTbl(String db, String tbl) throws CatalogException {
+    catalog_.invalidateTable(new TTableName(db, tbl), new Reference<>(),
+      new Reference<>());
+    Assert.assertTrue("Table must not be loaded",
+      catalog_.getTable(db, tbl) instanceof IncompleteTable);
+  }
+}
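
For reference, the essential wiring of the new TTableInfoSelector.valid_write_ids field,
condensed from RequestBuilder.build() above into a single standalone helper; the class
and method names here are illustrative, not part of the patch:

    // Condensed request construction; mirrors RequestBuilder.build() in the test above.
    import org.apache.hadoop.hive.common.ValidWriteIdList;
    import org.apache.impala.compat.MetastoreShim;
    import org.apache.impala.thrift.TCatalogObject;
    import org.apache.impala.thrift.TCatalogObjectType;
    import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
    import org.apache.impala.thrift.THdfsTable;
    import org.apache.impala.thrift.TTable;
    import org.apache.impala.thrift.TTableInfoSelector;

    class WriteIdRequestSketch {
      static TGetPartialCatalogObjectRequest filesAtSnapshot(
          String db, String tbl, ValidWriteIdList writeIds) {
        TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
        req.object_desc = new TCatalogObject();
        req.object_desc.setType(TCatalogObjectType.TABLE);
        req.object_desc.table = new TTable(db, tbl);
        req.object_desc.table.hdfs_table = new THdfsTable();
        req.table_info_selector = new TTableInfoSelector();
        // The new optional field: catalogd compares this snapshot against its cached
        // ValidWriteIdList and reloads or filters file descriptors accordingly.
        req.table_info_selector.valid_write_ids =
            MetastoreShim.convertToTValidWriteIdList(writeIds);
        req.table_info_selector.want_hms_table = true;
        req.table_info_selector.want_partition_names = true;
        req.table_info_selector.want_partition_metadata = true;
        req.table_info_selector.want_partition_files = true;
        return req;
      }
    }
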
diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index ba4a0b5..fab7196 100644
--- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -138,7 +138,6 @@ import org.apache.impala.thrift.TTypeNode;
 import org.apache.impala.thrift.TTypeNodeType;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
-import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.MetaStoreUtil.InsertEventInfo;
 import org.apache.thrift.TException;
 import org.junit.After;
@@ -839,7 +838,7 @@ public class MetastoreEventsProcessorTest {
     eventsProcessor_.processEvents();
 
     // Simulate a load table
-    Table tbl = catalog_.getOrLoadTable(dbName, tblName, "test");
+    Table tbl = catalog_.getOrLoadTable(dbName, tblName, "test", null);
     Partition partition = null;
     if (isPartitionInsert) {
       // Get the partition from metastore. This should now contain the new file.
@@ -3025,7 +3024,7 @@ public class MetastoreEventsProcessorTest {
   }
 
   private Table loadTable(String dbName, String tblName) throws CatalogException {
-    Table loadedTable = catalog_.getOrLoadTable(dbName, tblName, "test");
+    Table loadedTable = catalog_.getOrLoadTable(dbName, tblName, "test", null);
     assertFalse("Table should have been loaded after getOrLoadTable call",
         loadedTable instanceof IncompleteTable);
     return loadedTable;
diff --git a/fe/src/test/java/org/apache/impala/testutil/ImpalaJdbcClient.java b/fe/src/test/java/org/apache/impala/testutil/ImpalaJdbcClient.java
index 7cd076c..e310fc2 100644
--- a/fe/src/test/java/org/apache/impala/testutil/ImpalaJdbcClient.java
+++ b/fe/src/test/java/org/apache/impala/testutil/ImpalaJdbcClient.java
@@ -120,6 +120,12 @@ public class ImpalaJdbcClient {
     return stmt_.executeQuery(query);
   }
 
+  public void execStatement(String query) throws SQLException {
+    validateConnection();
+    LOG.info("Executing statement: " + query);
+    stmt_.execute(query);
+  }
+
   public void changeDatabase(String db_name) throws SQLException {
     validateConnection();
     LOG.info("Using: " + db_name);
diff --git a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
index 336fc88..57c882f 100644
--- a/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/ImpaladTestCatalog.java
@@ -125,7 +125,7 @@ public class ImpaladTestCatalog extends ImpaladCatalog {
     // The table was not yet loaded. Load it in to the catalog now.
     Table newTbl = null;
     try {
-      newTbl = srcCatalog_.getOrLoadTable(dbName, tblName, "test");
+      newTbl = srcCatalog_.getOrLoadTable(dbName, tblName, "test", null);
     } catch (CatalogException e) {
       throw new IllegalStateException("Unexpected table loading failure.", e);
     }
diff --git a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
index d75ee64..d4e9748 100644
--- a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
+++ b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.compat.MetastoreShim;
 import org.hamcrest.Matchers;
 import org.junit.Assume;
@@ -73,7 +74,7 @@ public class AcidUtilsTest {
       assertThat(AcidUtils.filterFilesForAcidState(stats, BASE_PATH,
           new ValidReadTxnList(validTxnListStr), writeIds, null),
           Matchers.containsInAnyOrder(expectedStats.toArray()));
-    } catch (MetaException me) {
+    } catch (CatalogException me) {
       //TODO: Remove try-catch once IMPALA-9042 is resolved.
       assertTrue(false);
     }
diff --git a/shaded-deps/pom.xml b/shaded-deps/pom.xml
index 851403d..261e30d 100644
--- a/shaded-deps/pom.xml
+++ b/shaded-deps/pom.xml
@@ -75,6 +75,7 @@ the same dependencies
               <includes>
                 <include>org/apache/hadoop/hive/conf/**/*</include>
                 <include>org/apache/hadoop/hive/common/FileUtils*</include>
+                <include>org/apache/hive/common/util/TxnIdUtils*</include>
                 <!-- Needed to support describe formatted command compat with Hive --> 
                 <include>org/apache/hadoop/hive/ql/metadata/**/*</include>
                 <include>org/apache/hadoop/hive/ql/parse/SemanticException.class</include>