You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2018/08/21 16:39:50 UTC

[4/4] impala git commit: IMPALA-7437. LRU caching of partitions in impalad

IMPALA-7437. LRU caching of partitions in impalad

This changes the CatalogdMetaProvider to use a Guava-based LRU cache.
The eviction strategy is currently time-based (entries expire one hour
after their last access), and it caches only a few basic items, such as
partition information, the null partition key value, and table column
statistics. It does not cache the table entries themselves, which means
that we don't need to propagate invalidations via the statestore quite
yet. Instead, every query does an initial fetch of the table metadata in
order to learn the current version number. That version number is then
used as part of the cache key for all further metadata, so when the
version number changes, all of the prior cache entries become
"unreachable" and are effectively evicted.

Initially, I attempted to implement this by adding a new MetaProvider
implementation that would transparently wrap another MetaProvider
implementation (either catalogd-based or direct-from-source). However, I
found that I wanted to use catalogd-based implementation details like
the version number in the cache key, and trying to abstract this behind
an interface wasn't very clear. So, I elected to just embed the caching
logic into the CatalogdMetaProvider itself.

Note that this patch upgrades the Guava reference in the pom from 11.0.2
to 14.0.1. In fact, I found that Guava 14.0.1 was already leaking onto
the classpath via hive-exec.jar, so the build was picking one version or
the other in a somewhat unpredictable fashion. The CacheBuilder class had
a small API change between v11 and v14, so I needed to pin a specific
version so that Eclipse and Maven agreed on which version to build against.

This includes some basic unit tests, and I also verified that some
query tests, such as TPC-H, pass.

Change-Id: I9a57521ad851da605604a1e7c48d3d6627da5df5
Reviewed-on: http://gerrit.cloudera.org:8080/11208
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3fa05604
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3fa05604
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3fa05604

Branch: refs/heads/master
Commit: 3fa05604aca2d8f65b3ded4950df8f38fffe43d5
Parents: ef15da0
Author: Todd Lipcon <to...@apache.org>
Authored: Mon Aug 13 17:25:34 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 21 16:17:03 2018 +0000

----------------------------------------------------------------------
 .../catalog/local/CatalogdMetaProvider.java     | 419 +++++++++++++++++--
 .../impala/catalog/local/LocalCatalog.java      |   4 +-
 .../impala/catalog/local/LocalFsTable.java      |   3 +
 .../impala/catalog/local/MetaProvider.java      |   5 +
 .../catalog/local/CatalogdMetaProviderTest.java | 141 +++++++
 impala-parent/pom.xml                           |   4 +-
 6 files changed, 538 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3fa05604/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index ef71e63..4d2aa6b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -17,10 +17,14 @@
 
 package org.apache.impala.catalog.local;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Function;
@@ -32,6 +36,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.Reference;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TCatalogInfoSelector;
 import org.apache.impala.thrift.TCatalogObject;
@@ -50,8 +55,15 @@ import org.apache.impala.util.ListMap;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheStats;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -59,14 +71,68 @@ import com.google.errorprone.annotations.Immutable;
 
 /**
  * MetaProvider which fetches metadata in a granular fashion from the catalogd.
+ *
+ * Automatically partially caches metadata to avoid fetching too much data from the
+ * catalogd on every query.
+ *
+ * TODO(todd): expose statistics on a per-query and per-daemon level about cache
+ * hit rates, number of outbound RPCs, etc.
+ * TODO(todd): handle retry/backoff to ride over short catalog interruptions
  */
 public class CatalogdMetaProvider implements MetaProvider {
 
+  private final static Logger LOG = LoggerFactory.getLogger(CatalogdMetaProvider.class);
+
+  /**
+   * Sentinel value used as a negative cache entry for column statistics.
+   * Some columns (e.g. partitioning columns) do not have statistics in the catalog
+   * and won't be returned when we ask it for stats. It's important to cache negative
+   * entries for those or else we would require a round-trip every time the table
+   * is loaded.
+   *
+   * This special sentinel value is stored in the cache to indicate such a "negative
+   * cache" entry. It is always compared by reference equality.
+   */
+  private static final ColumnStatisticsObj NEGATIVE_COLUMN_STATS_SENTINEL =
+      new ColumnStatisticsObj();
+
+  /**
+   * Used as a cache key for caching the "null partition key value", which is a global
+   * Hive configuration.
+   */
+  private static final Object NULL_PARTITION_KEY_VALUE_CACHE_KEY = new Object();
+
+  /**
+   * File descriptors store replicas using a compressed format that references hosts
+   * by index in a "host index" list rather than by their full addresses. Since we cache
+   * partition metadata including file descriptors across many queries, we can't rely on
+   * callers to provide a consistent host index. Instead, cached file descriptors are
+   * always relative to this global host index.
+   *
+   * Note that we never evict entries from this host index. We rely on the fact that,
+   * in a given storage cluster, the number of hosts is bounded, and "leaking" the unique
+   * network addresses won't cause a problem over time.
+   */
+  private final ListMap<TNetworkAddress> cacheHostIndex_ =
+      new ListMap<TNetworkAddress>();
+
   // TODO(todd): currently we haven't implemented catalogd thrift APIs for all pieces
   // of metadata. In order to incrementally build this out, we delegate various calls
   // to the "direct" provider for now and circumvent catalogd.
   private DirectMetaProvider directProvider_ = new DirectMetaProvider();
 
+  // TODO(todd): hard-coded TTL is not the final solution here. We should implement
+  // memory estimation for all cached objects, and evict based on a configurable
+  // memory pressure.
+  final Cache<Object,Object> cache_ = CacheBuilder.newBuilder()
+      .expireAfterAccess(1, TimeUnit.HOURS)
+      .recordStats()
+      .build();
+
+  public CacheStats getCacheStats() {
+    return cache_.stats();
+  }
+
   /**
    * Send a GetPartialCatalogObject request to catalogd. This handles converting
    * non-OK status responses back to exceptions, performing various generic sanity
@@ -191,32 +257,88 @@ public class CatalogdMetaProvider implements MetaProvider {
   }
 
   @Override
-  public List<ColumnStatisticsObj> loadTableColumnStatistics(TableMetaRef table,
+  public List<ColumnStatisticsObj> loadTableColumnStatistics(final TableMetaRef table,
       List<String> colNames) throws TException {
-    TGetPartialCatalogObjectRequest req = newReqForTable(table);
-    req.table_info_selector.want_stats_for_column_names = colNames;
-    TGetPartialCatalogObjectResponse resp = sendRequest(req);
-    checkResponse(resp.table_info != null && resp.table_info.column_stats != null,
-        req, "missing column stats");
-    return resp.table_info.column_stats;
+    List<ColumnStatisticsObj> ret = Lists.newArrayListWithCapacity(colNames.size());
+
+    // Look up in cache first, keeping track of which ones are missing.
+    int negativeHitCount = 0;
+    List<String> missingCols = Lists.newArrayListWithCapacity(colNames.size());
+    for (String colName: colNames) {
+      ColStatsCacheKey cacheKey = new ColStatsCacheKey((TableMetaRefImpl)table, colName);
+      ColumnStatisticsObj val = (ColumnStatisticsObj)cache_.getIfPresent(cacheKey);
+      if (val == null) {
+        missingCols.add(colName);
+      } else if (val == NEGATIVE_COLUMN_STATS_SENTINEL) {
+        negativeHitCount++;
+      } else {
+        ret.add(val);
+      }
+    }
+    int hitCount = ret.size();
+
+    // Fetch and re-add those missing ones.
+    if (!missingCols.isEmpty()) {
+      TGetPartialCatalogObjectRequest req = newReqForTable(table);
+      req.table_info_selector.want_stats_for_column_names = missingCols;
+      TGetPartialCatalogObjectResponse resp = sendRequest(req);
+      checkResponse(resp.table_info != null && resp.table_info.column_stats != null,
+          req, "missing column stats");
+
+      Set<String> colsWithoutStats = new HashSet<>(missingCols);
+      for (ColumnStatisticsObj stats: resp.table_info.column_stats) {
+        cache_.put(new ColStatsCacheKey((TableMetaRefImpl)table, stats.getColName()),
+            stats);
+        ret.add(stats);
+        colsWithoutStats.remove(stats.getColName());
+      }
+
+      // Cache negative entries for any that were not returned.
+      for (String missingColName: colsWithoutStats) {
+        cache_.put(new ColStatsCacheKey((TableMetaRefImpl)table, missingColName),
+            NEGATIVE_COLUMN_STATS_SENTINEL);
+      }
+    }
+    LOG.trace("Request for column stats of {}: hit {}/ neg hit {} / miss {}",
+        table, hitCount, negativeHitCount, missingCols.size());
+    return ret;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public List<PartitionRef> loadPartitionList(TableMetaRef table)
-      throws MetaException, TException {
-    TGetPartialCatalogObjectRequest req = newReqForTable(table);
-    req.table_info_selector.want_partition_names = true;
-    TGetPartialCatalogObjectResponse resp = sendRequest(req);
-    checkResponse(resp.table_info != null && resp.table_info.partitions != null,
-        req, "missing partition list result");
-    List<PartitionRef> ret = Lists.newArrayListWithCapacity(
-        resp.table_info.partitions.size());
-    for (TPartialPartitionInfo p : resp.table_info.partitions) {
-      checkResponse(p.isSetId(), req, "response missing partition IDs for partition %s",
-          p);
-      ret.add(new PartitionRefImpl(p));
+  public List<PartitionRef> loadPartitionList(final TableMetaRef table)
+      throws TException {
+    final Reference<Boolean> hitCache = new Reference<>(true);
+    try {
+      PartitionListCacheKey key = new PartitionListCacheKey((TableMetaRefImpl)table);
+      return (List<PartitionRef>) cache_.get(key,
+          new Callable<List<PartitionRef>>() {
+            /** Called to load cache for cache misses */
+            @Override
+            public List<PartitionRef> call() throws Exception {
+              hitCache.setRef(false);
+              TGetPartialCatalogObjectRequest req = newReqForTable(table);
+              req.table_info_selector.want_partition_names = true;
+              TGetPartialCatalogObjectResponse resp = sendRequest(req);
+              checkResponse(resp.table_info != null && resp.table_info.partitions != null,
+                  req, "missing partition list result");
+              List<PartitionRef> partitionRefs = Lists.newArrayListWithCapacity(
+                  resp.table_info.partitions.size());
+              for (TPartialPartitionInfo p: resp.table_info.partitions) {
+                checkResponse(p.isSetId(), req,
+                    "response missing partition IDs for partition %s", p);
+                partitionRefs.add(new PartitionRefImpl(p));
+              }
+              return partitionRefs;
+            }
+          });
+    } catch (ExecutionException e) {
+      Throwables.propagateIfPossible(e.getCause(), TException.class);
+      throw new RuntimeException(e);
+    } finally {
+      LOG.trace("Request for partition list of {}: {}", table,
+          hitCache.getRef() ? "hit":"miss");
     }
-    return ret;
   }
 
   @Override
@@ -228,9 +350,45 @@ public class CatalogdMetaProvider implements MetaProvider {
     Preconditions.checkArgument(table instanceof TableMetaRefImpl);
     TableMetaRefImpl refImpl = (TableMetaRefImpl)table;
 
-    List<Long> ids = Lists.newArrayListWithExpectedSize(partitionRefs.size());
+    // Load what we can from the cache.
+    Map<PartitionRef, PartitionMetadata> refToMeta = loadPartitionsFromCache(refImpl,
+        hostIndex, partitionRefs);
+
+    LOG.trace("Request for partitions of {}: hit {}/{}", table, refToMeta.size(),
+        partitionRefs.size());
+
+    // Load the remainder from the catalogd.
+    List<PartitionRef> missingRefs = Lists.newArrayList();
     for (PartitionRef ref: partitionRefs) {
-      ids.add(((PartitionRefImpl)ref).getId());
+      if (!refToMeta.containsKey(ref)) missingRefs.add(ref);
+    }
+    if (!missingRefs.isEmpty()) {
+      Map<PartitionRef, PartitionMetadata> fromCatalogd = loadPartitionsFromCatalogd(
+          refImpl, hostIndex, missingRefs);
+      refToMeta.putAll(fromCatalogd);
+      // Write back to the cache.
+      storePartitionsInCache(refImpl, hostIndex, fromCatalogd);
+    }
+
+    // Convert the returned map to be by-name instead of by-ref.
+    Map<String, PartitionMetadata> nameToMeta = Maps.newHashMapWithExpectedSize(
+        refToMeta.size());
+    for (Map.Entry<PartitionRef, PartitionMetadata> e: refToMeta.entrySet()) {
+      nameToMeta.put(e.getKey().getName(), e.getValue());
+    }
+    return nameToMeta;
+  }
+
+  /**
+   * Load the specified partitions 'partRefs' from catalogd. The partitions are made
+   * relative to the given 'hostIndex' before being returned.
+   */
+  private Map<PartitionRef, PartitionMetadata> loadPartitionsFromCatalogd(
+      TableMetaRefImpl table, ListMap<TNetworkAddress> hostIndex,
+      List<PartitionRef> partRefs) throws TException {
+    List<Long> ids = Lists.newArrayListWithCapacity(partRefs.size());
+    for (PartitionRef partRef: partRefs) {
+      ids.add(((PartitionRefImpl)partRef).getId());
     }
 
     TGetPartialCatalogObjectRequest req = newReqForTable(table);
@@ -246,19 +404,19 @@ public class CatalogdMetaProvider implements MetaProvider {
         req, "returned %d partitions instead of expected %d",
         resp.table_info.partitions.size(), ids.size());
 
-    Map<String, PartitionMetadata> ret = Maps.newHashMapWithExpectedSize(ids.size());
-    for (TPartialPartitionInfo part: resp.table_info.partitions) {
+    Map<PartitionRef, PartitionMetadata> ret = Maps.newHashMap();
+    for (int i = 0; i < ids.size(); i++) {
+      PartitionRef partRef = partRefs.get(i);
+      TPartialPartitionInfo part = resp.table_info.partitions.get(i);
       Partition msPart = part.getHms_partition();
       if (msPart == null) {
-        checkResponse(refImpl.msTable_.getPartitionKeysSize() == 0, req,
+        checkResponse(table.msTable_.getPartitionKeysSize() == 0, req,
             "Should not return a partition with missing HMS partition unless " +
             "the table is unpartitioned");
-        msPart = DirectMetaProvider.msTableToPartition(refImpl.msTable_);
+        msPart = DirectMetaProvider.msTableToPartition(table.msTable_);
       }
-      checkResponse(msPart != null && msPart.getValues() != null, req,
-          "malformed partition result: %s", part.toString());
-      String partName = FileUtils.makePartName(partitionColumnNames, msPart.getValues());
 
+      // Transform the file descriptors to the caller's index.
       checkResponse(part.file_descriptors != null, req, "missing file descriptors");
       List<FileDescriptor> fds = Lists.newArrayListWithCapacity(
           part.file_descriptors.size());
@@ -273,15 +431,61 @@ public class CatalogdMetaProvider implements MetaProvider {
       }
       PartitionMetadataImpl metaImpl = new PartitionMetadataImpl(msPart,
           ImmutableList.copyOf(fds));
-      PartitionMetadata oldVal = ret.put(partName, metaImpl);
+
+      checkResponse(partRef != null, req, "returned unexpected partition id %s", part.id);
+
+      PartitionMetadata oldVal = ret.put(partRef, metaImpl);
       if (oldVal != null) {
-        throw new RuntimeException("catalogd returned partition " + partName +
+        throw new RuntimeException("catalogd returned partition " + part.id +
             " multiple times");
       }
     }
     return ret;
   }
 
+  /**
+   * Load all partitions from 'partitionRefs' that are currently present in the cache.
+   * Any partitions that miss the cache are left unset in the resulting map.
+   *
+   * The FileDescriptors of the resulting partitions are copied and made relative to
+   * the provided hostIndex.
+   */
+  private Map<PartitionRef, PartitionMetadata> loadPartitionsFromCache(
+      TableMetaRefImpl table, ListMap<TNetworkAddress> hostIndex,
+      List<PartitionRef> partitionRefs) {
+
+    Map<PartitionRef, PartitionMetadata> ret = Maps.newHashMapWithExpectedSize(
+        partitionRefs.size());
+    for (PartitionRef ref: partitionRefs) {
+      PartitionRefImpl prefImpl = (PartitionRefImpl)ref;
+      PartitionCacheKey cacheKey = new PartitionCacheKey(table, prefImpl.getId());
+      PartitionMetadataImpl val = (PartitionMetadataImpl)cache_.getIfPresent(cacheKey);
+      if (val == null) continue;
+
+      // The entry in the cache has file descriptors that are relative to the cache's
+      // host index, rather than the caller's host index. So, we need to transform them.
+      ret.put(ref, val.cloneRelativeToHostIndex(cacheHostIndex_, hostIndex));
+    }
+    return ret;
+  }
+
+
+  /**
+   * Write back the partitions in 'metas' into the cache. The file descriptors in these
+   * partitions must be relative to the 'hostIndex'.
+   */
+  private void storePartitionsInCache(TableMetaRefImpl table,
+      ListMap<TNetworkAddress> hostIndex, Map<PartitionRef, PartitionMetadata> metas) {
+    for (Map.Entry<PartitionRef, PartitionMetadata> e: metas.entrySet()) {
+      PartitionRefImpl prefImpl = (PartitionRefImpl)e.getKey();
+      PartitionMetadataImpl metaImpl = (PartitionMetadataImpl)e.getValue();
+      PartitionCacheKey cacheKey = new PartitionCacheKey(table, prefImpl.getId());
+      PartitionMetadataImpl cacheVal = metaImpl.cloneRelativeToHostIndex(hostIndex,
+          cacheHostIndex_);
+      cache_.put(cacheKey, cacheVal);
+    }
+  }
+
   private static void checkResponse(boolean condition,
       TGetPartialCatalogObjectRequest req, String msg, Object... args) throws TException {
     if (condition) return;
@@ -291,7 +495,19 @@ public class CatalogdMetaProvider implements MetaProvider {
 
   @Override
   public String loadNullPartitionKeyValue() throws MetaException, TException {
-    return directProvider_.loadNullPartitionKeyValue();
+    try {
+      return (String) cache_.get(NULL_PARTITION_KEY_VALUE_CACHE_KEY,
+          new Callable<String>() {
+            /** Called to load cache for cache misses */
+            @Override
+            public String call() throws Exception {
+              return directProvider_.loadNullPartitionKeyValue();
+            }
+          });
+    } catch (ExecutionException e) {
+      Throwables.propagateIfPossible(e.getCause(), TException.class);
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
@@ -317,7 +533,7 @@ public class CatalogdMetaProvider implements MetaProvider {
     private final TPartialPartitionInfo info_;
 
     public PartitionRefImpl(TPartialPartitionInfo p) {
-      this.info_ = p;
+      this.info_ = Preconditions.checkNotNull(p);
     }
 
     @Override
@@ -336,10 +552,24 @@ public class CatalogdMetaProvider implements MetaProvider {
 
     public PartitionMetadataImpl(Partition msPartition,
         ImmutableList<FileDescriptor> fds) {
-      this.msPartition_ = msPartition;
+      this.msPartition_ = Preconditions.checkNotNull(msPartition);
       this.fds_ = fds;
     }
 
+    /**
+     * Clone this metadata object, but make it relative to 'dstIndex' instead of
+     * 'origIndex'.
+     */
+    public PartitionMetadataImpl cloneRelativeToHostIndex(
+        ListMap<TNetworkAddress> origIndex,
+        ListMap<TNetworkAddress> dstIndex) {
+      List<FileDescriptor> fds = Lists.newArrayListWithCapacity(fds_.size());
+      for (FileDescriptor fd: fds_) {
+        fds.add(fd.cloneWithNewHostIndex(origIndex.getList(), dstIndex));
+      }
+      return new PartitionMetadataImpl(msPartition_, ImmutableList.copyOf(fds));
+    }
+
     @Override
     public Partition getHmsPartition() {
       return msPartition_;
@@ -381,5 +611,122 @@ public class CatalogdMetaProvider implements MetaProvider {
       this.msTable_ = msTable;
       this.catalogVersion_ = catalogVersion;
     }
+
+    @Override
+    public String toString() {
+      return String.format("TableMetaRef %s.%s@%d", dbName_, tableName_, catalogVersion_);
+    }
+  }
+
+  /**
+   * Base class for cache keys related to a specific table.
+   */
+  private static class TableCacheKey {
+    final String dbName_;
+    final String tableName_;
+    /**
+     * The catalog version number of the Table object. Including the version number in
+     * the cache key ensures that, if the version number changes, any dependent entities
+     * are "automatically" invalidated.
+     *
+     * TODO(todd): need to handle the case of a catalogd restarting and reloading metadata
+     * for a table reusing a previously-used version number.
+     */
+    final long version_;
+
+    TableCacheKey(TableMetaRefImpl table) {
+      dbName_ = table.dbName_;
+      tableName_ = table.tableName_;
+      version_ = table.catalogVersion_;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(dbName_, tableName_, version_, getClass());
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null || !obj.getClass().equals(getClass())) {
+        return false;
+      }
+      TableCacheKey other = (TableCacheKey)obj;
+      return version_ == other.version_ &&
+          tableName_.equals(other.tableName_) &&
+          dbName_.equals(other.dbName_);
+    }
+  }
+
+  /**
+   * Cache key for an entry storing column statistics.
+   *
+   * Values for these keys are 'ColumnStatisticsObj' objects.
+   */
+  private static class ColStatsCacheKey extends TableCacheKey {
+    private final String colName_;
+
+    public ColStatsCacheKey(TableMetaRefImpl table, String colName) {
+      super(table);
+      colName_ = colName;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(super.hashCode(), colName_);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null || !(obj instanceof ColStatsCacheKey)) {
+        return false;
+      }
+      ColStatsCacheKey other = (ColStatsCacheKey)obj;
+      return super.equals(obj) && colName_.equals(other.colName_);
+    }
+  }
+
+  /**
+   * Cache key for the partition list of a table.
+   *
+   * Values for these keys are 'List<PartitionRefImpl>'.
+   */
+  private static class PartitionListCacheKey extends TableCacheKey {
+    PartitionListCacheKey(TableMetaRefImpl table) {
+      super(table);
+    }
+  }
+
+  /**
+   * Key for caching information about a single partition.
+   *
+   * TODO(todd): currently this inherits from TableCacheKey. This means that, if a
+   * table's version number changes, all of its partitions must be reloaded. However,
+   * since partition IDs are globally unique within a catalogd instance, we could
+   * optimize this to just key based on the partition ID. However, currently, there are
+   * some cases where partitions are mutated in place rather than replaced with a new ID.
+   * We need to eliminate those or add a partition sequence number before we can make
+   * this optimization.
+   */
+  private static class PartitionCacheKey extends TableCacheKey {
+    private final long partId_;
+
+    PartitionCacheKey(TableMetaRefImpl table, long partId) {
+      super(table);
+      partId_ = partId;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(super.hashCode(), partId_);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null || !(obj instanceof PartitionCacheKey)) {
+        return false;
+      }
+      PartitionCacheKey other = (PartitionCacheKey)obj;
+      return super.equals(obj) && partId_ == other.partId_;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3fa05604/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index 8d57210..89f5345 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -71,8 +71,10 @@ public class LocalCatalog implements FeCatalog {
   private String nullPartitionKeyValue_;
   private final String defaultKuduMasterHosts_;
 
+  private static MetaProvider PROVIDER = new CatalogdMetaProvider();
+
   public static LocalCatalog create(String defaultKuduMasterHosts) {
-    return new LocalCatalog(new CatalogdMetaProvider(), defaultKuduMasterHosts);
+    return new LocalCatalog(PROVIDER, defaultKuduMasterHosts);
   }
 
   private LocalCatalog(MetaProvider metaProvider, String defaultKuduMasterHosts) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3fa05604/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index da784d6..10021e0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -372,6 +372,9 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
 
   @Override
   public List<? extends FeFsPartition> loadPartitions(Collection<Long> ids) {
+    // TODO(todd) it seems like some queries actually call this multiple times.
+    // Perhaps we should store the result in this class, instead of relying on
+    // catalog-layer caching?
     Preconditions.checkState(partitionSpecs_ != null,
         "Cannot load partitions without having fetched partition IDs " +
         "from the same LocalFsTable instance");

http://git-wip-us.apache.org/repos/asf/impala/blob/3fa05604/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index 0a217da..72e8f03 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -60,6 +60,11 @@ interface MetaProvider {
   String loadNullPartitionKeyValue()
       throws MetaException, TException;
 
+  /**
+   * Load the list of partitions for the given table. Each returned partition
+   * acts as a reference which can later be passed to 'loadPartitionsByRefs' in order
+   * to fetch more detailed metadata (e.g. after partition pruning has completed).
+   */
   List<PartitionRef> loadPartitionList(TableMetaRef table)
       throws MetaException, TException;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3fa05604/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
new file mode 100644
index 0000000..f7c3abd
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.local;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.catalog.local.MetaProvider.PartitionMetadata;
+import org.apache.impala.catalog.local.MetaProvider.PartitionRef;
+import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
+import org.apache.impala.common.Pair;
+import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheStats;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Tests for the metadata cache embedded in {@link CatalogdMetaProvider}.
+ *
+ * Each test performs lookups against the 'functional.alltypes' table and uses
+ * {@link #diffStats()} to check how many cache requests, hits and misses those
+ * lookups produced.
+ */
+public class CatalogdMetaProviderTest {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CatalogdMetaProviderTest.class);
+
+  private final CatalogdMetaProvider provider_;
+  private final TableMetaRef tableRef_;
+
+  // Cache statistics as of the most recent diffStats() call (or construction).
+  private CacheStats prevStats_;
+
+  static {
+    FeSupport.loadLibrary();
+  }
+
+  public CatalogdMetaProviderTest() throws Exception {
+    provider_ = new CatalogdMetaProvider();
+    Pair<Table, TableMetaRef> tablePair = provider_.loadTable("functional", "alltypes");
+    tableRef_ = tablePair.second;
+    prevStats_ = provider_.getCacheStats();
+  }
+
+  /**
+   * Return the cache statistics accumulated since the previous call (or since
+   * construction), and make the current statistics the baseline for the next call.
+   */
+  private CacheStats diffStats() {
+    CacheStats s = provider_.getCacheStats();
+    CacheStats diff = s.minus(prevStats_);
+    prevStats_ = s;
+    LOG.info("Stats: {}", diff);
+    return diff;
+  }
+
+  @Test
+  public void testCachePartitionList() throws Exception {
+    List<PartitionRef> partList = provider_.loadPartitionList(tableRef_);
+    CacheStats stats = diffStats();
+    assertEquals(1, stats.requestCount());
+    assertEquals(1, stats.loadCount());
+    assertEquals(0, stats.hitCount());
+
+    List<PartitionRef> partListHit = provider_.loadPartitionList(tableRef_);
+    stats = diffStats();
+    assertEquals(1, stats.requestCount());
+    assertEquals(1, stats.hitCount());
+
+    // Results should be the same.
+    assertEquals(partList, partListHit);
+  }
+
+  @Test
+  public void testCachePartitionsByRef() throws Exception {
+    List<PartitionRef> allRefs = provider_.loadPartitionList(tableRef_);
+    List<PartitionRef> partialRefs = allRefs.subList(3, 8);
+    ListMap<TNetworkAddress> hostIndex = new ListMap<>();
+    // Reset the baseline so the partition-list lookup above is not counted below.
+    CacheStats stats = diffStats();
+
+    // Should get no hits on the initial load of partitions.
+    Map<String, PartitionMetadata> partMap = provider_.loadPartitionsByRefs(
+        tableRef_, /* partitionColumnNames unused by this impl */null, hostIndex,
+        partialRefs);
+    assertEquals(partialRefs.size(), partMap.size());
+    stats = diffStats();
+    assertEquals(0, stats.hitCount());
+
+    // Load the same partitions again and we should get a hit for each partition.
+    Map<String, PartitionMetadata> partMapHit = provider_.loadPartitionsByRefs(
+        tableRef_, /* partitionColumnNames unused by this impl */null, hostIndex,
+        partialRefs);
+    assertEquals(partialRefs.size(), partMapHit.size());
+    stats = diffStats();
+    assertEquals(partialRefs.size(), stats.hitCount());
+
+    // Load all of the partitions: the previously-loaded ones should hit and the
+    // remaining ones should miss.
+    Map<String, PartitionMetadata> allParts = provider_.loadPartitionsByRefs(
+        tableRef_, /* partitionColumnNames unused by this impl */null, hostIndex,
+        allRefs);
+    assertEquals(allRefs.size(), allParts.size());
+    stats = diffStats();
+    assertEquals(partialRefs.size(), stats.hitCount());
+    assertEquals(allRefs.size() - partialRefs.size(), stats.missCount());
+  }
+
+  @Test
+  public void testCacheColumnStats() throws Exception {
+    ImmutableList<String> colNames = ImmutableList.of("month", "id");
+    List<ColumnStatisticsObj> colStats = provider_.loadTableColumnStatistics(tableRef_,
+        colNames);
+    // Only 'id' has stats -- 'month' is a partition column and therefore has none.
+    assertEquals(1, colStats.size());
+    CacheStats stats = diffStats();
+    // We should have missed on both columns.
+    assertEquals(2, stats.requestCount());
+    assertEquals(2, stats.missCount());
+
+    // Look up again, and we should get the same results.
+    List<ColumnStatisticsObj> colStats2 = provider_.loadTableColumnStatistics(tableRef_,
+        colNames);
+    assertEquals(colStats, colStats2);
+
+    // Should have gotten hits on both columns (one positive, one negative).
+    stats = diffStats();
+    assertEquals(2, stats.requestCount());
+    assertEquals(2, stats.hitCount());
+    assertEquals(0, stats.missCount());
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/3fa05604/impala-parent/pom.xml
----------------------------------------------------------------------
diff --git a/impala-parent/pom.xml b/impala-parent/pom.xml
index 92233f4..dbdf165 100644
--- a/impala-parent/pom.xml
+++ b/impala-parent/pom.xml
@@ -54,7 +54,9 @@ under the License.
     <httpcomponents.core.version>4.4.9</httpcomponents.core.version>
     <yarn-extras.version>${project.version}</yarn-extras.version>
     <eclipse.output.directory>eclipse-classes</eclipse.output.directory>
-    <guava.version>11.0.2</guava.version>
+    <!-- hive-exec.jar bundles Guava 14.0.1 and leaks it onto our classpath, so we
+         must depend on the same version to keep class loading predictable. -->
+    <guava.version>14.0.1</guava.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   </properties>