Posted to commits@impala.apache.org by jo...@apache.org on 2018/08/21 16:39:47 UTC

[1/4] impala git commit: IMPALA-7422: Fix a race in QueryState::StartFInstances()

Repository: impala
Updated Branches:
  refs/heads/master d29300281 -> 3fa05604a


IMPALA-7422: Fix a race in QueryState::StartFInstances()

A recent commit for IMPALA-7163 (cbc8c63) introduced a race between the
insertion into QueryState::fragment_map_ and the creation of the fragment
instance thread.
In particular, after the aforementioned commit, the counting barrier
'instances_prepared_barrier_' is used for synchronizing callers of
Cancel()/PublishFilter() and the PREPARE phase of fragment instances.
Cancel()/PublishFilter() cannot proceed until all fragment instances
have finished preparing; 'instances_prepared_barrier_' is updated by
fragment instances once each of them is done preparing.

The race is due to the fact that QueryState::StartFInstances() doesn't
insert the fragment instance into 'fragment_map_' until after the fragment
instance thread has been spawned. So, it's possible for the newly spawned
thread to finish preparing and update the counting barrier before the insertion
into 'fragment_map_' happens. It's therefore possible for PublishFilter() to
become unblocked before a fragment is inserted into 'fragment_map_',
triggering the DCHECK() reported in IMPALA-7422.

This change fixes the race by moving the insertion into fragment_map_
before the thread is spawned.
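
A minimal Java sketch of the publish-before-spawn ordering this fix
relies on (illustrative names only; the actual change is the C++ in
be/src/runtime/query-state.cc below):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

class StartFInstancesSketch {
  private final Map<Integer, List<String>> fragmentMap = new HashMap<>();
  private final CountDownLatch instancesPreparedBarrier = new CountDownLatch(1);

  void startInstance(int fragmentIdx, String instanceId) {
    // The fix: publish the instance in the map *before* spawning its thread.
    // If this write happened after start(), the new thread could count down
    // the barrier first, and a reader in publishFilter() could observe an
    // incomplete map -- the race that tripped the DCHECK.
    fragmentMap.computeIfAbsent(fragmentIdx, k -> new ArrayList<>())
        .add(instanceId);
    Thread t = new Thread(() -> {
      // ... Prepare() work would happen here ...
      instancesPreparedBarrier.countDown();  // analogous to the barrier update
    });
    t.start();
  }

  List<String> publishFilter(int fragmentIdx) throws InterruptedException {
    instancesPreparedBarrier.await();  // analogous to WaitForPrepare()
    return fragmentMap.get(fragmentIdx);  // non-null once ordering is fixed
  }
}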

Testing done: exhaustive debug and release builds, which previously ran
into this race.

Change-Id: I35f2a5b0ea5143703850ffc229cec0e4294e6a3e
Reviewed-on: http://gerrit.cloudera.org:8080/11270
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8360277e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8360277e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8360277e

Branch: refs/heads/master
Commit: 8360277eaf6ab9743f253105dbaa25ecdfd6c5e7
Parents: d293002
Author: Michael Ho <kw...@cloudera.com>
Authored: Mon Aug 20 00:43:22 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Aug 21 09:44:20 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/query-state.cc | 12 ++++++++----
 be/src/runtime/query-state.h  | 10 ++++++----
 2 files changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/8360277e/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 4b8924a..329f757 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -396,9 +396,15 @@ void QueryState::StartFInstances() {
     refcnt_.Add(1); // decremented in ExecFInstance()
     AcquireExecResourceRefcount(); // decremented in ExecFInstance()
 
-    // Add the fragment instance ID to the 'fis_map_'.
+    // Add the fragment instance ID to the 'fis_map_'. Has to happen before the thread is
+    // spawned or we may race with users of 'fis_map_'.
     fis_map_.emplace(fis->instance_id(), fis);
 
+    // Update fragment_map_. Has to happen before the thread is spawned below or
+    // we may race with users of 'fragment_map_'.
+    vector<FragmentInstanceState*>& fis_list = fragment_map_[instance_ctx.fragment_idx];
+    fis_list.push_back(fis);
+
     string thread_name = Substitute("$0 (finst:$1)",
         FragmentInstanceState::FINST_THREAD_NAME_PREFIX,
         PrintId(instance_ctx.fragment_instance_id));
@@ -412,6 +418,7 @@ void QueryState::StartFInstances() {
         debug_action_status;
     if (!thread_create_status.ok()) {
       fis_map_.erase(fis->instance_id());
+      fis_list.pop_back();
       // Undo refcnt increments done immediately prior to Thread::Create(). The
       // reference counts were both greater than zero before the increments, so
       // neither of these decrements will free any structures.
@@ -419,9 +426,6 @@ void QueryState::StartFInstances() {
       ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
       break;
     }
-    // update fragment_map_
-    vector<FragmentInstanceState*>& fis_list = fragment_map_[instance_ctx.fragment_idx];
-    fis_list.push_back(fis);
     t->Detach();
     --num_unstarted_instances;
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/8360277e/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 49fd8eb..148b06f 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -29,7 +29,6 @@
 #include "gen-cpp/Types_types.h"
 #include "runtime/tmp-file-mgr.h"
 #include "util/counting-barrier.h"
-#include "util/promise.h"
 #include "util/uid-util.h"
 
 namespace impala {
@@ -341,12 +340,15 @@ class QueryState {
   std::unique_ptr<CountingBarrier> instances_finished_barrier_;
 
   /// map from instance id to its state (owned by obj_pool_), populated in
-  /// StartFInstances(); not valid to read from until instances_prepare_promise_
-  /// is set
+  /// StartFInstances(); Not valid to read from until 'instances_prepared_barrier_'
+  /// is set (i.e. readers should always call WaitForPrepare()).
   std::unordered_map<TUniqueId, FragmentInstanceState*> fis_map_;
 
   /// map from fragment index to its instances (owned by obj_pool_), populated in
-  /// StartFInstances()
+  /// StartFInstances(). Only written by the query state thread (i.e. the thread
+  /// which executes StartFInstances()). Not valid to read from until
+  /// 'instances_prepared_barrier_' is set (i.e. accessor should always call
+  /// WaitForPrepare()).
   std::unordered_map<int, std::vector<FragmentInstanceState*>> fragment_map_;
 
   ObjectPool obj_pool_;


[4/4] impala git commit: IMPALA-7437. LRU caching of partitions in impalad

Posted by jo...@apache.org.
IMPALA-7437. LRU caching of partitions in impalad

This changes the CatalogdMetaProvider to use a Guava-based LRU cache.
The eviction strategy is currently time-based (1 hour), and it only
performs caching of some basic items like partition information, the
null-partition-key-value, and table column statistics. It does not
cache the table entries themselves, which means that we don't need to do
any invalidation propagation via the statestore quite yet. Instead,
every query will do an initial fetch of the table metadata in order to
know the current version number. That version number is then used as
part of the cache key for all further metadata, so when the version
number changes, all of the prior cache entries become "unreachable" and are
effectively evicted.
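
A minimal sketch of that version-keyed scheme, using the same Guava
CacheBuilder configuration as the patch (the flat string key here is
illustrative; the patch uses the structured key classes shown in the
diff below):

import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

class VersionKeyedCacheSketch {
  // Same construction as in the patch: time-based expiry plus stats.
  private final Cache<String, Object> cache = CacheBuilder.newBuilder()
      .expireAfterAccess(1, TimeUnit.HOURS)
      .recordStats()
      .build();

  // Illustrative key: embeds the table's catalog version. When the version
  // is bumped by a DDL, lookups switch to a new key, so entries cached under
  // the old version become unreachable and simply age out via the TTL.
  static String colStatsKey(String db, String tbl, long version, String col) {
    return db + "." + tbl + "@" + version + "/stats/" + col;
  }

  Object getStats(String db, String tbl, long version, String col) {
    return cache.getIfPresent(colStatsKey(db, tbl, version, col));
  }
}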

Initially, I attempted to implement this by adding a new MetaProvider
implementation that would transparently wrap another MetaProvider
implementation (either catalogd-based or direct-from-source). However, I
found that I wanted to use catalogd-based implementation details like
the version number in the cache key, and trying to abstract this behind
an interface wasn't very clear. So, I elected to just embed the caching
logic into the CatalogdMetaProvider itself.

Note that this patch upgrades the Guava reference in the pom from 11.0.2
to 14.0.1. In fact, I found that Guava 14.0.1 was already leaking onto
the classpath by being included in hive-exec.jar, so the build ended up
picking one version or the other in a somewhat unpredictable fashion. The
CacheBuilder class had a small API change between v11 and v14, so I
needed to pin a specific version to ensure that Eclipse and Maven agreed
on which version to build against.

This patch includes some basic unit tests, and I also verified that
query tests like TPCH pass.

Change-Id: I9a57521ad851da605604a1e7c48d3d6627da5df5
Reviewed-on: http://gerrit.cloudera.org:8080/11208
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3fa05604
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3fa05604
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3fa05604

Branch: refs/heads/master
Commit: 3fa05604aca2d8f65b3ded4950df8f38fffe43d5
Parents: ef15da0
Author: Todd Lipcon <to...@apache.org>
Authored: Mon Aug 13 17:25:34 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 21 16:17:03 2018 +0000

----------------------------------------------------------------------
 .../catalog/local/CatalogdMetaProvider.java     | 419 +++++++++++++++++--
 .../impala/catalog/local/LocalCatalog.java      |   4 +-
 .../impala/catalog/local/LocalFsTable.java      |   3 +
 .../impala/catalog/local/MetaProvider.java      |   5 +
 .../catalog/local/CatalogdMetaProviderTest.java | 141 +++++++
 impala-parent/pom.xml                           |   4 +-
 6 files changed, 538 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3fa05604/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index ef71e63..4d2aa6b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -17,10 +17,14 @@
 
 package org.apache.impala.catalog.local;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Function;
@@ -32,6 +36,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.Reference;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TCatalogInfoSelector;
 import org.apache.impala.thrift.TCatalogObject;
@@ -50,8 +55,15 @@ import org.apache.impala.util.ListMap;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheStats;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -59,14 +71,68 @@ import com.google.errorprone.annotations.Immutable;
 
 /**
  * MetaProvider which fetches metadata in a granular fashion from the catalogd.
+ *
+ * Automatically partially caches metadata to avoid fetching too much data from the
+ * catalogd on every query.
+ *
+ * TODO(todd): expose statistics on a per-query and per-daemon level about cache
+ * hit rates, number of outbound RPCs, etc.
+ * TODO(todd): handle retry/backoff to ride over short catalog interruptions
  */
 public class CatalogdMetaProvider implements MetaProvider {
 
+  private final static Logger LOG = LoggerFactory.getLogger(CatalogdMetaProvider.class);
+
+  /**
+   * Sentinel value used as a negative cache entry for column statistics.
+   * Some columns (e.g. partitioning columns) do not have statistics in the catalog
+   * and won't be returned when we ask it for stats. It's important to cache negative
+   * entries for those or else we would require a round-trip every time the table
+   * is loaded.
+   *
+   * This special sentinel value is stored in the cache to indicate such a "negative
+   * cache" entry. It is always compared by reference equality.
+   */
+  private static final ColumnStatisticsObj NEGATIVE_COLUMN_STATS_SENTINEL =
+      new ColumnStatisticsObj();
+
+  /**
+   * Used as a cache key for caching the "null partition key value", which is a global
+   * Hive configuration.
+   */
+  private static final Object NULL_PARTITION_KEY_VALUE_CACHE_KEY = new Object();
+
+  /**
+   * File descriptors store replicas using a compressed format that references hosts
+   * by index in a "host index" list rather than by their full addresses. Since we cache
+   * partition metadata including file descriptors across many queries, we can't rely on
+   * callers to provide a consistent host index. Instead, cached file descriptors are
+   * always relative to this global host index.
+   *
+   * Note that we never evict entries from this host index. We rely on the fact that,
+   * in a given storage cluster, the number of hosts is bounded, and "leaking" the unique
+   * network addresses won't cause a problem over time.
+   */
+  private final ListMap<TNetworkAddress> cacheHostIndex_ =
+      new ListMap<TNetworkAddress>();
+
   // TODO(todd): currently we haven't implemented catalogd thrift APIs for all pieces
   // of metadata. In order to incrementally build this out, we delegate various calls
   // to the "direct" provider for now and circumvent catalogd.
   private DirectMetaProvider directProvider_ = new DirectMetaProvider();
 
+  // TODO(todd): hard-coded TTL is not the final solution here. We should implement
+  // memory estimation for all cached objects, and evict based on a configurable
+  // memory pressure.
+  final Cache<Object,Object> cache_ = CacheBuilder.newBuilder()
+      .expireAfterAccess(1, TimeUnit.HOURS)
+      .recordStats()
+      .build();
+
+  public CacheStats getCacheStats() {
+    return cache_.stats();
+  }
+
   /**
    * Send a GetPartialCatalogObject request to catalogd. This handles converting
    * non-OK status responses back to exceptions, performing various generic sanity
@@ -191,32 +257,88 @@ public class CatalogdMetaProvider implements MetaProvider {
   }
 
   @Override
-  public List<ColumnStatisticsObj> loadTableColumnStatistics(TableMetaRef table,
+  public List<ColumnStatisticsObj> loadTableColumnStatistics(final TableMetaRef table,
       List<String> colNames) throws TException {
-    TGetPartialCatalogObjectRequest req = newReqForTable(table);
-    req.table_info_selector.want_stats_for_column_names = colNames;
-    TGetPartialCatalogObjectResponse resp = sendRequest(req);
-    checkResponse(resp.table_info != null && resp.table_info.column_stats != null,
-        req, "missing column stats");
-    return resp.table_info.column_stats;
+    List<ColumnStatisticsObj> ret = Lists.newArrayListWithCapacity(colNames.size());
+
+    // Look up in cache first, keeping track of which ones are missing.
+    int negativeHitCount = 0;
+    List<String> missingCols = Lists.newArrayListWithCapacity(colNames.size());
+    for (String colName: colNames) {
+      ColStatsCacheKey cacheKey = new ColStatsCacheKey((TableMetaRefImpl)table, colName);
+      ColumnStatisticsObj val = (ColumnStatisticsObj)cache_.getIfPresent(cacheKey);
+      if (val == null) {
+        missingCols.add(colName);
+      } else if (val == NEGATIVE_COLUMN_STATS_SENTINEL) {
+        negativeHitCount++;
+      } else {
+        ret.add(val);
+      }
+    }
+    int hitCount = ret.size();
+
+    // Fetch and re-add those missing ones.
+    if (!missingCols.isEmpty()) {
+      TGetPartialCatalogObjectRequest req = newReqForTable(table);
+      req.table_info_selector.want_stats_for_column_names = missingCols;
+      TGetPartialCatalogObjectResponse resp = sendRequest(req);
+      checkResponse(resp.table_info != null && resp.table_info.column_stats != null,
+          req, "missing column stats");
+
+      Set<String> colsWithoutStats = new HashSet<>(missingCols);
+      for (ColumnStatisticsObj stats: resp.table_info.column_stats) {
+        cache_.put(new ColStatsCacheKey((TableMetaRefImpl)table, stats.getColName()),
+            stats);
+        ret.add(stats);
+        colsWithoutStats.remove(stats.getColName());
+      }
+
+      // Cache negative entries for any that were not returned.
+      for (String missingColName: colsWithoutStats) {
+        cache_.put(new ColStatsCacheKey((TableMetaRefImpl)table, missingColName),
+            NEGATIVE_COLUMN_STATS_SENTINEL);
+      }
+    }
+    LOG.trace("Request for column stats of {}: hit {}/ neg hit {} / miss {}",
+        table, hitCount, negativeHitCount, missingCols.size());
+    return ret;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public List<PartitionRef> loadPartitionList(TableMetaRef table)
-      throws MetaException, TException {
-    TGetPartialCatalogObjectRequest req = newReqForTable(table);
-    req.table_info_selector.want_partition_names = true;
-    TGetPartialCatalogObjectResponse resp = sendRequest(req);
-    checkResponse(resp.table_info != null && resp.table_info.partitions != null,
-        req, "missing partition list result");
-    List<PartitionRef> ret = Lists.newArrayListWithCapacity(
-        resp.table_info.partitions.size());
-    for (TPartialPartitionInfo p : resp.table_info.partitions) {
-      checkResponse(p.isSetId(), req, "response missing partition IDs for partition %s",
-          p);
-      ret.add(new PartitionRefImpl(p));
+  public List<PartitionRef> loadPartitionList(final TableMetaRef table)
+      throws TException {
+    final Reference<Boolean> hitCache = new Reference<>(true);
+    try {
+      PartitionListCacheKey key = new PartitionListCacheKey((TableMetaRefImpl)table);
+      return (List<PartitionRef>) cache_.get(key,
+          new Callable<List<PartitionRef>>() {
+            /** Called to load cache for cache misses */
+            @Override
+            public List<PartitionRef> call() throws Exception {
+              hitCache.setRef(false);
+              TGetPartialCatalogObjectRequest req = newReqForTable(table);
+              req.table_info_selector.want_partition_names = true;
+              TGetPartialCatalogObjectResponse resp = sendRequest(req);
+              checkResponse(resp.table_info != null && resp.table_info.partitions != null,
+                  req, "missing partition list result");
+              List<PartitionRef> partitionRefs = Lists.newArrayListWithCapacity(
+                  resp.table_info.partitions.size());
+              for (TPartialPartitionInfo p: resp.table_info.partitions) {
+                checkResponse(p.isSetId(), req,
+                    "response missing partition IDs for partition %s", p);
+                partitionRefs.add(new PartitionRefImpl(p));
+              }
+              return partitionRefs;
+            }
+          });
+    } catch (ExecutionException e) {
+      Throwables.propagateIfPossible(e.getCause(), TException.class);
+      throw new RuntimeException(e);
+    } finally {
+      LOG.trace("Request for partition list of {}: {}", table,
+          hitCache.getRef() ? "hit":"miss");
     }
-    return ret;
   }
 
   @Override
@@ -228,9 +350,45 @@ public class CatalogdMetaProvider implements MetaProvider {
     Preconditions.checkArgument(table instanceof TableMetaRefImpl);
     TableMetaRefImpl refImpl = (TableMetaRefImpl)table;
 
-    List<Long> ids = Lists.newArrayListWithExpectedSize(partitionRefs.size());
+    // Load what we can from the cache.
+    Map<PartitionRef, PartitionMetadata> refToMeta = loadPartitionsFromCache(refImpl,
+        hostIndex, partitionRefs);
+
+    LOG.trace("Request for partitions of {}: hit {}/{}", table, refToMeta.size(),
+        partitionRefs.size());
+
+    // Load the remainder from the catalogd.
+    List<PartitionRef> missingRefs = Lists.newArrayList();
     for (PartitionRef ref: partitionRefs) {
-      ids.add(((PartitionRefImpl)ref).getId());
+      if (!refToMeta.containsKey(ref)) missingRefs.add(ref);
+    }
+    if (!missingRefs.isEmpty()) {
+      Map<PartitionRef, PartitionMetadata> fromCatalogd = loadPartitionsFromCatalogd(
+          refImpl, hostIndex, missingRefs);
+      refToMeta.putAll(fromCatalogd);
+      // Write back to the cache.
+      storePartitionsInCache(refImpl, hostIndex, fromCatalogd);
+    }
+
+    // Convert the returned map to be by-name instead of by-ref.
+    Map<String, PartitionMetadata> nameToMeta = Maps.newHashMapWithExpectedSize(
+        refToMeta.size());
+    for (Map.Entry<PartitionRef, PartitionMetadata> e: refToMeta.entrySet()) {
+      nameToMeta.put(e.getKey().getName(), e.getValue());
+    }
+    return nameToMeta;
+  }
+
+  /**
+   * Load the specified partitions 'prefs' from catalogd. The partitions are made
+   * relative to the given 'hostIndex' before being returned.
+   */
+  private Map<PartitionRef, PartitionMetadata> loadPartitionsFromCatalogd(
+      TableMetaRefImpl table, ListMap<TNetworkAddress> hostIndex,
+      List<PartitionRef> partRefs) throws TException {
+    List<Long> ids = Lists.newArrayListWithCapacity(partRefs.size());
+    for (PartitionRef partRef: partRefs) {
+      ids.add(((PartitionRefImpl)partRef).getId());
     }
 
     TGetPartialCatalogObjectRequest req = newReqForTable(table);
@@ -246,19 +404,19 @@ public class CatalogdMetaProvider implements MetaProvider {
         req, "returned %d partitions instead of expected %d",
         resp.table_info.partitions.size(), ids.size());
 
-    Map<String, PartitionMetadata> ret = Maps.newHashMapWithExpectedSize(ids.size());
-    for (TPartialPartitionInfo part: resp.table_info.partitions) {
+    Map<PartitionRef, PartitionMetadata> ret = Maps.newHashMap();
+    for (int i = 0; i < ids.size(); i++) {
+      PartitionRef partRef = partRefs.get(i);
+      TPartialPartitionInfo part = resp.table_info.partitions.get(i);
       Partition msPart = part.getHms_partition();
       if (msPart == null) {
-        checkResponse(refImpl.msTable_.getPartitionKeysSize() == 0, req,
+        checkResponse(table.msTable_.getPartitionKeysSize() == 0, req,
             "Should not return a partition with missing HMS partition unless " +
             "the table is unpartitioned");
-        msPart = DirectMetaProvider.msTableToPartition(refImpl.msTable_);
+        msPart = DirectMetaProvider.msTableToPartition(table.msTable_);
       }
-      checkResponse(msPart != null && msPart.getValues() != null, req,
-          "malformed partition result: %s", part.toString());
-      String partName = FileUtils.makePartName(partitionColumnNames, msPart.getValues());
 
+      // Transform the file descriptors to the caller's index.
       checkResponse(part.file_descriptors != null, req, "missing file descriptors");
       List<FileDescriptor> fds = Lists.newArrayListWithCapacity(
           part.file_descriptors.size());
@@ -273,15 +431,61 @@ public class CatalogdMetaProvider implements MetaProvider {
       }
       PartitionMetadataImpl metaImpl = new PartitionMetadataImpl(msPart,
           ImmutableList.copyOf(fds));
-      PartitionMetadata oldVal = ret.put(partName, metaImpl);
+
+      checkResponse(partRef != null, req, "returned unexpected partition id %s", part.id);
+
+      PartitionMetadata oldVal = ret.put(partRef, metaImpl);
       if (oldVal != null) {
-        throw new RuntimeException("catalogd returned partition " + partName +
+        throw new RuntimeException("catalogd returned partition " + part.id +
             " multiple times");
       }
     }
     return ret;
   }
 
+  /**
+   * Load all partitions from 'partitionRefs' that are currently present in the cache.
+   * Any partitions that miss the cache are left unset in the resulting map.
+   *
+   * The FileDescriptors of the resulting partitions are copied and made relative to
+   * the provided hostIndex.
+   */
+  private Map<PartitionRef, PartitionMetadata> loadPartitionsFromCache(
+      TableMetaRefImpl table, ListMap<TNetworkAddress> hostIndex,
+      List<PartitionRef> partitionRefs) {
+
+    Map<PartitionRef, PartitionMetadata> ret = Maps.newHashMapWithExpectedSize(
+        partitionRefs.size());
+    for (PartitionRef ref: partitionRefs) {
+      PartitionRefImpl prefImpl = (PartitionRefImpl)ref;
+      PartitionCacheKey cacheKey = new PartitionCacheKey(table, prefImpl.getId());
+      PartitionMetadataImpl val = (PartitionMetadataImpl)cache_.getIfPresent(cacheKey);
+      if (val == null) continue;
+
+      // The entry in the cache has file descriptors that are relative to the cache's
+      // host index, rather than the caller's host index. So, we need to transform them.
+      ret.put(ref, val.cloneRelativeToHostIndex(cacheHostIndex_, hostIndex));
+    }
+    return ret;
+  }
+
+
+  /**
+   * Write back the partitions in 'metas' into the cache. The file descriptors in these
+   * partitions must be relative to the 'hostIndex'.
+   */
+  private void storePartitionsInCache(TableMetaRefImpl table,
+      ListMap<TNetworkAddress> hostIndex, Map<PartitionRef, PartitionMetadata> metas) {
+    for (Map.Entry<PartitionRef, PartitionMetadata> e: metas.entrySet()) {
+      PartitionRefImpl prefImpl = (PartitionRefImpl)e.getKey();
+      PartitionMetadataImpl metaImpl = (PartitionMetadataImpl)e.getValue();
+      PartitionCacheKey cacheKey = new PartitionCacheKey(table, prefImpl.getId());
+      PartitionMetadataImpl cacheVal = metaImpl.cloneRelativeToHostIndex(hostIndex,
+          cacheHostIndex_);
+      cache_.put(cacheKey, cacheVal);
+    }
+  }
+
   private static void checkResponse(boolean condition,
       TGetPartialCatalogObjectRequest req, String msg, Object... args) throws TException {
     if (condition) return;
@@ -291,7 +495,19 @@ public class CatalogdMetaProvider implements MetaProvider {
 
   @Override
   public String loadNullPartitionKeyValue() throws MetaException, TException {
-    return directProvider_.loadNullPartitionKeyValue();
+    try {
+      return (String) cache_.get(NULL_PARTITION_KEY_VALUE_CACHE_KEY,
+          new Callable<String>() {
+            /** Called to load cache for cache misses */
+            @Override
+            public String call() throws Exception {
+              return directProvider_.loadNullPartitionKeyValue();
+            }
+          });
+    } catch (ExecutionException e) {
+      Throwables.propagateIfPossible(e.getCause(), TException.class);
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
@@ -317,7 +533,7 @@ public class CatalogdMetaProvider implements MetaProvider {
     private final TPartialPartitionInfo info_;
 
     public PartitionRefImpl(TPartialPartitionInfo p) {
-      this.info_ = p;
+      this.info_ = Preconditions.checkNotNull(p);
     }
 
     @Override
@@ -336,10 +552,24 @@ public class CatalogdMetaProvider implements MetaProvider {
 
     public PartitionMetadataImpl(Partition msPartition,
         ImmutableList<FileDescriptor> fds) {
-      this.msPartition_ = msPartition;
+      this.msPartition_ = Preconditions.checkNotNull(msPartition);
       this.fds_ = fds;
     }
 
+    /**
+     * Clone this metadata object, but make it relative to 'dstIndex' instead of
+     * 'origIndex'.
+     */
+    public PartitionMetadataImpl cloneRelativeToHostIndex(
+        ListMap<TNetworkAddress> origIndex,
+        ListMap<TNetworkAddress> dstIndex) {
+      List<FileDescriptor> fds = Lists.newArrayListWithCapacity(fds_.size());
+      for (FileDescriptor fd: fds_) {
+        fds.add(fd.cloneWithNewHostIndex(origIndex.getList(), dstIndex));
+      }
+      return new PartitionMetadataImpl(msPartition_, ImmutableList.copyOf(fds));
+    }
+
     @Override
     public Partition getHmsPartition() {
       return msPartition_;
@@ -381,5 +611,122 @@ public class CatalogdMetaProvider implements MetaProvider {
       this.msTable_ = msTable;
       this.catalogVersion_ = catalogVersion;
     }
+
+    @Override
+    public String toString() {
+      return String.format("TableMetaRef %s.%s@%d", dbName_, tableName_, catalogVersion_);
+    }
+  }
+
+  /**
+   * Base class for cache keys related to a specific table.
+   */
+  private static class TableCacheKey {
+    final String dbName_;
+    final String tableName_;
+    /**
+     * The catalog version number of the Table object. Including the version number in
+     * the cache key ensures that, if the version number changes, any dependent entities
+     * are "automatically" invalidated.
+     *
+     * TODO(todd): need to handle the case of a catalogd restarting and reloading metadata
+     * for a table reusing a previously-used version number.
+     */
+    final long version_;
+
+    TableCacheKey(TableMetaRefImpl table) {
+      dbName_ = table.dbName_;
+      tableName_ = table.tableName_;
+      version_ = table.catalogVersion_;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(dbName_, tableName_, version_, getClass());
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null || !obj.getClass().equals(getClass())) {
+        return false;
+      }
+      TableCacheKey other = (TableCacheKey)obj;
+      return version_ == other.version_ &&
+          tableName_.equals(other.tableName_) &&
+          dbName_.equals(other.dbName_);
+    }
+  }
+
+  /**
+   * Cache key for an entry storing column statistics.
+   *
+   * Values for these keys are 'ColumnStatisticsObj' objects.
+   */
+  private static class ColStatsCacheKey extends TableCacheKey {
+    private final String colName_;
+
+    public ColStatsCacheKey(TableMetaRefImpl table, String colName) {
+      super(table);
+      colName_ = colName;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(super.hashCode(), colName_);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null || !(obj instanceof ColStatsCacheKey)) {
+        return false;
+      }
+      ColStatsCacheKey other = (ColStatsCacheKey)obj;
+      return super.equals(obj) && colName_.equals(other.colName_);
+    }
+  }
+
+  /**
+   * Cache key for the partition list of a table.
+   *
+   * Values for these keys are 'List<PartitionRefImpl>'.
+   */
+  private static class PartitionListCacheKey extends TableCacheKey {
+    PartitionListCacheKey(TableMetaRefImpl table) {
+      super(table);
+    }
+  }
+
+  /**
+   * Key for caching information about a single partition.
+   *
+   * TODO(todd): currently this inherits from TableCacheKey. This means that, if a
+   * table's version number changes, all of its partitions must be reloaded. However,
+   * since partition IDs are globally unique within a catalogd instance, we could
+   * optimize this to just key based on the partition ID. However, currently, there are
+   * some cases where partitions are mutated in place rather than replaced with a new ID.
+   * We need to eliminate those or add a partition sequence number before we can make
+   * this optimization.
+   */
+  private static class PartitionCacheKey extends TableCacheKey {
+    private final long partId_;
+
+    PartitionCacheKey(TableMetaRefImpl table, long partId) {
+      super(table);
+      partId_ = partId;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(super.hashCode(), partId_);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null || !(obj instanceof PartitionCacheKey)) {
+        return false;
+      }
+      PartitionCacheKey other = (PartitionCacheKey)obj;
+      return super.equals(obj) && partId_ == other.partId_;
+    }
   }
 }
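
The column-stats path above hinges on the negative-cache sentinel; a
condensed sketch of that pattern (illustrative names; the reference
comparison with == is the important part):

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

class NegativeCacheSketch {
  // Unique instance marking "known to have no stats"; always compared with
  // ==, never equals(), so no real value can collide with it.
  private static final Object NEGATIVE_SENTINEL = new Object();
  private final Cache<String, Object> cache = CacheBuilder.newBuilder().build();

  Object lookup(String key) {
    Object v = cache.getIfPresent(key);
    if (v == NEGATIVE_SENTINEL) return null;  // negative hit: skip the RPC
    if (v != null) return v;                  // positive hit
    Object loaded = fetchFromCatalogd(key);   // hypothetical fetch
    cache.put(key, loaded != null ? loaded : NEGATIVE_SENTINEL);
    return loaded;
  }

  private Object fetchFromCatalogd(String key) { return null; }  // stand-in
}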

http://git-wip-us.apache.org/repos/asf/impala/blob/3fa05604/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index 8d57210..89f5345 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -71,8 +71,10 @@ public class LocalCatalog implements FeCatalog {
   private String nullPartitionKeyValue_;
   private final String defaultKuduMasterHosts_;
 
+  private static MetaProvider PROVIDER = new CatalogdMetaProvider();
+
   public static LocalCatalog create(String defaultKuduMasterHosts) {
-    return new LocalCatalog(new CatalogdMetaProvider(), defaultKuduMasterHosts);
+    return new LocalCatalog(PROVIDER, defaultKuduMasterHosts);
   }
 
   private LocalCatalog(MetaProvider metaProvider, String defaultKuduMasterHosts) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3fa05604/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index da784d6..10021e0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -372,6 +372,9 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
 
   @Override
   public List<? extends FeFsPartition> loadPartitions(Collection<Long> ids) {
+    // TODO(todd) it seems like some queries actually call this multiple times.
+    // Perhaps we should store the result in this class, instead of relying on
+    // catalog-layer caching?
     Preconditions.checkState(partitionSpecs_ != null,
         "Cannot load partitions without having fetched partition IDs " +
         "from the same LocalFsTable instance");

http://git-wip-us.apache.org/repos/asf/impala/blob/3fa05604/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index 0a217da..72e8f03 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -60,6 +60,11 @@ interface MetaProvider {
   String loadNullPartitionKeyValue()
       throws MetaException, TException;
 
+  /**
+   * Load the list of partitions for the given table. Each returned partition
+   * acts as a reference which can later be passed to 'loadPartitionsByRefs' in order
+   * to fetch more detailed metadata (e.g. after partition pruning has completed).
+   */
   List<PartitionRef> loadPartitionList(TableMetaRef table)
       throws MetaException, TException;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3fa05604/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
new file mode 100644
index 0000000..f7c3abd
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.local;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.catalog.local.MetaProvider.PartitionMetadata;
+import org.apache.impala.catalog.local.MetaProvider.PartitionRef;
+import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
+import org.apache.impala.common.Pair;
+import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.CacheStats;
+import com.google.common.collect.ImmutableList;
+
+public class CatalogdMetaProviderTest {
+
+  private final static Logger LOG = LoggerFactory.getLogger(
+      CatalogdMetaProviderTest.class);
+
+  private final CatalogdMetaProvider provider_;
+  private final TableMetaRef tableRef_;
+
+  private CacheStats prevStats_;
+
+  static {
+    FeSupport.loadLibrary();
+  }
+
+  public CatalogdMetaProviderTest() throws Exception {
+    provider_ = new CatalogdMetaProvider();
+    Pair<Table, TableMetaRef> tablePair = provider_.loadTable("functional", "alltypes");
+    tableRef_ = tablePair.second;
+    prevStats_ = provider_.getCacheStats();
+  }
+
+  private CacheStats diffStats() {
+    CacheStats s = provider_.getCacheStats();
+    CacheStats diff = s.minus(prevStats_);
+    prevStats_ = s;
+    LOG.info("Stats: {}", diff);
+    return diff;
+  }
+
+  @Test
+  public void testCachePartitionList() throws Exception {
+    List<PartitionRef> partList = provider_.loadPartitionList(tableRef_);
+    CacheStats stats = diffStats();
+    assertEquals(1, stats.requestCount());
+    assertEquals(1, stats.loadCount());
+    assertEquals(0, stats.hitCount());
+
+    List<PartitionRef> partListHit = provider_.loadPartitionList(tableRef_);
+    stats = diffStats();
+    assertEquals(1, stats.requestCount());
+    assertEquals(1, stats.hitCount());
+
+    // Results should be the same.
+    assertEquals(partList, partListHit);
+  }
+
+  @Test
+  public void testCachePartitionsByRef() throws Exception {
+    List<PartitionRef> allRefs = provider_.loadPartitionList(tableRef_);
+    List<PartitionRef> partialRefs = allRefs.subList(3, 8);
+    ListMap<TNetworkAddress> hostIndex = new ListMap<>();
+    CacheStats stats = diffStats();
+
+    // Should get no hits on the initial load of partitions.
+    Map<String, PartitionMetadata> partMap = provider_.loadPartitionsByRefs(
+        tableRef_, /* partitionColumnNames unused by this impl */null, hostIndex,
+        partialRefs);
+    assertEquals(partialRefs.size(), partMap.size());
+    stats = diffStats();
+    assertEquals(0, stats.hitCount());
+
+    // Load the same partitions again and we should get a hit for each partition.
+    Map<String, PartitionMetadata> partMapHit = provider_.loadPartitionsByRefs(
+        tableRef_, /* partitionColumnNames unused by this impl */null, hostIndex,
+        partialRefs);
+    stats = diffStats();
+    assertEquals(stats.hitCount(), partMapHit.size());
+
+    // Load all of the partitions: we should get some hits and some misses.
+    Map<String, PartitionMetadata> allParts = provider_.loadPartitionsByRefs(
+        tableRef_, /* partitionColumnNames unused by this impl */null, hostIndex,
+        allRefs);
+    assertEquals(allRefs.size(), allParts.size());
+    stats = diffStats();
+    assertEquals(stats.hitCount(), partMapHit.size());
+  }
+
+  @Test
+  public void testCacheColumnStats() throws Exception {
+    ImmutableList<String> colNames = ImmutableList.of("month", "id");
+    List<ColumnStatisticsObj> colStats = provider_.loadTableColumnStatistics(tableRef_,
+        colNames);
+    // Only 'id' has stats -- 'month' is a partition column and therefore has none.
+    assertEquals(1, colStats.size());
+    CacheStats stats = diffStats();
+    // We should have missed on both columns.
+    assertEquals(2, stats.requestCount());
+    assertEquals(2, stats.missCount());
+
+    // Look up again, and we should get the same results.
+    List<ColumnStatisticsObj> colStats2 = provider_.loadTableColumnStatistics(tableRef_,
+        colNames);
+    assertEquals(colStats, colStats2);
+
+    // Should have gotten hits on both columns (one positive, one negative).
+    stats = diffStats();
+    assertEquals(2, stats.requestCount());
+    assertEquals(2, stats.hitCount());
+    assertEquals(0, stats.missCount());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/impala/blob/3fa05604/impala-parent/pom.xml
----------------------------------------------------------------------
diff --git a/impala-parent/pom.xml b/impala-parent/pom.xml
index 92233f4..dbdf165 100644
--- a/impala-parent/pom.xml
+++ b/impala-parent/pom.xml
@@ -54,7 +54,9 @@ under the License.
     <httpcomponents.core.version>4.4.9</httpcomponents.core.version>
     <yarn-extras.version>${project.version}</yarn-extras.version>
     <eclipse.output.directory>eclipse-classes</eclipse.output.directory>
-    <guava.version>11.0.2</guava.version>
+    <!-- hive-exec seems to leak this version of guava onto our classpath,
+         so it's important to depend on the same one -->
+    <guava.version>14.0.1</guava.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   </properties>
 


[3/4] impala git commit: IMPALA-7436: initial fetch-from-catalogd implementation

Posted by jo...@apache.org.
IMPALA-7436: initial fetch-from-catalogd implementation

This patch adds a new RPC to the catalogd which allows a client to fetch
a partial view of table or database metadata. Various subsets of
information can be specified and are sent back in a fairly "raw" format.

A new MetaProvider implementation is added which uses this API to
support granular fetching of metadata into the impalad. The interface
had to be reworked in a few ways to support this:

- This API uses partition IDs instead of names to identify partitions. So,
  the listPartitions API now returns opaque PartitionRefs which are passed
  back to the MetaProvider when loading more partition details (see the
  sketch after this list). The new implementation stores the IDs in these
  refs, while the direct-to-HMS implementation just uses names.

- The fetching of file descriptors was merged into the loading of other
  partition metadata. I couldn't think of any cases where we needed to
  list partition details without also fetching the file descriptors, so
  it simplified things a bit to merge the two. This was a lot easier to
  implement for CatalogdMetaProvider since the file metadata is stored
  by partition rather than looked up by a directory as in the previous
  API.

  This necessitated moving some of the logic out of LocalFsTable into
  DirectMetaProvider, so LocalFsTable no longer deals directly with HDFS
  APIs like FileStatus.

- The handling of "default partition" for an unpartitioned table moved
  into the MetaProvider implementations themselves instead of LocalFsTable.
  This is because the CatalogdMetaProvider sees the "default partition" as a
  partition that actually has an identifier on the catalogd, whereas the
  DirectMetaProvider does not. So, now both providers export the
  "default partition" as a partition like all the others.

This patch also starts to address one of the potential semantic risks of
partial caching on the impalad. If one query fetches some subset of
partitions, then a DDL occurs to change the table metadata, and another
query is submitted, we want to ensure that the latter query still reads a
consistent snapshot of the metadata. In other words, we need to ensure
that metadata like the partition list and table schema come from the same
snapshot as finer-grained metadata like partition contents.

In order to implement this, the MetaProvider API now requires that
callers use a 'TableRef' object to specify the table to be read, instead
of the dbName/tableName. In the DirectMetaProvider we don't have any
convenient version numbers for a table, so the TableRef just
encapsulates the naming. In the CatalogdMetaProvider, we additionally
store the version number of the table, and then all subsequent requests
verify that the version number has not changed. If a concurrent
modification is detected, an exception is thrown. In a future patch,
I'm planning on having the frontend catch the exception and trigger a
"re-plan".

Change-Id: If49207fc592b1cc552fbcc7199568b6833f86901
Reviewed-on: http://gerrit.cloudera.org:8080/11182
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ef15da08
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ef15da08
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ef15da08

Branch: refs/heads/master
Commit: ef15da08aa124126805201ba0c7199e22c0dcdb7
Parents: 8360277
Author: Todd Lipcon <to...@cloudera.com>
Authored: Thu Aug 2 01:20:23 2018 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Aug 21 16:17:03 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc                |  18 +
 be/src/catalog/catalog-service-client-wrapper.h |   8 +
 be/src/catalog/catalog.cc                       |   6 +
 be/src/catalog/catalog.h                        |   7 +
 be/src/exec/catalog-op-executor.cc              |  19 +-
 be/src/exec/catalog-op-executor.h               |   7 +
 be/src/service/fe-support.cc                    |  27 ++
 common/fbs/CMakeLists.txt                       |   2 +-
 common/thrift/CatalogService.thrift             | 128 ++++++
 .../impala/catalog/CatalogServiceCatalog.java   |  70 ++++
 .../org/apache/impala/catalog/ColumnStats.java  |  63 +++
 .../main/java/org/apache/impala/catalog/Db.java |  28 ++
 .../apache/impala/catalog/HdfsPartition.java    |  38 +-
 .../org/apache/impala/catalog/HdfsTable.java    |  56 +++
 .../apache/impala/catalog/IncompleteTable.java  |  10 +
 .../java/org/apache/impala/catalog/Table.java   |  48 +++
 .../catalog/local/CatalogdMetaProvider.java     | 385 +++++++++++++++++++
 .../catalog/local/DirectMetaProvider.java       | 243 ++++++++++--
 .../InconsistentMetadataFetchException.java     |  37 ++
 .../impala/catalog/local/LocalCatalog.java      |   2 +-
 .../impala/catalog/local/LocalFsPartition.java  |  79 +---
 .../impala/catalog/local/LocalFsTable.java      |  98 ++---
 .../impala/catalog/local/LocalHbaseTable.java   |  12 +-
 .../impala/catalog/local/LocalKuduTable.java    |  13 +-
 .../catalog/local/LocalPartitionSpec.java       |  64 ++-
 .../apache/impala/catalog/local/LocalTable.java |  40 +-
 .../apache/impala/catalog/local/LocalView.java  |   5 +-
 .../impala/catalog/local/MetaProvider.java      |  47 ++-
 .../org/apache/impala/common/RuntimeEnv.java    |   1 +
 .../org/apache/impala/planner/HdfsScanNode.java |   1 -
 .../impala/service/CatalogOpExecutor.java       |  58 +--
 .../org/apache/impala/service/FeSupport.java    |  14 +
 .../org/apache/impala/service/Frontend.java     |   4 +
 .../org/apache/impala/service/JniCatalog.java   |  10 +
 .../impala/catalog/HdfsPartitionTest.java       |  69 +++-
 .../impala/catalog/PartialCatalogInfoTest.java  | 183 +++++++++
 .../impala/catalog/local/LocalCatalogTest.java  |  18 +
 .../apache/impala/common/FrontendTestBase.java  |   2 +-
 38 files changed, 1640 insertions(+), 280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index a17478f..96646fb 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -124,6 +124,24 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
     VLOG_RPC << "GetCatalogObject(): response=" << ThriftDebugString(resp);
   }
 
+  virtual void GetPartialCatalogObject(TGetPartialCatalogObjectResponse& resp,
+      const TGetPartialCatalogObjectRequest& req) {
+    // TODO(todd): capture detailed metrics on the types of inbound requests, lock
+    // wait times, etc.
+    // TODO(todd): add some kind of limit on the number of concurrent requests here
+    // to avoid thread exhaustion -- eg perhaps it would be best to use a trylock
+    // on the catalog locks, or defer these calls to a separate (bounded) queue,
+    // so a heavy query workload against a table undergoing a slow refresh doesn't
+    // end up taking down the catalog by creating thousands of threads.
+    VLOG_RPC << "GetPartialCatalogObject(): request=" << ThriftDebugString(req);
+    Status status = catalog_server_->catalog()->GetPartialCatalogObject(req, &resp);
+    if (!status.ok()) LOG(ERROR) << status.GetDetail();
+    TStatus thrift_status;
+    status.ToThrift(&thrift_status);
+    resp.__set_status(thrift_status);
+    VLOG_RPC << "GetPartialCatalogObject(): response=" << ThriftDebugString(resp);
+  }
+
   // Prioritizes the loading of metadata for one or more catalog objects. Currently only
   // used for loading tables/views because they are the only type of object that is loaded
   // lazily.

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/be/src/catalog/catalog-service-client-wrapper.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-service-client-wrapper.h b/be/src/catalog/catalog-service-client-wrapper.h
index 22ac56d..46ecdf2 100644
--- a/be/src/catalog/catalog-service-client-wrapper.h
+++ b/be/src/catalog/catalog-service-client-wrapper.h
@@ -55,6 +55,14 @@ class CatalogServiceClientWrapper : public CatalogServiceClient {
     recv_GetCatalogObject(_return);
   }
 
+  void GetPartialCatalogObject(TGetPartialCatalogObjectResponse& _return,
+      const TGetPartialCatalogObjectRequest& req, bool* send_done) {
+    DCHECK(!*send_done);
+    send_GetPartialCatalogObject(req);
+    *send_done = true;
+    recv_GetPartialCatalogObject(_return);
+  }
+
   void ResetMetadata(TResetMetadataResponse& _return, const TResetMetadataRequest& req,
       bool* send_done) {
     DCHECK(!*send_done);

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index dcc1657..cb445c3 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -63,6 +63,7 @@ Catalog::Catalog() {
     {"getFunctions", "([B)[B", &get_functions_id_},
     {"checkUserSentryAdmin", "([B)V", &sentry_admin_check_id_},
     {"getCatalogObject", "([B)[B", &get_catalog_object_id_},
+    {"getPartialCatalogObject", "([B)[B", &get_partial_catalog_object_id_},
     {"getCatalogDelta", "([B)[B", &get_catalog_delta_id_},
     {"getCatalogUsage", "()[B", &get_catalog_usage_id_},
     {"getCatalogVersion", "()J", &get_catalog_version_id_},
@@ -91,6 +92,11 @@ Status Catalog::GetCatalogObject(const TCatalogObject& req,
   return JniUtil::CallJniMethod(catalog_, get_catalog_object_id_, req, resp);
 }
 
+Status Catalog::GetPartialCatalogObject(const TGetPartialCatalogObjectRequest& req,
+    TGetPartialCatalogObjectResponse* resp) {
+  return JniUtil::CallJniMethod(catalog_, get_partial_catalog_object_id_, req, resp);
+}
+
 Status Catalog::GetCatalogVersion(long* version) {
   JNIEnv* jni_env = getJNIEnv();
   JniLocalFrame jni_frame;

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/be/src/catalog/catalog.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index 872ceca..6f7b051 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -72,6 +72,12 @@ class Catalog {
   /// information on the error will be returned.
   Status GetCatalogObject(const TCatalogObject& request, TCatalogObject* response);
 
+  /// Return partial information about a Catalog object.
+  /// Returns OK if the operation was successful, otherwise a Status object with
+  /// information on the error will be returned.
+  Status GetPartialCatalogObject(const TGetPartialCatalogObjectRequest& request,
+      TGetPartialCatalogObjectResponse* response);
+
   /// Return all databases matching the optional argument 'pattern'.
   /// If pattern is NULL, match all databases otherwise match only those databases that
   /// match the pattern string. Patterns are "p1|p2|p3" where | denotes choice,
@@ -121,6 +127,7 @@ class Catalog {
   jmethodID exec_ddl_id_;  // JniCatalog.execDdl()
   jmethodID reset_metadata_id_;  // JniCatalog.resetMetdata()
   jmethodID get_catalog_object_id_;  // JniCatalog.getCatalogObject()
+  jmethodID get_partial_catalog_object_id_;  // JniCatalog.getPartialCatalogObject()
   jmethodID get_catalog_delta_id_;  // JniCatalog.getCatalogDelta()
   jmethodID get_catalog_version_id_;  // JniCatalog.getCatalogVersion()
   jmethodID get_catalog_usage_id_; // JniCatalog.getCatalogUsage()

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/be/src/exec/catalog-op-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index 164187c..d587205 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -28,8 +28,9 @@
 #include "service/frontend.h"
 #include "service/impala-server.h"
 #include "service/hs2-util.h"
-#include "util/string-parser.h"
 #include "util/runtime-profile-counters.h"
+#include "util/string-parser.h"
+#include "util/test-info.h"
 #include "gen-cpp/CatalogService.h"
 #include "gen-cpp/CatalogService_types.h"
 #include "gen-cpp/CatalogObjects_types.h"
@@ -43,6 +44,7 @@ using namespace impala;
 using namespace apache::hive::service::cli::thrift;
 using namespace apache::thrift;
 
+DECLARE_bool(use_local_catalog);
 DECLARE_int32(catalog_service_port);
 DECLARE_string(catalog_service_host);
 
@@ -283,6 +285,21 @@ Status CatalogOpExecutor::GetCatalogObject(const TCatalogObject& object_desc,
   return Status::OK();
 }
 
+Status CatalogOpExecutor::GetPartialCatalogObject(
+    const TGetPartialCatalogObjectRequest& req,
+    TGetPartialCatalogObjectResponse* resp) {
+  DCHECK(FLAGS_use_local_catalog || TestInfo::is_test());
+  const TNetworkAddress& address =
+      MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
+  Status status;
+  CatalogServiceConnection client(env_->catalogd_client_cache(), address, &status);
+  RETURN_IF_ERROR(status);
+  RETURN_IF_ERROR(
+      client.DoRpc(&CatalogServiceClientWrapper::GetPartialCatalogObject, req, resp));
+  return Status::OK();
+}
+
+
 Status CatalogOpExecutor::PrioritizeLoad(const TPrioritizeLoadRequest& req,
     TPrioritizeLoadResponse* result) {
   const TNetworkAddress& address =

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/be/src/exec/catalog-op-executor.h
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.h b/be/src/exec/catalog-op-executor.h
index 375c839..c58a99e 100644
--- a/be/src/exec/catalog-op-executor.h
+++ b/be/src/exec/catalog-op-executor.h
@@ -30,6 +30,9 @@ class Frontend;
 class Status;
 class RuntimeProfile;
 
+class TGetPartialCatalogObjectRequest;
+class TGetPartialCatalogObjectResponse;
+
 /// The CatalogOpExecutor is responsible for executing catalog operations.
 /// This includes DDL statements such as CREATE and ALTER as well as statements such
 /// as INVALIDATE METADATA. One CatalogOpExecutor is typically created per catalog
@@ -47,6 +50,10 @@ class CatalogOpExecutor {
   /// be loaded.
   Status GetCatalogObject(const TCatalogObject& object_desc, TCatalogObject* result);
 
+  /// Fetch partial information about a specific TCatalogObject from the catalog server.
+  Status GetPartialCatalogObject(const TGetPartialCatalogObjectRequest& req,
+      TGetPartialCatalogObjectResponse* resp);
+
   /// Translates the given compute stats request and its child-query results into
   /// a new table alteration request for updating the stats metadata, and executes
   /// the alteration via Exec().

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 187d14e..eb64ce5 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -523,6 +523,28 @@ Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad(
   return result_bytes;
 }
 
+// Calls into the catalog server to request partial information about a
+// catalog object.
+extern "C"
+JNIEXPORT jbyteArray JNICALL
+Java_org_apache_impala_service_FeSupport_NativeGetPartialCatalogObject(
+    JNIEnv* env, jclass caller_class, jbyteArray thrift_struct) {
+  TGetPartialCatalogObjectRequest request;
+  THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, &request), env,
+      JniUtil::internal_exc_class(), nullptr);
+
+  CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), nullptr, nullptr);
+  TGetPartialCatalogObjectResponse result;
+  Status status = catalog_op_executor.GetPartialCatalogObject(request, &result);
+  THROW_IF_ERROR_RET(status, env, JniUtil::internal_exc_class(), nullptr);
+
+  jbyteArray result_bytes = nullptr;
+  THROW_IF_ERROR_RET(SerializeThriftMsg(env, &result, &result_bytes), env,
+      JniUtil::internal_exc_class(), result_bytes);
+  return result_bytes;
+}
+
 // Used to call native code from the FE to parse and set comma-delimited key=value query
 // options.
 extern "C"
@@ -581,6 +603,11 @@ static JNINativeMethod native_methods[] = {
       (void*)::Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad
   },
   {
+      const_cast<char*>("NativeGetPartialCatalogObject"),
+      const_cast<char*>("([B)[B"),
+      (void*)::Java_org_apache_impala_service_FeSupport_NativeGetPartialCatalogObject
+  },
+  {
       const_cast<char*>("NativeParseQueryOptions"),
       const_cast<char*>("(Ljava/lang/String;[B)[B"),
       (void*)::Java_org_apache_impala_service_FeSupport_NativeParseQueryOptions

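For reference, the JNI signature "([B)[B" registered above denotes a static
native method that takes a byte[] (the serialized Thrift request) and returns a
byte[] (the serialized response). A minimal sketch of the matching Java-side
declaration; the class and library names here are assumptions, since the real
FeSupport wrapper in the frontend also handles library loading and error
translation:

    public class FeSupportSketch {
      static {
        // Assumed library name; the real FeSupport arranges for the
        // backend's support library to be loaded before any native call.
        System.loadLibrary("fesupport");
      }

      // Matches "([B)[B": serialized TGetPartialCatalogObjectRequest in,
      // serialized TGetPartialCatalogObjectResponse out.
      public static native byte[] NativeGetPartialCatalogObject(byte[] req);
    }
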
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/common/fbs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/common/fbs/CMakeLists.txt b/common/fbs/CMakeLists.txt
index d0c4ef7..a55fa08 100644
--- a/common/fbs/CMakeLists.txt
+++ b/common/fbs/CMakeLists.txt
@@ -60,7 +60,7 @@ set(BE_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/be/generated-sources/gen-cpp)
 set(FE_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/fe/generated-sources/gen-java)
 file(MAKE_DIRECTORY ${FE_OUTPUT_DIR})
 file(MAKE_DIRECTORY ${BE_OUTPUT_DIR})
-set(JAVA_FE_ARGS --java -o ${FE_OUTPUT_DIR} -b)
+set(JAVA_FE_ARGS --gen-mutable --java -o ${FE_OUTPUT_DIR} -b)
 message(${JAVA_FE_ARGS})
 set(CPP_ARGS --cpp -o ${BE_OUTPUT_DIR} -b)
 message(${CPP_ARGS})

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/common/thrift/CatalogService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index aaabd31..98c9b05 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -23,6 +23,7 @@ include "JniCatalog.thrift"
 include "Types.thrift"
 include "Status.thrift"
 include "Results.thrift"
+include "hive_metastore.thrift"
 
 // CatalogServer service API and related structs.
 
@@ -234,6 +235,129 @@ struct TGetFunctionsResponse {
   2: optional list<Types.TFunction> functions;
 }
 
+// Selector for partial information about Catalog-scoped objects
+// (i.e. those that are not within a particular database or table).
+struct TCatalogInfoSelector {
+  1: bool want_db_names
+  // TODO(todd): add objects like DataSources, etc.
+}
+
+// Returned info from a catalog request which selected items in
+// TCatalogInfoSelector.
+struct TPartialCatalogInfo {
+  1: list<string> db_names
+}
+
+// Selector for partial information about a Table.
+struct TTableInfoSelector {
+  // The response should include the HMS table struct.
+  1: bool want_hms_table
+
+  // If set, the response should include information about the given list of
+  // partitions. If this is unset, information about all partitions will be
+  // returned, so long as at least one of the following 'want_partition_*'
+  // flags is specified.
+  //
+  // If a partition ID is passed, but that partition does not exist in the
+  // table, then an exception will be thrown. It is assumed that the partition
+  // IDs passed here are a result of a prior successful call to fetch the partition
+  // list of this table.
+  //
+  // NOTE: "unset" and "set to empty" are different -- "set to empty" causes
+  // no partitions to be returned, whereas "unset" causes all partitions to be
+  // returned, so long as one of the following 'want_partition_*' is set.
+  2: optional list<i64> partition_ids
+
+  // ... each such partition should include its name.
+  3: bool want_partition_names
+
+  // ... each such partition should include metadata (location, etc).
+  4: bool want_partition_metadata
+
+  // ... each such partition should include its file info
+  5: bool want_partition_files
+
+  // List of columns to fetch stats for.
+  6: optional list<string> want_stats_for_column_names
+}
+
+// Returned information about a particular partition.
+struct TPartialPartitionInfo {
+  1: required i64 id
+
+  // Set if 'want_partition_names' was set in TTableInfoSelector.
+  2: optional string name
+
+  // Set if 'want_partition_metadata' was set in TTableInfoSelector.
+  3: optional hive_metastore.Partition hms_partition
+
+  // Set if 'want_partition_files' was set in TTableInfoSelector.
+  4: optional list<CatalogObjects.THdfsFileDesc> file_descriptors
+}
+
+// Returned information about a Table, as selected by TTableInfoSelector.
+struct TPartialTableInfo {
+  1: optional hive_metastore.Table hms_table
+
+  // The partition metadata for the requested partitions.
+  //
+  // If explicit partitions were passed, then it is guaranteed that this list
+  // is the same size and in the same order as the requested list of IDs.
+  //
+  // See TPartialPartitionInfo for details on which fields will be set based
+  // on the caller-provided selector.
+  2: optional list<TPartialPartitionInfo> partitions
+
+  3: optional list<hive_metastore.ColumnStatisticsObj> column_stats
+
+  // Each TNetworkAddress is a datanode which contains blocks of a file in the table.
+  // Used so that each THdfsFileBlock can just reference an index in this list rather
+  // than duplicate the list of network addresses, which helps reduce memory usage.
+  // Only used when partition files are fetched.
+  7: optional list<Types.TNetworkAddress> network_addresses
+}
+
+// Selector for partial information about a Database.
+struct TDbInfoSelector {
+  // The response should include the HMS Database object.
+  1: bool want_hms_database
+  // The response should include the list of table names in the DB.
+  2: bool want_table_names
+  // TODO(todd): function names
+}
+
+// Returned information about a Database, as selected by TDbInfoSelector.
+struct TPartialDbInfo {
+  1: optional hive_metastore.Database hms_database
+  2: optional list<string> table_names
+}
+
+// RPC request for GetPartialCatalogObject.
+struct TGetPartialCatalogObjectRequest {
+  1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V1
+
+  // A catalog object descriptor: a TCatalogObject with the object name and type fields
+  // set.
+  2: required CatalogObjects.TCatalogObject object_desc
+
+  3: optional TTableInfoSelector table_info_selector
+  4: optional TDbInfoSelector db_info_selector
+  5: optional TCatalogInfoSelector catalog_info_selector
+}
+
+// RPC response for GetPartialCatalogObject.
+struct TGetPartialCatalogObjectResponse {
+  // The status of the operation, OK if the operation was successful.
+  // Unset indicates "OK".
+  1: optional Status.TStatus status
+
+  2: optional i64 object_version_number
+  3: optional TPartialTableInfo table_info
+  4: optional TPartialDbInfo db_info
+  5: optional TPartialCatalogInfo catalog_info
+}
+
 // Request the complete metadata for a given catalog object. May trigger a metadata load
 // if the object is not already in the catalog cache.
 struct TGetCatalogObjectRequest {
@@ -316,4 +440,8 @@ service CatalogService {
   // TODO: When Sentry Service has a better mechanism to perform these changes this API
   // should be deprecated.
   TSentryAdminCheckResponse SentryAdminCheck(1: TSentryAdminCheckRequest req);
+
+  // Fetch partial information about some object in the catalog.
+  TGetPartialCatalogObjectResponse GetPartialCatalogObject(
+      1: TGetPartialCatalogObjectRequest req);
 }

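Since the "unset" vs. "set to empty" distinction for partition_ids is the
subtle part of this API, here is a short Java sketch against the
Thrift-generated classes (the setter names are what the Thrift compiler emits
for the structs above; the partition IDs are illustrative only):

    TTableInfoSelector allParts = new TTableInfoSelector();
    allParts.setWant_partition_names(true);
    // partition_ids left unset: names of ALL partitions are returned.

    TTableInfoSelector noParts = new TTableInfoSelector();
    noParts.setWant_partition_names(true);
    noParts.setPartition_ids(java.util.Collections.<Long>emptyList());
    // Set to empty: NO partition info is returned.

    TTableInfoSelector someParts = new TTableInfoSelector();
    someParts.setWant_partition_files(true);
    someParts.setPartition_ids(java.util.Arrays.asList(17L, 42L));
    // Specific IDs: file descriptors for exactly these partitions, in the
    // same order as requested; a stale or unknown ID raises an error on
    // the catalogd side.
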
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 1fc43f5..68fa274 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -45,10 +45,15 @@ import org.apache.impala.common.Reference;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TCatalog;
+import org.apache.impala.thrift.TCatalogInfoSelector;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TCatalogUpdateResult;
+import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TGetCatalogUsageResponse;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
+import org.apache.impala.thrift.TPartialCatalogInfo;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPrincipalType;
 import org.apache.impala.thrift.TPrivilege;
@@ -1917,4 +1922,69 @@ public class CatalogServiceCatalog extends Catalog {
       tbl.getLock().unlock();
     }
   }
+
+  /**
+   * Return a partial view of information about a given catalog object. This services
+   * the CatalogdMetaProvider running on impalads when they are configured in
+   * "local-catalog" mode.
+   */
+  public TGetPartialCatalogObjectResponse getPartialCatalogObject(
+      TGetPartialCatalogObjectRequest req) throws CatalogException {
+    TCatalogObject objectDesc = Preconditions.checkNotNull(req.object_desc,
+        "missing object_desc");
+    switch (objectDesc.type) {
+    case CATALOG:
+      return getPartialCatalogInfo(req);
+    case DATABASE:
+      TDatabase dbDesc = Preconditions.checkNotNull(req.object_desc.db);
+      versionLock_.readLock().lock();
+      try {
+        Db db = getDb(dbDesc.getDb_name());
+        if (db == null) {
+          throw new CatalogException(
+              "Database not found: " + req.object_desc.getDb().getDb_name());
+        }
+
+        return db.getPartialInfo(req);
+      } finally {
+        versionLock_.readLock().unlock();
+      }
+    case TABLE:
+    case VIEW: {
+      Table table = getOrLoadTable(objectDesc.getTable().getDb_name(),
+          objectDesc.getTable().getTbl_name());
+      if (table == null) {
+        throw new CatalogException("Table not found: " +
+            objectDesc.getTable().getTbl_name());
+      }
+      // TODO(todd): consider a read-write lock here.
+      table.getLock().lock();
+      try {
+        return table.getPartialInfo(req);
+      } finally {
+        table.getLock().unlock();
+      }
+    }
+    default:
+      throw new CatalogException("Unable to fetch partial info for type: " +
+          req.object_desc.type);
+    }
+  }
+
+  /**
+   * Return a partial view of information about global parts of the catalog (e.g.
+   * the list of database names, etc.).
+   */
+  private TGetPartialCatalogObjectResponse getPartialCatalogInfo(
+      TGetPartialCatalogObjectRequest req) {
+    TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse();
+    resp.catalog_info = new TPartialCatalogInfo();
+    TCatalogInfoSelector sel = Preconditions.checkNotNull(req.catalog_info_selector,
+        "no catalog_info_selector in request");
+    if (sel.want_db_names) {
+      resp.catalog_info.db_names = ImmutableList.copyOf(dbCache_.get().keySet());
+    }
+    // TODO(todd) implement data sources and other global information.
+    return resp;
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java b/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
index c798d96..d05bfa4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ColumnStats.java
@@ -33,6 +33,7 @@ import org.apache.impala.thrift.TColumnStats;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
+import com.google.common.math.LongMath;
 
 /**
  * Statistics for a single column.
@@ -249,6 +250,68 @@ public class ColumnStats {
   }
 
   /**
+   * Convert the statistics back into an HMS-compatible ColumnStatisticsData object.
+   * This is essentially the inverse of {@link #update(Type, ColumnStatisticsData)}
+   * above.
+   *
+   * Returns null if statistics for the specified type are not supported.
+   */
+  public static ColumnStatisticsData createHiveColStatsData(
+      long capNdv, TColumnStats colStats, Type colType) {
+    ColumnStatisticsData colStatsData = new ColumnStatisticsData();
+    long ndv = colStats.getNum_distinct_values();
+    // Cap NDV at row count if available.
+    if (capNdv >= 0) ndv = Math.min(ndv, capNdv);
+
+    long numNulls = colStats.getNum_nulls();
+    switch(colType.getPrimitiveType()) {
+      case BOOLEAN:
+        colStatsData.setBooleanStats(new BooleanColumnStatsData(1, -1, numNulls));
+        break;
+      case TINYINT:
+        ndv = Math.min(ndv, LongMath.pow(2, Byte.SIZE));
+        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
+        break;
+      case SMALLINT:
+        ndv = Math.min(ndv, LongMath.pow(2, Short.SIZE));
+        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
+        break;
+      case INT:
+        ndv = Math.min(ndv, LongMath.pow(2, Integer.SIZE));
+        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
+        break;
+      case BIGINT:
+      case TIMESTAMP: // Hive and Impala use LongColumnStatsData for timestamps.
+        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
+        break;
+      case FLOAT:
+      case DOUBLE:
+        colStatsData.setDoubleStats(new DoubleColumnStatsData(numNulls, ndv));
+        break;
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        long maxStrLen = colStats.getMax_size();
+        double avgStrLen = colStats.getAvg_size();
+        colStatsData.setStringStats(
+            new StringColumnStatsData(maxStrLen, avgStrLen, numNulls, ndv));
+        break;
+      case DECIMAL:
+        double decMaxNdv = Math.pow(10, colType.getPrecision());
+        ndv = (long) Math.min(ndv, decMaxNdv);
+        colStatsData.setDecimalStats(new DecimalColumnStatsData(numNulls, ndv));
+        break;
+      default:
+        return null;
+    }
+    return colStatsData;
+  }
+
+  public ColumnStatisticsData toHmsCompatibleThrift(Type colType) {
+    return createHiveColStatsData(-1, toThrift(), colType);
+  }
+
+  /**
    * Sets the member corresponding to the given stats key to 'value'.
    * Requires that the given value is of a type appropriate for the
    * member being set. Throws if that is not the case.

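As a worked example of the NDV capping in createHiveColStatsData() above: a
TINYINT column can never have more than 2^8 distinct values, and passing the
table's row count as 'capNdv' tightens the bound further. A hedged sketch
(Type.TINYINT refers to Impala's catalog type constants):

    TColumnStats stats = new TColumnStats();
    stats.setNum_distinct_values(1000);  // deliberately overestimated NDV
    stats.setNum_nulls(5);

    // ndv = min(1000, capNdv = 100) = 100, then min(100, 2^8) = 100.
    ColumnStatisticsData hms =
        ColumnStats.createHiveColStatsData(/*capNdv=*/100, stats, Type.TINYINT);
    // hms.getLongStats().getNumDVs() == 100
    // hms.getLongStats().getNumNulls() == 5
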
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/Db.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java
index cb98c85..7e15210 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Db.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Db.java
@@ -35,8 +35,12 @@ import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TDatabase;
+import org.apache.impala.thrift.TDbInfoSelector;
 import org.apache.impala.thrift.TFunctionBinaryType;
 import org.apache.impala.thrift.TFunctionCategory;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
+import org.apache.impala.thrift.TPartialDbInfo;
 import org.apache.impala.util.FunctionUtils;
 import org.apache.impala.util.PatternMatcher;
 
@@ -435,4 +439,28 @@ public class Db extends CatalogObjectImpl implements FeDb {
     catalogObj.setDb(toThrift());
     return catalogObj;
   }
+
+  /**
+   * Get partial information about this DB in order to service CatalogdMetaProvider
+   * running in a remote impalad.
+   */
+  public TGetPartialCatalogObjectResponse getPartialInfo(
+      TGetPartialCatalogObjectRequest req) {
+    TDbInfoSelector selector = Preconditions.checkNotNull(req.db_info_selector,
+        "no db_info_selector");
+
+    TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse();
+    resp.setObject_version_number(getCatalogVersion());
+    resp.db_info = new TPartialDbInfo();
+    if (selector.want_hms_database) {
+      // TODO(todd): we need to deep-copy here because 'addFunction' and other DDLs
+      // modify the parameter map in place. We need to change those to copy-on-write
+      // instead to avoid this copy.
+      resp.db_info.hms_database = getMetaStoreDb().deepCopy();
+    }
+    if (selector.want_table_names) {
+      resp.db_info.table_names = getAllTableNames();
+    }
+    return resp;
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 14ce4e3..26ece19 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -99,6 +99,34 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
     }
 
     /**
+     * Clone the descriptor, but change the replica indexes to reference the new host
+     * index 'dstIndex' instead of the original index 'origIndex'.
+     */
+    public FileDescriptor cloneWithNewHostIndex(List<TNetworkAddress> origIndex,
+        ListMap<TNetworkAddress> dstIndex) {
+      // First clone the flatbuffer with no changes.
+      ByteBuffer oldBuf = fbFileDescriptor_.getByteBuffer();
+      ByteBuffer newBuf = ByteBuffer.allocate(oldBuf.remaining());
+      newBuf.put(oldBuf.array(), oldBuf.position(), oldBuf.remaining());
+      newBuf.rewind();
+      FbFileDesc cloned = FbFileDesc.getRootAsFbFileDesc(newBuf);
+
+      // Now iterate over the blocks in the new flatbuffer and mutate the indexes.
+      FbFileBlock it = new FbFileBlock();
+      for (int i = 0; i < cloned.fileBlocksLength(); i++) {
+        it = cloned.fileBlocks(it, i);
+        for (int j = 0; j < it.replicaHostIdxsLength(); j++) {
+          int origHostIdx = FileBlock.getReplicaHostIdx(it, j);
+          boolean isCached = FileBlock.isReplicaCached(it, j);
+          TNetworkAddress origHost = origIndex.get(origHostIdx);
+          int newHostIdx = dstIndex.getIndex(origHost);
+          it.mutateReplicaHostIdxs(j, FileBlock.makeReplicaIdx(isCached, newHostIdx));
+        }
+      }
+      return new FileDescriptor(cloned);
+    }
+
+    /**
      * Creates the file descriptor of a file represented by 'fileStatus' with blocks
      * stored in 'blockLocations'. 'fileSystem' is the filesystem where the
      * file resides and 'hostIndex' stores the network addresses of the hosts that store
@@ -316,8 +344,7 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
         TNetworkAddress networkAddress = BlockReplica.parseLocation(loc.getNames()[i]);
         short replicaIdx = (short) hostIndex.getIndex(networkAddress);
         boolean isReplicaCached = cachedHosts.contains(loc.getHosts()[i]);
-        replicaIdx = isReplicaCached ?
-            (short) (replicaIdx | ~REPLICA_HOST_IDX_MASK) : replicaIdx;
+        replicaIdx = makeReplicaIdx(isReplicaCached, replicaIdx);
         fbb.addShort(replicaIdx);
       }
       int fbReplicaHostIdxOffset = fbb.endVector();
@@ -334,6 +361,13 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
       return FbFileBlock.endFbFileBlock(fbb);
     }
 
+    private static short makeReplicaIdx(boolean isReplicaCached, int hostIdx) {
+      Preconditions.checkArgument((hostIdx & REPLICA_HOST_IDX_MASK) == hostIdx,
+          "invalid hostIdx: %s", hostIdx);
+      return isReplicaCached ? (short) (hostIdx | ~REPLICA_HOST_IDX_MASK)
+          : (short)hostIdx;
+    }
+
     /**
     * Constructs an FbFileBlock object from the file block metadata comprising the
     * block's 'offset', 'length' and replica index 'replicaIdx'. Serializes the file block

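The in-place rewrite in cloneWithNewHostIndex() is also why the flatbuffers are
now built with --gen-mutable (the CMake change above). The index translation
itself is straightforward; a sketch with plain collections, where a HashMap
stands in for Impala's ListMap (whose getIndex() appends unknown keys on
demand):

    // Host list that the serialized replica indexes refer to.
    List<String> origIndex = Arrays.asList("hostA:50010", "hostB:50010");

    // The caller's index may already hold some hosts, in a different order.
    List<String> dstList = new ArrayList<>(Arrays.asList("hostB:50010"));
    Map<String, Integer> dstIdx = new HashMap<>();
    dstIdx.put("hostB:50010", 0);

    int oldReplicaIdx = 0;  // stored against origIndex, i.e. "hostA:50010"
    String host = origIndex.get(oldReplicaIdx);
    Integer newReplicaIdx = dstIdx.get(host);
    if (newReplicaIdx == null) {  // what ListMap.getIndex() does internally
      dstList.add(host);
      newReplicaIdx = dstList.size() - 1;
      dstIdx.put(host, newReplicaIdx);
    }
    // newReplicaIdx == 1: the block's replica index is rewritten from 0 to 1.
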
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 74f84e5..d4423d9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -67,9 +67,12 @@ import org.apache.impala.thrift.CatalogObjectsConstants;
 import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TPartialPartitionInfo;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TResultSetMetadata;
@@ -1682,6 +1685,59 @@ public class HdfsTable extends Table implements FeFsTable {
     return table;
   }
 
+  @Override
+  public TGetPartialCatalogObjectResponse getPartialInfo(
+      TGetPartialCatalogObjectRequest req) throws TableLoadingException {
+    TGetPartialCatalogObjectResponse resp = super.getPartialInfo(req);
+
+    boolean wantPartitionInfo = req.table_info_selector.want_partition_files ||
+        req.table_info_selector.want_partition_metadata ||
+        req.table_info_selector.want_partition_names;
+
+    Collection<Long> partIds = req.table_info_selector.partition_ids;
+    if (partIds == null && wantPartitionInfo) {
+      // Caller specified at least one piece of partition info but didn't specify
+      // any partition IDs. That means they want the info for all partitions.
+      partIds = partitionMap_.keySet();
+    }
+
+    if (partIds != null) {
+      resp.table_info.partitions = Lists.newArrayListWithCapacity(partIds.size());
+      for (long partId : partIds) {
+        HdfsPartition part = partitionMap_.get(partId);
+        Preconditions.checkArgument(part != null, "Partition id %s does not exist",
+            partId);
+        TPartialPartitionInfo partInfo = new TPartialPartitionInfo(partId);
+
+        if (req.table_info_selector.want_partition_names) {
+          partInfo.setName(part.getPartitionName());
+        }
+
+        if (req.table_info_selector.want_partition_metadata) {
+          partInfo.hms_partition = part.toHmsPartition();
+        }
+
+        if (req.table_info_selector.want_partition_files) {
+          List<FileDescriptor> fds = part.getFileDescriptors();
+          partInfo.file_descriptors = Lists.newArrayListWithCapacity(fds.size());
+          for (FileDescriptor fd: fds) {
+            partInfo.file_descriptors.add(fd.toThrift());
+          }
+        }
+
+        resp.table_info.partitions.add(partInfo);
+      }
+    }
+
+    if (req.table_info_selector.want_partition_files) {
+      // TODO(todd) we are sending the whole host index even if we returned only
+      // one file -- maybe not so efficient, but the alternative is to do a bunch
+      // of cloning of file descriptors which might increase memory pressure.
+      resp.table_info.setNetwork_addresses(hostIndex_.getList());
+    }
+    return resp;
+  }
+
   /**
    * Create a THdfsTable corresponding to this HdfsTable. If serializing the "FULL"
   * information, then all partitions and THdfsFileDescs of each partition should be

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
index 0cf89ab..ba3e5cf 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IncompleteTable.java
@@ -26,10 +26,13 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.JniUtil;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TErrorCode;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
 import org.apache.impala.thrift.TStatus;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableDescriptor;
 import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 
 /**
@@ -129,4 +132,11 @@ public class IncompleteTable extends Table {
       ImpalaException e) {
     return new IncompleteTable(db, name, e);
   }
+
+  @Override
+  public TGetPartialCatalogObjectResponse getPartialInfo(
+      TGetPartialCatalogObjectRequest req) throws TableLoadingException {
+    Throwables.propagateIfPossible(cause_, TableLoadingException.class);
+    throw new TableLoadingException(cause_.getMessage());
+  }
 }

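A note on the Guava call above: Throwables.propagateIfPossible(cause_,
TableLoadingException.class) rethrows cause_ only when it is a
TableLoadingException, a RuntimeException, or an Error; any other checked
throwable falls through, which is why the wrapping throw on the following line
is still reachable. The pattern in isolation:

    import com.google.common.base.Throwables;

    static void rethrow(Throwable cause) throws TableLoadingException {
      // Rethrown unchanged if it already has a suitable type...
      Throwables.propagateIfPossible(cause, TableLoadingException.class);
      // ...otherwise (e.g. some other checked exception) wrap it.
      throw new TableLoadingException(cause.getMessage());
    }
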
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 06c4500..d63551c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.impala.analysis.TableName;
@@ -39,8 +40,12 @@ import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TColumnDescriptor;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
+import org.apache.impala.thrift.TPartialTableInfo;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableDescriptor;
+import org.apache.impala.thrift.TTableInfoSelector;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.log4j.Logger;
@@ -374,6 +379,49 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
   }
 
   /**
+   * Return partial info about this table. This is called only on the catalogd to
+   * service GetPartialCatalogObject RPCs.
+   */
+  public TGetPartialCatalogObjectResponse getPartialInfo(
+      TGetPartialCatalogObjectRequest req) throws TableLoadingException {
+    Preconditions.checkState(isLoaded(), "unloaded table: %s", getFullName());
+    TTableInfoSelector selector = Preconditions.checkNotNull(req.table_info_selector,
+        "no table_info_selector");
+
+    TGetPartialCatalogObjectResponse resp = new TGetPartialCatalogObjectResponse();
+    resp.setObject_version_number(getCatalogVersion());
+    resp.table_info = new TPartialTableInfo();
+    if (selector.want_hms_table) {
+      // TODO(todd): the deep copy could be a bit expensive. Unfortunately if we took
+      // a reference to this object, and let it escape out of the lock, it would be
+      // racy since the TTable is modified in place by some DDLs (eg 'dropTableStats').
+      // We either need to ensure that TTable is cloned on every write, or we need to
+      // ensure that serialization of the GetPartialCatalogObjectResponse object
+      // is done while we continue to hold the table lock.
+      resp.table_info.setHms_table(getMetaStoreTable().deepCopy());
+    }
+    if (selector.want_stats_for_column_names != null) {
+      List<ColumnStatisticsObj> statsList = Lists.newArrayListWithCapacity(
+          selector.want_stats_for_column_names.size());
+      for (String colName: selector.want_stats_for_column_names) {
+        Column col = getColumn(colName);
+        if (col == null) continue;
+        // Ugly hack: if the catalogd has never gotten any stats from HMS, numDVs will
+        // be -1, and we'll have to send no stats to the impalad.
+        if (!col.getStats().hasNumDistinctValues()) continue;
+
+        ColumnStatisticsData tstats = col.getStats().toHmsCompatibleThrift(col.getType());
+        if (tstats == null) continue;
+        // TODO(todd): it seems like the column type is not used? maybe worth not
+        // setting it here to save serialization.
+        statsList.add(new ColumnStatisticsObj(colName, col.getType().toString(), tstats));
+      }
+      resp.table_info.setColumn_stats(statsList);
+    }
+
+    return resp;
+  }
+
+  /**
    * @see FeCatalogUtils#parseColumnType(FieldSchema, String)
    */
   protected Type parseColumnType(FieldSchema fs) throws TableLoadingException {

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
new file mode 100644
index 0000000..ef71e63
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.local;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.common.Pair;
+import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TCatalogInfoSelector;
+import org.apache.impala.thrift.TCatalogObject;
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TDatabase;
+import org.apache.impala.thrift.TDbInfoSelector;
+import org.apache.impala.thrift.TErrorCode;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
+import org.apache.impala.thrift.THdfsFileDesc;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TPartialPartitionInfo;
+import org.apache.impala.thrift.TTable;
+import org.apache.impala.thrift.TTableInfoSelector;
+import org.apache.impala.util.ListMap;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.errorprone.annotations.Immutable;
+
+/**
+ * MetaProvider which fetches metadata in a granular fashion from the catalogd.
+ */
+public class CatalogdMetaProvider implements MetaProvider {
+
+  // TODO(todd): currently we haven't implemented catalogd thrift APIs for all pieces
+  // of metadata. In order to incrementally build this out, we delegate various calls
+  // to the "direct" provider for now and circumvent catalogd.
+  private DirectMetaProvider directProvider_ = new DirectMetaProvider();
+
+  /**
+   * Send a GetPartialCatalogObject request to catalogd. This handles converting
+   * non-OK status responses back to exceptions, performing various generic sanity
+   * checks, etc.
+   */
+  private TGetPartialCatalogObjectResponse sendRequest(
+      TGetPartialCatalogObjectRequest req)
+      throws TException {
+    TGetPartialCatalogObjectResponse resp;
+    byte[] ret;
+    try {
+      ret = FeSupport.GetPartialCatalogObject(new TSerializer().serialize(req));
+    } catch (InternalException e) {
+      throw new TException(e);
+    }
+    resp = new TGetPartialCatalogObjectResponse();
+    new TDeserializer().deserialize(resp, ret);
+    if (resp.status.status_code != TErrorCode.OK) {
+      // TODO(todd) do reasonable error handling
+      throw new TException(resp.toString());
+    }
+
+    // If we requested information about a particular version of an object, but
+    // got back a response for a different version, then we have a case of "read skew".
+    // For example, we may have fetched the partition list of a table, performed pruning,
+    // and then tried to fetch the specific partitions needed for a query, while some
+    // concurrent DDL modified the set of partitions. This could produce an unexpected
+    // result that violates the snapshot consistency guarantees expected by users.
+    if (req.object_desc.isSetCatalog_version() &&
+        resp.isSetObject_version_number() &&
+        req.object_desc.catalog_version != resp.object_version_number) {
+      throw new InconsistentMetadataFetchException(String.format(
+          "Catalog object %s changed version from %s to %s while fetching metadata",
+          req.object_desc.toString(),
+          req.object_desc.catalog_version,
+          resp.object_version_number));
+    }
+    return resp;
+  }
+
+  @Override
+  public ImmutableList<String> loadDbList() throws TException {
+    TGetPartialCatalogObjectRequest req = newReqForCatalog();
+    req.catalog_info_selector.want_db_names = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    checkResponse(resp.catalog_info != null && resp.catalog_info.db_names != null, req,
+        "missing table names");
+    return ImmutableList.copyOf(resp.catalog_info.db_names);
+  }
+
+  private TGetPartialCatalogObjectRequest newReqForCatalog() {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.CATALOG);
+    req.catalog_info_selector = new TCatalogInfoSelector();
+    return req;
+  }
+
+  private TGetPartialCatalogObjectRequest newReqForDb(String dbName) {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.DATABASE);
+    req.object_desc.db = new TDatabase(dbName);
+    req.db_info_selector = new TDbInfoSelector();
+    return req;
+  }
+
+  @Override
+  public Database loadDb(String dbName) throws TException {
+    TGetPartialCatalogObjectRequest req = newReqForDb(dbName);
+    req.db_info_selector.want_hms_database = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    checkResponse(resp.db_info != null && resp.db_info.hms_database != null,
+        req, "missing expected HMS database");
+    return resp.db_info.hms_database;
+  }
+
+  @Override
+  public ImmutableList<String> loadTableNames(String dbName)
+      throws MetaException, UnknownDBException, TException {
+    // TODO(todd): do we ever need to fetch the DB without the table names
+    // or vice versa?
+    TGetPartialCatalogObjectRequest req = newReqForDb(dbName);
+    req.db_info_selector.want_table_names = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    checkResponse(resp.db_info != null && resp.db_info.table_names != null,
+        req, "missing expected HMS table");
+    return ImmutableList.copyOf(resp.db_info.table_names);
+  }
+
+  private TGetPartialCatalogObjectRequest newReqForTable(String dbName,
+      String tableName) {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.TABLE);
+    req.object_desc.table = new TTable(dbName, tableName);
+    req.table_info_selector = new TTableInfoSelector();
+    return req;
+  }
+
+  private TGetPartialCatalogObjectRequest newReqForTable(TableMetaRef table) {
+    Preconditions.checkArgument(table instanceof TableMetaRefImpl,
+        "table ref %s was not created by CatalogdMetaProvider", table);
+    TGetPartialCatalogObjectRequest req = newReqForTable(
+        ((TableMetaRefImpl)table).dbName_,
+        ((TableMetaRefImpl)table).tableName_);
+    req.object_desc.setCatalog_version(((TableMetaRefImpl)table).catalogVersion_);
+    return req;
+  }
+
+  @Override
+  public Pair<Table, TableMetaRef> loadTable(String dbName, String tableName)
+      throws NoSuchObjectException, MetaException, TException {
+    TGetPartialCatalogObjectRequest req = newReqForTable(dbName, tableName);
+    req.table_info_selector.want_hms_table = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    checkResponse(resp.table_info != null && resp.table_info.hms_table != null,
+        req, "missing expected HMS table");
+    TableMetaRef ref = new TableMetaRefImpl(dbName, tableName, resp.table_info.hms_table,
+        resp.object_version_number);
+    return Pair.create(resp.table_info.hms_table, ref);
+  }
+
+  @Override
+  public List<ColumnStatisticsObj> loadTableColumnStatistics(TableMetaRef table,
+      List<String> colNames) throws TException {
+    TGetPartialCatalogObjectRequest req = newReqForTable(table);
+    req.table_info_selector.want_stats_for_column_names = colNames;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    checkResponse(resp.table_info != null && resp.table_info.column_stats != null,
+        req, "missing column stats");
+    return resp.table_info.column_stats;
+  }
+
+  @Override
+  public List<PartitionRef> loadPartitionList(TableMetaRef table)
+      throws MetaException, TException {
+    TGetPartialCatalogObjectRequest req = newReqForTable(table);
+    req.table_info_selector.want_partition_names = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    checkResponse(resp.table_info != null && resp.table_info.partitions != null,
+        req, "missing partition list result");
+    List<PartitionRef> ret = Lists.newArrayListWithCapacity(
+        resp.table_info.partitions.size());
+    for (TPartialPartitionInfo p : resp.table_info.partitions) {
+      checkResponse(p.isSetId(), req, "response missing partition IDs for partition %s",
+          p);
+      ret.add(new PartitionRefImpl(p));
+    }
+    return ret;
+  }
+
+  @Override
+  public Map<String, PartitionMetadata> loadPartitionsByRefs(TableMetaRef table,
+      List<String> partitionColumnNames,
+      ListMap<TNetworkAddress> hostIndex,
+      List<PartitionRef> partitionRefs)
+      throws MetaException, TException {
+    Preconditions.checkArgument(table instanceof TableMetaRefImpl);
+    TableMetaRefImpl refImpl = (TableMetaRefImpl)table;
+
+    List<Long> ids = Lists.newArrayListWithExpectedSize(partitionRefs.size());
+    for (PartitionRef ref: partitionRefs) {
+      ids.add(((PartitionRefImpl)ref).getId());
+    }
+
+    TGetPartialCatalogObjectRequest req = newReqForTable(table);
+    req.table_info_selector.partition_ids = ids;
+    req.table_info_selector.want_partition_metadata = true;
+    req.table_info_selector.want_partition_files = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    checkResponse(resp.table_info != null && resp.table_info.partitions != null,
+        req, "missing partition list result");
+    checkResponse(resp.table_info.network_addresses != null,
+        req, "missing network addresses");
+    checkResponse(resp.table_info.partitions.size() == ids.size(),
+        req, "returned %d partitions instead of expected %d",
+        resp.table_info.partitions.size(), ids.size());
+
+    Map<String, PartitionMetadata> ret = Maps.newHashMapWithExpectedSize(ids.size());
+    for (TPartialPartitionInfo part: resp.table_info.partitions) {
+      Partition msPart = part.getHms_partition();
+      if (msPart == null) {
+        checkResponse(refImpl.msTable_.getPartitionKeysSize() == 0, req,
+            "Should not return a partition with missing HMS partition unless " +
+            "the table is unpartitioned");
+        msPart = DirectMetaProvider.msTableToPartition(refImpl.msTable_);
+      }
+      checkResponse(msPart != null && msPart.getValues() != null, req,
+          "malformed partition result: %s", part.toString());
+      String partName = FileUtils.makePartName(partitionColumnNames, msPart.getValues());
+
+      checkResponse(part.file_descriptors != null, req, "missing file descriptors");
+      List<FileDescriptor> fds = Lists.newArrayListWithCapacity(
+          part.file_descriptors.size());
+      for (THdfsFileDesc thriftFd: part.file_descriptors) {
+        FileDescriptor fd = FileDescriptor.fromThrift(thriftFd);
+        // The file descriptors returned via the RPC use host indexes that reference
+        // the 'network_addresses' list in the RPC. However, the caller may have already
+        // loaded some addresses into 'hostIndex'. So, the returned FDs need to be
+        // remapped to point to the caller's 'hostIndex' instead of the list in the
+        // RPC response.
+        fds.add(fd.cloneWithNewHostIndex(resp.table_info.network_addresses, hostIndex));
+      }
+      PartitionMetadataImpl metaImpl = new PartitionMetadataImpl(msPart,
+          ImmutableList.copyOf(fds));
+      PartitionMetadata oldVal = ret.put(partName, metaImpl);
+      if (oldVal != null) {
+        throw new RuntimeException("catalogd returned partition " + partName +
+            " multiple times");
+      }
+    }
+    return ret;
+  }
+
+  private static void checkResponse(boolean condition,
+      TGetPartialCatalogObjectRequest req, String msg, Object... args) throws TException {
+    if (condition) return;
+    throw new TException("Invalid response from catalogd for request " +
+        req.toString() + ": " + String.format(msg, args));
+  }
+
+  @Override
+  public String loadNullPartitionKeyValue() throws MetaException, TException {
+    return directProvider_.loadNullPartitionKeyValue();
+  }
+
+  @Override
+  public List<String> loadFunctionNames(String dbName) throws MetaException, TException {
+    return directProvider_.loadFunctionNames(dbName);
+  }
+
+  @Override
+  public Function getFunction(String dbName, String functionName)
+      throws MetaException, TException {
+    return directProvider_.getFunction(dbName, functionName);
+  }
+
+  /**
+   * Reference to a partition within a table. We remember the partition's ID and pass
+   * that back to the catalog in subsequent requests to fetch the details of the
+   * partition, since the ID is smaller than the name and provides a unique (not-reused)
+   * identifier.
+   */
+  @Immutable
+  private static class PartitionRefImpl implements PartitionRef {
+    @SuppressWarnings("Immutable") // Thrift objects are mutable, but we won't mutate it.
+    private final TPartialPartitionInfo info_;
+
+    public PartitionRefImpl(TPartialPartitionInfo p) {
+      this.info_ = p;
+    }
+
+    @Override
+    public String getName() {
+      return info_.getName();
+    }
+
+    private long getId() {
+      return info_.id;
+    }
+  }
+
+  public static class PartitionMetadataImpl implements PartitionMetadata {
+    private final Partition msPartition_;
+    private final ImmutableList<FileDescriptor> fds_;
+
+    public PartitionMetadataImpl(Partition msPartition,
+        ImmutableList<FileDescriptor> fds) {
+      this.msPartition_ = msPartition;
+      this.fds_ = fds;
+    }
+
+    @Override
+    public Partition getHmsPartition() {
+      return msPartition_;
+    }
+
+    @Override
+    public ImmutableList<FileDescriptor> getFileDescriptors() {
+      return fds_;
+    }
+  }
+
+  /**
+   * A reference to a table that has been looked up, allowing callers to fetch further
+   * detailed information. This is more extensive than just the table name so that
+   * we can provide a consistency check that the catalog version doesn't change in
+   * between calls.
+   */
+  private static class TableMetaRefImpl implements TableMetaRef {
+    private final String dbName_;
+    private final String tableName_;
+
+    /**
+     * Stash the HMS Table object since we need this in order to handle some strange
+     * behavior whereby the catalogd returns a Partition with no HMS partition object
+     * in the case of unpartitioned tables.
+     */
+    private final Table msTable_;
+
+    /**
+     * The version of the table when we first loaded it. Subsequent requests about
+     * the table are verified against this version.
+     */
+    private final long catalogVersion_;
+
+    public TableMetaRefImpl(String dbName, String tableName,
+        Table msTable, long catalogVersion) {
+      this.dbName_ = dbName;
+      this.tableName_ = tableName;
+      this.msTable_ = msTable;
+      this.catalogVersion_ = catalogVersion;
+    }
+  }
+}

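To make the read-skew check in sendRequest() concrete: loadTable() stashes the
catalog version in the TableMetaRefImpl, every follow-up request carries that
version, and a mismatched object_version_number in the response aborts the
fetch. A hedged sketch of how a caller might react; the retry policy is an
assumption and not part of this patch (InconsistentMetadataFetchException can
be caught without being declared, since sendRequest() throws it from a method
that only declares TException, so it must be unchecked):

    try {
      Map<String, PartitionMetadata> parts = provider.loadPartitionsByRefs(
          tableRef, partColNames, hostIndex, prunedRefs);
    } catch (InconsistentMetadataFetchException e) {
      // A concurrent DDL bumped the table's version between loadTable()
      // and this fetch. Redo loadTable()/loadPartitionList() and pruning
      // against the new version before fetching again.
    }
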
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 6ac7187..c1e3675 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -17,7 +17,10 @@
 
 package org.apache.impala.catalog.local;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,17 +39,25 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.common.Pair;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TBackendGflags;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.thrift.TException;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.errorprone.annotations.Immutable;
 
 /**
  * Metadata provider which calls out directly to the source systems
@@ -93,11 +104,14 @@ class DirectMetaProvider implements MetaProvider {
   }
 
   @Override
-  public Table loadTable(String dbName, String tableName)
+  public Pair<Table, TableMetaRef> loadTable(String dbName, String tableName)
       throws MetaException, NoSuchObjectException, TException {
+    Table msTable;
     try (MetaStoreClient c = msClientPool_.getClient()) {
-      return c.getHiveClient().getTable(dbName, tableName);
+      msTable = c.getHiveClient().getTable(dbName, tableName);
     }
+    TableMetaRef ref = new TableMetaRefImpl(dbName, tableName, msTable);
+    return Pair.create(msTable, ref);
   }
 
   @Override
@@ -108,39 +122,71 @@ class DirectMetaProvider implements MetaProvider {
   }
 
   @Override
-  public List<String> loadPartitionNames(String dbName, String tableName)
+  public List<PartitionRef> loadPartitionList(TableMetaRef table)
       throws MetaException, TException {
+    Preconditions.checkArgument(table instanceof TableMetaRefImpl);
+    TableMetaRefImpl ref = (TableMetaRefImpl)table;
+
+    // If the table isn't partitioned, just return a single partition with no name.
+    // In loadPartitionsByRefs() below, we'll detect this case and load the special
+    // unpartitioned table.
+    if (!ref.isPartitioned()) {
+      return ImmutableList.of((PartitionRef)new PartitionRefImpl(
+          PartitionRefImpl.UNPARTITIONED_NAME));
+    }
+
+    List<String> partNames;
     try (MetaStoreClient c = msClientPool_.getClient()) {
-      return c.getHiveClient().listPartitionNames(dbName, tableName,
-          /*max_parts=*/(short)-1);
+      partNames = c.getHiveClient().listPartitionNames(
+          ref.dbName_, ref.tableName_, /*max_parts=*/(short)-1);
+    }
+    List<PartitionRef> partRefs = Lists.newArrayListWithCapacity(partNames.size());
+    for (String name: partNames) {
+      partRefs.add(new PartitionRefImpl(name));
     }
+    return partRefs;
   }
 
   @Override
-  public Map<String, Partition> loadPartitionsByNames(
-      String dbName, String tableName, List<String> partitionColumnNames,
-      List<String> partitionNames) throws MetaException, TException {
-    Preconditions.checkNotNull(dbName);
-    Preconditions.checkNotNull(tableName);
+  public Map<String, PartitionMetadata> loadPartitionsByRefs(
+      TableMetaRef table, List<String> partitionColumnNames,
+      ListMap<TNetworkAddress> hostIndex,
+      List<PartitionRef> partitionRefs) throws MetaException, TException {
+    Preconditions.checkNotNull(table);
+    Preconditions.checkArgument(table instanceof TableMetaRefImpl);
     Preconditions.checkArgument(!partitionColumnNames.isEmpty());
-    Preconditions.checkNotNull(partitionNames);
+    Preconditions.checkNotNull(partitionRefs);
+
+    TableMetaRefImpl tableImpl = (TableMetaRefImpl)table;
+
+    String fullTableName = tableImpl.dbName_ + "." + tableImpl.tableName_;
 
-    Map<String, Partition> ret = Maps.newHashMapWithExpectedSize(
-        partitionNames.size());
-    if (partitionNames.isEmpty()) return ret;
+    if (!tableImpl.isPartitioned()) {
+      return loadUnpartitionedPartition(tableImpl, partitionRefs, hostIndex);
+    }
+
+    Map<String, PartitionMetadata> ret = Maps.newHashMapWithExpectedSize(
+        partitionRefs.size());
+    if (partitionRefs.isEmpty()) return ret;
 
     // Fetch the partitions.
+    List<String> partNames = Lists.newArrayListWithCapacity(partitionRefs.size());
+    for (PartitionRef ref: partitionRefs) {
+      partNames.add(ref.getName());
+    }
+
     List<Partition> parts;
     try (MetaStoreClient c = msClientPool_.getClient()) {
       parts = MetaStoreUtil.fetchPartitionsByName(
-          c.getHiveClient(), partitionNames, dbName, tableName);
+          c.getHiveClient(), partNames, tableImpl.dbName_, tableImpl.tableName_);
     }
 
     // HMS may return fewer partition objects than requested, and the
     // returned partition objects don't carry enough information to get their
     // names. So, we map the returned partitions back to the requested names
     // using the passed-in partition column names.
-    Set<String> namesSet = ImmutableSet.copyOf(partitionNames);
+    Set<String> namesSet = ImmutableSet.copyOf(partNames);
     for (Partition p: parts) {
       List<String> vals = p.getValues();
       if (vals.size() != partitionColumnNames.size()) {
@@ -152,16 +198,50 @@ class DirectMetaProvider implements MetaProvider {
         throw new MetaException("HMS returned unexpected partition " + partName +
             " which was not requested. Requested: " + namesSet);
       }
-      Partition existing = ret.put(partName, p);
+
+      ImmutableList<FileDescriptor> fds = loadFileMetadata(
+          fullTableName, partName, p, hostIndex);
+
+      PartitionMetadata existing = ret.put(partName, new PartitionMetadataImpl(p, fds));
       if (existing != null) {
         throw new MetaException("HMS returned multiple partitions with name " +
             partName);
       }
     }
 
     return ret;
   }
 
+  /**
+   * We model partitions slightly differently to Hive. So, in the case of an
+   * unpartitioned table, we have to create a fake Partition object which has the
+   * metadata of the table.
+   */
+  private Map<String, PartitionMetadata> loadUnpartitionedPartition(
+      TableMetaRefImpl table, List<PartitionRef> partitionRefs,
+      ListMap<TNetworkAddress> hostIndex) {
+    Preconditions.checkArgument(partitionRefs.size() == 1,
+        "Expected exactly one partition to load for unpartitioned table");
+    PartitionRef ref = partitionRefs.get(0);
+    Preconditions.checkArgument(ref.getName().isEmpty(),
+        "Expected empty partition name for unpartitioned table");
+    Partition msPartition = msTableToPartition(table.msTable_);
+    String fullName = table.dbName_ + "." + table.tableName_;
+    ImmutableList<FileDescriptor> fds = loadFileMetadata(fullName,
+        "default",  msPartition, hostIndex);
+    return ImmutableMap.of("", (PartitionMetadata)new PartitionMetadataImpl(
+        msPartition, fds));
+  }
+
+  static Partition msTableToPartition(Table msTable) {
+    Partition msp = new Partition();
+    msp.setSd(msTable.getSd());
+    msp.setParameters(msTable.getParameters());
+    msp.setValues(Collections.<String>emptyList());
+    return msp;
+  }
+
   @Override
   public List<String> loadFunctionNames(String dbName) throws TException {
     Preconditions.checkNotNull(dbName);
@@ -183,22 +263,125 @@ class DirectMetaProvider implements MetaProvider {
   }
 
   @Override
-  public List<ColumnStatisticsObj> loadTableColumnStatistics(String dbName,
-      String tblName, List<String> colNames) throws TException {
+  public List<ColumnStatisticsObj> loadTableColumnStatistics(TableMetaRef table,
+      List<String> colNames) throws TException {
+    Preconditions.checkArgument(table instanceof TableMetaRefImpl);
     try (MetaStoreClient c = msClientPool_.getClient()) {
-      return c.getHiveClient().getTableColumnStatistics(dbName, tblName, colNames);
+      return c.getHiveClient().getTableColumnStatistics(
+          ((TableMetaRefImpl)table).dbName_,
+          ((TableMetaRefImpl)table).tableName_,
+          colNames);
     }
   }
 
-  @Override
-  public List<LocatedFileStatus> loadFileMetadata(Path dir) throws IOException {
-    Preconditions.checkNotNull(dir);
-    Preconditions.checkArgument(dir.isAbsolute(),
-        "Must pass absolute path: %s", dir);
-    FileSystem fs = dir.getFileSystem(CONF);
-    RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, /*recursive=*/false);
-    ImmutableList.Builder<LocatedFileStatus> b = new ImmutableList.Builder<>();
-    while (it.hasNext()) b.add(it.next());
-    return b.build();
+  private ImmutableList<FileDescriptor> loadFileMetadata(String fullTableName,
+      String partName, Partition msPartition, ListMap<TNetworkAddress> hostIndex) {
+    Path partDir = new Path(msPartition.getSd().getLocation());
+
+    List<LocatedFileStatus> stats = Lists.newArrayList();
+    try {
+      FileSystem fs = partDir.getFileSystem(CONF);
+      RemoteIterator<LocatedFileStatus> it = fs.listFiles(partDir, /*recursive=*/false);
+      while (it.hasNext()) stats.add(it.next());
+    } catch (FileNotFoundException fnf) {
+      // If the partition directory isn't found, this is treated as having no
+      // files.
+      return ImmutableList.of();
+    } catch (IOException ioe) {
+      throw new LocalCatalogException(String.format(
+          "Could not load files for partition %s of table %s",
+          partName, fullTableName), ioe);
+    }
+
+    HdfsTable.FileMetadataLoadStats loadStats =
+        new HdfsTable.FileMetadataLoadStats(partDir);
+
+    try {
+      FileSystem fs = partDir.getFileSystem(CONF);
+      return ImmutableList.copyOf(
+          HdfsTable.createFileDescriptors(fs, new FakeRemoteIterator<>(stats),
+              hostIndex, loadStats));
+    } catch (IOException e) {
+      throw new LocalCatalogException(String.format(
+          "Could not convert files to descriptors for partition %s of table %s",
+          partName, fullTableName), e);
+    }
+  }
+
+  @Immutable
+  private static class PartitionRefImpl implements PartitionRef {
+    private static final String UNPARTITIONED_NAME = "";
+    private final String name_;
+
+    public PartitionRefImpl(String name) {
+      this.name_ = name;
+    }
+
+    @Override
+    public String getName() {
+      return name_;
+    }
+  }
+
+  private static class PartitionMetadataImpl implements PartitionMetadata {
+    private final Partition msPartition_;
+    private final ImmutableList<FileDescriptor> fds_;
+
+    public PartitionMetadataImpl(Partition msPartition,
+        ImmutableList<FileDescriptor> fds) {
+      this.msPartition_ = msPartition;
+      this.fds_ = fds;
+    }
+
+    @Override
+    public Partition getHmsPartition() {
+      return msPartition_;
+    }
+
+    @Override
+    public ImmutableList<FileDescriptor> getFileDescriptors() {
+      return fds_;
+    }
+  }
+
+  private class TableMetaRefImpl implements TableMetaRef {
+
+    private final String dbName_;
+    private final String tableName_;
+    private final Table msTable_;
+
+    public TableMetaRefImpl(String dbName, String tableName, Table msTable) {
+      this.dbName_ = dbName;
+      this.tableName_ = tableName;
+      this.msTable_ = msTable;
+    }
+
+    private boolean isPartitioned() {
+      return msTable_.getPartitionKeysSize() != 0;
+    }
+  }
+
+  /**
+   * Wrapper for a normal Iterable<T> to appear like a Hadoop RemoteIterator<T>.
+   * This is necessary because the existing code to convert file statuses to
+   * descriptors consumes the remote iterator directly and thus avoids materializing
+   * all of the LocatedFileStatus objects in memory at the same time.
+   */
+  private static class FakeRemoteIterator<T> implements RemoteIterator<T> {
+    private final Iterator<T> it_;
+
+    FakeRemoteIterator(Iterable<T> it) {
+      this.it_ = it.iterator();
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      return it_.hasNext();
+    }
+
+    @Override
+    public T next() throws IOException {
+      return it_.next();
+    }
   }
 }

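In the hunk above, the call that turns a returned Partition's values back into the
requested partition name falls outside the quoted context. As a minimal sketch of
that mapping step, assuming Hive's Warehouse.makePartName() (the patch's actual
call is not shown here):

    import java.util.List;
    import org.apache.hadoop.hive.metastore.Warehouse;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.api.Partition;

    // Rebuild a name like "p1=1/p2=abc" from the table's partition column names
    // and the values carried by the Partition object returned from HMS.
    static String nameForReturnedPartition(List<String> partitionColumnNames,
        Partition p) throws MetaException {
      return Warehouse.makePartName(partitionColumnNames, p.getValues());
    }

The rebuilt name can then be checked against the requested names, exactly as the
namesSet membership test above does.
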
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/InconsistentMetadataFetchException.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/InconsistentMetadataFetchException.java b/fe/src/main/java/org/apache/impala/catalog/local/InconsistentMetadataFetchException.java
new file mode 100644
index 0000000..9147cec
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/local/InconsistentMetadataFetchException.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.local;
+
+/**
+ * If this is thrown, it indicates that the catalog implementation in the Impalad
+ * has identified that the metadata it read was not a proper snapshot of the source
+ * metadata. In other words, the resulting plan may be incorrect and so the plan
+ * in progress should be discarded and retried. It should be assumed that, if this
+ * exception is thrown, the catalog has already taken appropriate steps to ensure that
+ * a retry will not encounter the same inconsistency.
+ *
+ * Note that the above does not guarantee that a retry will succeed, only that it will
+ * not encounter the _same_ conflict.
+ */
+public class InconsistentMetadataFetchException extends RuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public InconsistentMetadataFetchException(String msg) {
+    super(msg);
+  }
+}

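The contract above implies a bounded retry loop on the planning path (the Frontend
change later in this commit leaves that as a TODO). A minimal sketch, assuming a
hypothetical planOnce() helper and retry bound, neither of which is part of this
patch:

    TExecRequest createExecRequestWithRetry(TQueryCtx queryCtx)
        throws ImpalaException {
      final int MAX_ATTEMPTS = 3;  // hypothetical bound
      for (int attempt = 1; ; ++attempt) {
        try {
          return planOnce(queryCtx);  // hypothetical helper wrapping the planner
        } catch (InconsistentMetadataFetchException e) {
          // A retry won't hit the same inconsistency, but it may hit a new one,
          // so bound the number of attempts.
          if (attempt == MAX_ATTEMPTS) throw e;
        }
      }
    }
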
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index 37b25b4..8d57210 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -72,7 +72,7 @@ public class LocalCatalog implements FeCatalog {
   private final String defaultKuduMasterHosts_;
 
   public static LocalCatalog create(String defaultKuduMasterHosts) {
-    return new LocalCatalog(new DirectMetaProvider(), defaultKuduMasterHosts);
+    return new LocalCatalog(new CatalogdMetaProvider(), defaultKuduMasterHosts);
   }
 
   private LocalCatalog(MetaProvider metaProvider, String defaultKuduMasterHosts) {

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
index 2e35ce8..9cf2d8b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
@@ -17,17 +17,13 @@
 
 package org.apache.impala.catalog.local;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
+import javax.annotation.Nullable;
+
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.catalog.FeCatalogUtils;
@@ -38,7 +34,6 @@ import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.HdfsStorageDescriptor;
 import org.apache.impala.catalog.HdfsStorageDescriptor.InvalidStorageDescriptorException;
-import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.PartitionStatsUtil;
 import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.THdfsPartitionLocation;
@@ -49,17 +44,21 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 
 public class LocalFsPartition implements FeFsPartition {
-  private static final Configuration CONF = new Configuration();
   private final LocalFsTable table_;
   private final LocalPartitionSpec spec_;
   private final Partition msPartition_;
-  private ImmutableList<FileDescriptor> fileDescriptors_;
+  /**
+   * Null in the case of a 'prototype partition'.
+   */
+  @Nullable
+  private final ImmutableList<FileDescriptor> fileDescriptors_;
 
   public LocalFsPartition(LocalFsTable table, LocalPartitionSpec spec,
-      Partition msPartition) {
+      Partition msPartition, ImmutableList<FileDescriptor> fileDescriptors) {
     table_ = Preconditions.checkNotNull(table);
     spec_ = Preconditions.checkNotNull(spec);
     msPartition_ = Preconditions.checkNotNull(msPartition);
+    fileDescriptors_ = fileDescriptors;
   }
 
   @Override
@@ -79,19 +78,16 @@ public class LocalFsPartition implements FeFsPartition {
 
   @Override
   public List<FileDescriptor> getFileDescriptors() {
-    loadFileDescriptors();
     return fileDescriptors_;
   }
 
   @Override
   public boolean hasFileDescriptors() {
-    loadFileDescriptors();
     return !fileDescriptors_.isEmpty();
   }
 
   @Override
   public int getNumFileDescriptors() {
-    loadFileDescriptors();
     return fileDescriptors_.size();
   }
 
@@ -172,7 +168,6 @@ public class LocalFsPartition implements FeFsPartition {
 
   @Override
   public long getSize() {
-    loadFileDescriptors();
     long size = 0;
     for (FileDescriptor fd : fileDescriptors_) {
       size += fd.getFileLength();
@@ -218,60 +213,4 @@ public class LocalFsPartition implements FeFsPartition {
     return Maps.filterKeys(getParameters(),
         HdfsPartition.IS_NOT_INCREMENTAL_STATS_KEY);
   }
-
-
-  private void loadFileDescriptors() {
-    if (fileDescriptors_ != null) return;
-    Path partDir = getLocationPath();
-    List<LocatedFileStatus> stats;
-    try {
-      stats = table_.db_.getCatalog().getMetaProvider().loadFileMetadata(partDir);
-    } catch (FileNotFoundException fnf) {
-      // If the partition directory isn't found, this is treated as having no
-      // files.
-      fileDescriptors_ = ImmutableList.of();
-      return;
-    } catch (IOException ioe) {
-      throw new LocalCatalogException(String.format(
-          "Could not load files for partition %s of table %s",
-          spec_.getName(), table_.getFullName()), ioe);
-    }
-
-    HdfsTable.FileMetadataLoadStats loadStats =
-        new HdfsTable.FileMetadataLoadStats(partDir);
-
-    try {
-      FileSystem fs = partDir.getFileSystem(CONF);
-      fileDescriptors_ = ImmutableList.copyOf(
-          HdfsTable.createFileDescriptors(fs, new FakeRemoteIterator<>(stats),
-              table_.getHostIndex(), loadStats));
-    } catch (IOException e) {
-        throw new LocalCatalogException(String.format(
-            "Could not convert files to descriptors for partition %s of table %s",
-            spec_.getName(), table_.getFullName()), e);
-    }
-  }
-
-  /**
-   * Wrapper for a normal Iterable<T> to appear like a Hadoop RemoteIterator<T>.
-   * This is necessary because the existing code to convert file statuses to
-   * descriptors consumes the remote iterator directly and thus avoids materializing
-   * all of the LocatedFileStatus objects in memory at the same time.
-   */
-  private static class FakeRemoteIterator<T> implements RemoteIterator<T> {
-    private final Iterator<T> it_;
-
-    FakeRemoteIterator(Iterable<T> it) {
-      this.it_ = it.iterator();
-    }
-    @Override
-    public boolean hasNext() throws IOException {
-      return it_.hasNext();
-    }
-
-    @Override
-    public T next() throws IOException {
-      return it_.next();
-    }
-  }
 }


[2/4] impala git commit: IMPALA-7436: initial fetch-from-catalogd implementation

Posted by jo...@apache.org.
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index f19adfd..da784d6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -44,6 +44,9 @@ import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.PrunablePartition;
+import org.apache.impala.catalog.local.MetaProvider.PartitionMetadata;
+import org.apache.impala.catalog.local.MetaProvider.PartitionRef;
+import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.CatalogObjectsConstants;
 import org.apache.impala.thrift.THdfsPartition;
@@ -108,7 +111,7 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
    */
   private final String avroSchema_;
 
-  public static LocalFsTable load(LocalDb db, Table msTbl) {
+  public static LocalFsTable load(LocalDb db, Table msTbl, TableMetaRef ref) {
     String fullName = msTbl.getDbName() + "." + msTbl.getTableName();
 
     // Set Avro schema if necessary.
@@ -141,16 +144,16 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
         cmap = ColumnMap.fromMsTable(msTbl);
       }
 
-      return new LocalFsTable(db, msTbl, cmap, avroSchema);
+      return new LocalFsTable(db, msTbl, ref, cmap, avroSchema);
     } catch (AnalysisException e) {
       throw new LocalCatalogException("Failed to load Avro schema for table "
           + fullName);
     }
   }
 
-  private LocalFsTable(LocalDb db, Table msTbl, ColumnMap cmap,
+  private LocalFsTable(LocalDb db, Table msTbl, TableMetaRef ref, ColumnMap cmap,
       String explicitAvroSchema) {
-    super(db, msTbl, cmap);
+    super(db, msTbl, ref, cmap);
 
     // set NULL indicator string from table properties
     String tableNullFormat =
@@ -178,11 +181,7 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
    */
   public static LocalFsTable createCtasTarget(LocalDb db,
       Table msTbl) throws CatalogException {
-    // TODO(todd): set a member variable indicating this is a CTAS target
-    // so we can checkState() against it in various other methods and make
-    // sure we don't try to do something like load partitions for a not-yet-created
-    // table.
-    return new LocalFsTable(db, msTbl, ColumnMap.fromMsTable(msTbl),
+    return new LocalFsTable(db, msTbl, /*ref=*/null, ColumnMap.fromMsTable(msTbl),
         /*explicitAvroSchema=*/null);
   }
 
@@ -240,7 +239,13 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   public Set<HdfsFileFormat> getFileFormats() {
     // TODO(todd): can we avoid loading all partitions here? this is called
     // for any INSERT query, even if the partition is specified.
-    Collection<? extends FeFsPartition> parts = FeCatalogUtils.loadAllPartitions(this);
+    Collection<? extends FeFsPartition> parts;
+    if (ref_ != null) {
+      parts = FeCatalogUtils.loadAllPartitions(this);
+    } else {
+      // If this is a CTAS target, we don't want to try to load the partition list.
+      parts = Collections.emptyList();
+    }
     // In the case that we have no partitions added to the table yet, it's
     // important to add the "prototype" partition as a fallback.
     Iterable<FeFsPartition> partitionsToConsider = Iterables.concat(
@@ -329,9 +334,9 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
 
     protoMsPartition.setParameters(Collections.<String, String>emptyMap());
     LocalPartitionSpec spec = new LocalPartitionSpec(
-        this, "", CatalogObjectsConstants.PROTOTYPE_PARTITION_ID);
+        this, CatalogObjectsConstants.PROTOTYPE_PARTITION_ID);
     LocalFsPartition prototypePartition = new LocalFsPartition(
-        this, spec, protoMsPartition);
+        this, spec, protoMsPartition, /*fileDescriptors=*/null);
     return prototypePartition;
   }
 
@@ -374,29 +379,17 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     // Possible in the case that all partitions were pruned.
     if (ids.isEmpty()) return Collections.emptyList();
 
-    List<String> names = Lists.newArrayList();
+    List<PartitionRef> refs = Lists.newArrayList();
     for (Long id : ids) {
       LocalPartitionSpec spec = partitionSpecs_.get(id);
       Preconditions.checkArgument(spec != null, "Invalid partition ID for table %s: %s",
           getFullName(), id);
-      String name = spec.getName();
-      if (name.isEmpty()) {
-        // Unpartitioned tables don't need to fetch partitions from the metadata
-        // provider. Rather, we just create a partition on the fly.
-        Preconditions.checkState(getNumClusteringCols() == 0,
-            "Cannot fetch empty partition name from a partitioned table");
-        Preconditions.checkArgument(ids.size() == 1,
-            "Expected to only fetch one partition for unpartitioned table %s",
-            getFullName());
-        return Lists.newArrayList(createUnpartitionedPartition(spec));
-      } else {
-        names.add(name);
-      }
+      refs.add(Preconditions.checkNotNull(spec.getRef()));
     }
-    Map<String, Partition> partsByName;
+    Map<String, PartitionMetadata> partsByName;
     try {
-      partsByName = db_.getCatalog().getMetaProvider().loadPartitionsByNames(
-          db_.getName(), name_, getClusteringColumnNames(), names);
+      partsByName = db_.getCatalog().getMetaProvider().loadPartitionsByRefs(
+          ref_, getClusteringColumnNames(), hostIndex_, refs);
     } catch (TException e) {
       throw new LocalCatalogException(
           "Could not load partitions for table " + getFullName(), e);
@@ -404,16 +397,19 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     List<FeFsPartition> ret = Lists.newArrayListWithCapacity(ids.size());
     for (Long id : ids) {
       LocalPartitionSpec spec = partitionSpecs_.get(id);
-      Partition p = partsByName.get(spec.getName());
+      PartitionMetadata p = partsByName.get(spec.getRef().getName());
       if (p == null) {
         // TODO(todd): concurrent drop partition could result in this error.
         // Should we recover in a more graceful way from such an unexpected event?
         throw new LocalCatalogException(
             "Could not load expected partitions for table " + getFullName() +
-            ": missing expected partition with name '" + spec.getName() +
+            ": missing expected partition with name '" + spec.getRef().getName() +
             "' (perhaps it was concurrently dropped by another process)");
       }
-      ret.add(new LocalFsPartition(this, spec, p));
+
+      LocalFsPartition part = new LocalFsPartition(this, spec, p.getHmsPartition(),
+          p.getFileDescriptors());
+      ret.add(part);
     }
     return ret;
   }
@@ -426,23 +422,6 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     return names;
   }
 
-  /**
-   * Create a partition which represents the main partition of an unpartitioned
-   * table.
-   */
-  private LocalFsPartition createUnpartitionedPartition(LocalPartitionSpec spec) {
-    Preconditions.checkArgument(spec.getName().isEmpty());
-    Partition msp = new Partition();
-    msp.setSd(getMetaStoreTable().getSd());
-    msp.setParameters(getMetaStoreTable().getParameters());
-    msp.setValues(Collections.<String>emptyList());
-    return new LocalFsPartition(this, spec, msp);
-  }
-
-  private LocalPartitionSpec createUnpartitionedPartitionSpec() {
-    return new LocalPartitionSpec(this, "", /*id=*/0);
-  }
-
   private void loadPartitionValueMap() {
     if (partitionValueMap_ != null) return;
 
@@ -478,28 +457,23 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
 
   private void loadPartitionSpecs() {
     if (partitionSpecs_ != null) return;
-
-    if (getNumClusteringCols() == 0) {
-      // Unpartitioned table.
-      // This table has no partition key, which means it has no declared partitions.
-      // We model partitions slightly differently to Hive - every file must exist in a
-      // partition, so add a single partition with no keys which will get all the
-      // files in the table's root directory.
-      partitionSpecs_ = ImmutableMap.of(0L, createUnpartitionedPartitionSpec());
+    if (ref_ == null) {
+      // This is a CTAS target. Don't try to load metadata.
+      partitionSpecs_ = ImmutableMap.of();
       return;
     }
-    List<String> partNames;
+
+    List<PartitionRef> partList;
     try {
-      partNames = db_.getCatalog().getMetaProvider().loadPartitionNames(
-          db_.getName(), name_);
+      partList = db_.getCatalog().getMetaProvider().loadPartitionList(ref_);
     } catch (TException e) {
       throw new LocalCatalogException("Could not load partition names for table " +
           getFullName(), e);
     }
     ImmutableMap.Builder<Long, LocalPartitionSpec> b = new ImmutableMap.Builder<>();
     long id = 0;
-    for (String partName : partNames) {
-      b.put(id, new LocalPartitionSpec(this, partName, id));
+    for (PartitionRef part: partList) {
+      b.put(id, new LocalPartitionSpec(this, part, id));
       id++;
     }
     partitionSpecs_ = b.build();

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java
index 4ad2c14..8480500 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeHBaseTable;
+import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TTableDescriptor;
@@ -44,19 +45,20 @@ public class LocalHbaseTable extends LocalTable implements FeHBaseTable {
   // TODO: revisit after caching is implemented for local catalog
   private HColumnDescriptor[] columnFamilies_ = null;
 
-  private LocalHbaseTable(LocalDb db, Table msTbl, ColumnMap cols) {
-    super(db, msTbl, cols);
+  private LocalHbaseTable(LocalDb db, Table msTbl, TableMetaRef ref, ColumnMap cols) {
+    super(db, msTbl, ref, cols);
     hbaseTableName_ = Util.getHBaseTableName(msTbl);
   }
 
-  static LocalHbaseTable loadFromHbase(LocalDb db, Table msTable) {
+  static LocalHbaseTable loadFromHbase(LocalDb db, Table msTable, TableMetaRef ref) {
     try {
       // Warm up the connection and verify the table exists.
       Util.getHBaseTable(Util.getHBaseTableName(msTable)).close();
       // since we don't support composite hbase rowkeys yet, all hbase tables have a
       // single clustering col
-      return new LocalHbaseTable(db, msTable, new ColumnMap(Util.loadColumns(msTable), 1,
-          msTable.getDbName() + "." + msTable.getTableName()));
+      ColumnMap cmap = new ColumnMap(Util.loadColumns(msTable), 1,
+          msTable.getDbName() + "." + msTable.getTableName());
+      return new LocalHbaseTable(db, msTable, ref, cmap);
     } catch (IOException | MetaException | SerDeException e) {
       throw new LocalCatalogException(e);
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
index 2095449..fc48ca1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
@@ -31,6 +31,7 @@ import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
+import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TKuduTable;
 import org.apache.impala.thrift.TTableDescriptor;
@@ -55,7 +56,7 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
    * Create a new instance based on the table metadata 'msTable' stored
    * in the metastore.
    */
-  static LocalTable loadFromKudu(LocalDb db, Table msTable) {
+  static LocalTable loadFromKudu(LocalDb db, Table msTable, TableMetaRef ref) {
     Preconditions.checkNotNull(db);
     Preconditions.checkNotNull(msTable);
     String fullTableName = msTable.getDbName() + "." + msTable.getTableName();
@@ -82,7 +83,7 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
     List<KuduPartitionParam> partitionBy = Utils.loadPartitionByParams(kuduTable);
 
     ColumnMap cmap = new ColumnMap(cols, /*numClusteringCols=*/0, fullTableName);
-    return new LocalKuduTable(db, msTable, cmap, pkNames, partitionBy);
+    return new LocalKuduTable(db, msTable, ref, cmap, pkNames, partitionBy);
   }
 
 
@@ -104,7 +105,9 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
     }
 
     ColumnMap cmap = new ColumnMap(columns, /*numClusteringCols=*/0, fullTableName);
-    return new LocalKuduTable(db, msTable, cmap, pkNames, kuduPartitionParams);
+
+    return new LocalKuduTable(db, msTable, /*ref=*/null, cmap, pkNames,
+        kuduPartitionParams);
   }
 
   private static void convertColsFromKudu(Schema schema, List<Column> cols,
@@ -128,10 +131,10 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
     }
   }
 
-  private LocalKuduTable(LocalDb db, Table msTable, ColumnMap cmap,
+  private LocalKuduTable(LocalDb db, Table msTable, TableMetaRef ref, ColumnMap cmap,
       List<String> primaryKeyColumnNames,
       List<KuduPartitionParam> partitionBy)  {
-    super(db, msTable, cmap);
+    super(db, msTable, ref, cmap);
     tableParams_ = new TableParams(msTable);
     partitionBy_ = ImmutableList.copyOf(partitionBy);
     primaryKeyColumnNames_ = ImmutableList.copyOf(primaryKeyColumnNames);

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/LocalPartitionSpec.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalPartitionSpec.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalPartitionSpec.java
index 690a4bf..c635158 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalPartitionSpec.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalPartitionSpec.java
@@ -19,11 +19,15 @@ package org.apache.impala.catalog.local;
 
 import java.util.List;
 
+import javax.annotation.Nullable;
+
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.PrunablePartition;
+import org.apache.impala.catalog.local.MetaProvider.PartitionRef;
+import org.apache.impala.thrift.CatalogObjectsConstants;
 import org.apache.impala.util.MetaStoreUtil;
 
 import com.google.common.base.Preconditions;
@@ -36,31 +40,43 @@ import com.google.errorprone.annotations.Immutable;
  */
 @Immutable
 class LocalPartitionSpec implements PrunablePartition {
+  static final long UNPARTITIONED_ID = 0;
   private final long id_;
-  private final String name_;
+
+  @Nullable
+  private final PartitionRef ref_;
 
   // LiteralExprs are technically mutable prior to analysis.
   @SuppressWarnings("Immutable")
   private final ImmutableList<LiteralExpr> partitionValues_;
 
-  LocalPartitionSpec(LocalFsTable table, String partName, long id) {
+  LocalPartitionSpec(LocalFsTable table, PartitionRef ref, long id) {
     id_ = id;
-    name_ = Preconditions.checkNotNull(partName);
-    if (!partName.isEmpty()) {
-      try {
-        List<String> partValues = MetaStoreUtil.getPartValsFromName(
-            table.getMetaStoreTable(), partName);
-        partitionValues_ = ImmutableList.copyOf(FeCatalogUtils.parsePartitionKeyValues(
-            table, partValues));
-      } catch (CatalogException | MetaException e) {
-        throw new LocalCatalogException(String.format(
-            "Failed to parse partition name '%s' for table %s",
-            partName, table.getFullName()), e);
-      }
-    } else {
-      // Unpartitioned tables have a single partition with empty name.
-      partitionValues_= ImmutableList.of();
+    ref_ = Preconditions.checkNotNull(ref);
+    if (ref.getName().isEmpty()) {
+      // "unpartitioned" partition
+      partitionValues_ = ImmutableList.of();
+      return;
     }
+    try {
+      List<String> partValues = MetaStoreUtil.getPartValsFromName(
+          table.getMetaStoreTable(), ref_.getName());
+      partitionValues_ = ImmutableList.copyOf(FeCatalogUtils.parsePartitionKeyValues(
+          table, partValues));
+    } catch (CatalogException | MetaException e) {
+      throw new LocalCatalogException(String.format(
+          "Failed to parse partition name '%s' for table %s",
+          ref.getName(), table.getFullName()), e);
+    }
+  }
+
+  LocalPartitionSpec(LocalFsTable table, long id) {
+    // Used for the single partition of an unpartitioned table and for the
+    // prototype partition, neither of which carries a PartitionRef.
+    Preconditions.checkArgument(id == CatalogObjectsConstants.PROTOTYPE_PARTITION_ID ||
+        id == UNPARTITIONED_ID);
+    this.id_ = id;
+    this.ref_ = null;
+    partitionValues_ = ImmutableList.of();
   }
 
   @Override
@@ -69,5 +85,17 @@ class LocalPartitionSpec implements PrunablePartition {
   @Override
   public List<LiteralExpr> getPartitionValues() { return partitionValues_; }
 
-  String getName() { return name_; }
+  PartitionRef getRef() { return ref_; }
+
+  @Override
+  public String toString() {
+    if (ref_ != null) {
+      return ref_.getName();
+    } else if (id_ == CatalogObjectsConstants.PROTOTYPE_PARTITION_ID) {
+      return "<prototype>";
+    } else {
+      return "<default>";
+    }
+  }
 }

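For readers tracing the parsing path above, a worked example of what the two helper
calls produce, assuming a table partitioned by (p1 INT, p2 STRING):

    // MetaStoreUtil.getPartValsFromName(msTbl, "p1=1/p2=abc")
    //     -> ["1", "abc"]
    // FeCatalogUtils.parsePartitionKeyValues(table, ["1", "abc"])
    //     -> roughly [NumericLiteral(1), StringLiteral("abc")], as LiteralExprs
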
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
index 1a14831..81a0741 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
@@ -20,6 +20,8 @@ package org.apache.impala.catalog.local;
 import java.util.ArrayList;
 import java.util.List;
 
+import javax.annotation.Nullable;
+
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -36,6 +38,8 @@ import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
+import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.log4j.Logger;
@@ -66,22 +70,36 @@ abstract class LocalTable implements FeTable {
 
   private final TTableStats tableStats_;
 
+  /**
+   * Table reference as provided by the initial call to the metadata provider.
+   * This must be passed back to any further calls to the metadata provider
+   * in order to verify consistency.
+   *
+   * In the case of CTAS target tables, this may be null. Since the tables don't
+   * exist yet in any metadata storage, it would be invalid to try to load any metadata
+   * about them.
+   */
+  @Nullable
+  protected final TableMetaRef ref_;
+
   public static LocalTable load(LocalDb db, String tblName) {
     // In order to know which kind of table subclass to instantiate, we need
     // to eagerly grab and parse the top-level Table object from the HMS.
     LocalTable t = null;
-    Table msTbl = loadMsTable(db, tblName);
+    Pair<Table, TableMetaRef> tableMeta = loadTableMetadata(db, tblName);
+    Table msTbl = tableMeta.first;
+    TableMetaRef ref = tableMeta.second;
     if (TableType.valueOf(msTbl.getTableType()) == TableType.VIRTUAL_VIEW) {
-      t = new LocalView(db, msTbl);
+      t = new LocalView(db, msTbl, ref);
     } else if (HBaseTable.isHBaseTable(msTbl)) {
-      t = LocalHbaseTable.loadFromHbase(db, msTbl);
+      t = LocalHbaseTable.loadFromHbase(db, msTbl, ref);
     } else if (KuduTable.isKuduTable(msTbl)) {
-      t = LocalKuduTable.loadFromKudu(db, msTbl);
+      t = LocalKuduTable.loadFromKudu(db, msTbl, ref);
     } else if (DataSourceTable.isDataSourceTable(msTbl)) {
       // TODO(todd) support datasource table
     } else if (HdfsFileFormat.isHdfsInputFormatClass(
         msTbl.getSd().getInputFormat())) {
-      t = LocalFsTable.load(db, msTbl);
+      t = LocalFsTable.load(db, msTbl, ref);
     }
 
     if (t == null) {
@@ -101,7 +119,7 @@ abstract class LocalTable implements FeTable {
   /**
    * Load the Table instance from the metastore.
    */
-  private static Table loadMsTable(LocalDb db, String tblName) {
+  private static Pair<Table, TableMetaRef> loadTableMetadata(LocalDb db, String tblName) {
     Preconditions.checkArgument(tblName.toLowerCase().equals(tblName));
 
     try {
@@ -113,11 +131,11 @@ abstract class LocalTable implements FeTable {
     }
   }
 
-  public LocalTable(LocalDb db, Table msTbl, ColumnMap cols) {
+  public LocalTable(LocalDb db, Table msTbl, TableMetaRef ref, ColumnMap cols) {
     this.db_ = Preconditions.checkNotNull(db);
     this.name_ = msTbl.getTableName();
     this.cols_ = cols;
-
+    this.ref_ = ref;
     this.msTable_ = msTbl;
 
     tableStats_ = new TTableStats(
@@ -126,8 +144,8 @@ abstract class LocalTable implements FeTable {
         FeCatalogUtils.getTotalSize(msTable_.getParameters()));
   }
 
-  public LocalTable(LocalDb db, Table msTbl) {
-    this(db, msTbl, ColumnMap.fromMsTable(msTbl));
+  public LocalTable(LocalDb db, Table msTbl, TableMetaRef ref) {
+    this(db, msTbl, ref, ColumnMap.fromMsTable(msTbl));
   }
 
   @Override
@@ -232,7 +250,7 @@ abstract class LocalTable implements FeTable {
   protected void loadColumnStats() {
     try {
       List<ColumnStatisticsObj> stats = db_.getCatalog().getMetaProvider()
-          .loadTableColumnStatistics(db_.getName(), getName(), getColumnNames());
+          .loadTableColumnStatistics(ref_, getColumnNames());
       FeCatalogUtils.injectColumnStats(stats, this);
     } catch (TException e) {
       LOG.warn("Could not load column statistics for: " + getFullName(), e);

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/LocalView.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalView.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalView.java
index 1aecdd2..d5b0796 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalView.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalView.java
@@ -25,6 +25,7 @@ import org.apache.impala.analysis.QueryStmt;
 import org.apache.impala.catalog.FeView;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.View;
+import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TTableDescriptor;
 
@@ -37,8 +38,8 @@ import org.apache.impala.thrift.TTableDescriptor;
 public class LocalView extends LocalTable implements FeView {
   private final QueryStmt queryStmt_;
 
-  public LocalView(LocalDb db, Table msTbl) {
-    super(db, msTbl);
+  public LocalView(LocalDb db, Table msTbl, TableMetaRef ref) {
+    super(db, msTbl, ref);
 
     try {
       queryStmt_ = View.parseViewDef(this);

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index 75d389e..0a217da 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -17,12 +17,9 @@
 
 package org.apache.impala.catalog.local;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Function;
@@ -31,9 +28,14 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
 import org.apache.thrift.TException;
 
 import com.google.common.collect.ImmutableList;
+import com.google.errorprone.annotations.Immutable;
 
 /**
  * Interface for loading metadata. See {@link LocalCatalog} for an example.
@@ -52,13 +54,13 @@ interface MetaProvider {
   ImmutableList<String> loadTableNames(String dbName)
       throws MetaException, UnknownDBException, TException;
 
-  Table loadTable(String dbName, String tableName)
+  Pair<Table, TableMetaRef> loadTable(String dbName, String tableName)
       throws NoSuchObjectException, MetaException, TException;
 
   String loadNullPartitionKeyValue()
       throws MetaException, TException;
 
-  List<String> loadPartitionNames(String dbName, String tableName)
+  List<PartitionRef> loadPartitionList(TableMetaRef table)
       throws MetaException, TException;
 
   /**
@@ -77,19 +79,40 @@ interface MetaProvider {
    * If a requested partition does not exist, no exception will be thrown.
    * Instead, the resulting map will contain no entry for that partition.
    */
-  Map<String, Partition> loadPartitionsByNames(String dbName, String tableName,
-      List<String> partitionColumnNames, List<String> partitionNames)
+  Map<String, PartitionMetadata> loadPartitionsByRefs(TableMetaRef table,
+      List<String> partitionColumnNames, ListMap<TNetworkAddress> hostIndex,
+      List<PartitionRef> partitionRefs)
       throws MetaException, TException;
 
   /**
    * Load statistics for the given columns from the given table.
    */
-  List<ColumnStatisticsObj> loadTableColumnStatistics(String dbName,
-      String tblName, List<String> colNames) throws TException;
+  List<ColumnStatisticsObj> loadTableColumnStatistics(TableMetaRef table,
+      List<String> colNames) throws TException;
 
   /**
-   * Load file metadata and block locations for the files in the given
-   * partition directory.
+   * Reference to a table as returned by loadTable(). This reference must be passed
+   * back to other functions to fetch more details about the table. Implementations
+   * may use this reference to store internal information such as version numbers
+   * in order to perform concurrency control checks, etc.
    */
-  List<LocatedFileStatus> loadFileMetadata(Path dir) throws IOException;
+  interface TableMetaRef {
+  }
+
+  /**
+   * Reference to a partition as returned from loadPartitionList(). These references
+   * may be passed back into loadPartitionsByRefs() to load detailed partition metadata.
+   */
+  @Immutable
+  interface PartitionRef {
+    String getName();
+  }
+
+  /**
+   * Partition metadata as returned by loadPartitionsByRefs().
+   */
+  interface PartitionMetadata {
+    Partition getHmsPartition();
+    ImmutableList<FileDescriptor> getFileDescriptors();
+  }
 }

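The TableMetaRef javadoc above notes that implementations may carry version
information for concurrency-control checks. A minimal sketch of that idea
(illustrative only; not CatalogdMetaProvider's actual representation):

    private static class VersionedTableMetaRef implements TableMetaRef {
      final String dbName_;
      final String tableName_;
      final long catalogVersion_;  // hypothetical: version seen at loadTable() time

      VersionedTableMetaRef(String dbName, String tableName, long catalogVersion) {
        this.dbName_ = dbName;
        this.tableName_ = tableName;
        this.catalogVersion_ = catalogVersion;
      }
    }

A provider can compare the stored version against the current one on each follow-up
call and throw InconsistentMetadataFetchException on mismatch, which is what makes
the consistency contract described earlier enforceable.
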
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java b/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
index 3c33bf1..6299dd4 100644
--- a/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
+++ b/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
@@ -43,6 +43,7 @@ public class RuntimeEnv {
    */
   public void reset() {
     numCores_ = Runtime.getRuntime().availableProcessors();
+    isTestEnv_ = false;
   }
 
   public int getNumCores() { return numCores_; }

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index cb33556..8c10e5f 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -49,7 +49,6 @@ import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnStats;
-import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 20d47bf..ad3add6 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -58,6 +58,7 @@ import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnNotFoundException;
+import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.DataSource;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.FeCatalogUtils;
@@ -901,8 +902,10 @@ public class CatalogOpExecutor {
       Column tableCol = table.getColumn(entry.getKey());
       // Ignore columns that were dropped in the meantime.
       if (tableCol == null) continue;
-      ColumnStatisticsData colStatsData =
-          createHiveColStatsData(params, entry.getValue(), tableCol.getType());
+      // If we know the number of rows in the table, cap NDV of the column appropriately.
+      long ndvCap = params.isSetTable_stats() ? params.table_stats.num_rows : -1;
+      ColumnStatisticsData colStatsData = ColumnStats.createHiveColStatsData(
+          ndvCap, entry.getValue(), tableCol.getType());
       if (colStatsData == null) continue;
       if (LOG.isTraceEnabled()) {
         LOG.trace(String.format("Updating column stats for %s: numDVs=%s numNulls=%s " +
@@ -917,57 +920,6 @@ public class CatalogOpExecutor {
     return colStats;
   }
 
-  private static ColumnStatisticsData createHiveColStatsData(
-      TAlterTableUpdateStatsParams params, TColumnStats colStats, Type colType) {
-    ColumnStatisticsData colStatsData = new ColumnStatisticsData();
-    long ndv = colStats.getNum_distinct_values();
-    // Cap NDV at row count if available.
-    if (params.isSetTable_stats()) ndv = Math.min(ndv, params.table_stats.num_rows);
-
-    long numNulls = colStats.getNum_nulls();
-    switch(colType.getPrimitiveType()) {
-      case BOOLEAN:
-        colStatsData.setBooleanStats(new BooleanColumnStatsData(1, -1, numNulls));
-        break;
-      case TINYINT:
-        ndv = Math.min(ndv, LongMath.pow(2, Byte.SIZE));
-        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
-        break;
-      case SMALLINT:
-        ndv = Math.min(ndv, LongMath.pow(2, Short.SIZE));
-        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
-        break;
-      case INT:
-        ndv = Math.min(ndv, LongMath.pow(2, Integer.SIZE));
-        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
-        break;
-      case BIGINT:
-      case TIMESTAMP: // Hive and Impala use LongColumnStatsData for timestamps.
-        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
-        break;
-      case FLOAT:
-      case DOUBLE:
-        colStatsData.setDoubleStats(new DoubleColumnStatsData(numNulls, ndv));
-        break;
-      case CHAR:
-      case VARCHAR:
-      case STRING:
-        long maxStrLen = colStats.getMax_size();
-        double avgStrLen = colStats.getAvg_size();
-        colStatsData.setStringStats(
-            new StringColumnStatsData(maxStrLen, avgStrLen, numNulls, ndv));
-        break;
-      case DECIMAL:
-        double decMaxNdv = Math.pow(10, colType.getPrecision());
-        ndv = (long) Math.min(ndv, decMaxNdv);
-        colStatsData.setDecimalStats(new DecimalColumnStatsData(numNulls, ndv));
-        break;
-      default:
-        return null;
-    }
-    return colStatsData;
-  }
-
   /**
    * Creates a new database in the metastore and adds the db name to the internal
    * metadata cache, marking its metadata to be lazily loaded on the next access.

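A worked example of the capping order implied by the removed code (illustrative
values): for an INT column with an estimated NDV of 10 billion on a table known to
have 1 million rows,

    long ndv = 10_000_000_000L;               // estimate from the child query
    ndv = Math.min(ndv, 1_000_000L);          // cap at num_rows when stats are set
    ndv = Math.min(ndv, LongMath.pow(2, 32)); // type-based cap for INT
    // -> 1_000_000

and for DECIMAL the type cap is 10^precision, e.g. at most 100 distinct values for
a DECIMAL(2,0) column.
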
http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/service/FeSupport.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index 052d496..d64a554 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -103,6 +103,9 @@ public class FeSupport {
   // using Java Thrift bindings.
   public native static byte[] NativePrioritizeLoad(byte[] thriftReq);
 
+  public native static byte[] NativeGetPartialCatalogObject(byte[] thriftReq)
+      throws InternalException;
+
   // Parses a string of comma-separated key=value query options ('csvQueryOptions'),
   // updates the existing query options ('queryOptions') with them and returns the
   // resulting serialized TQueryOptions object.
@@ -349,6 +352,17 @@ public class FeSupport {
     return MinLogSpaceForBloomFilter(ndv, fpp);
   }
 
+  public static byte[] GetPartialCatalogObject(byte[] thriftReq)
+      throws InternalException {
+    try {
+      return NativeGetPartialCatalogObject(thriftReq);
+    } catch (UnsatisfiedLinkError e) {
+      loadLibrary();
+    }
+    return NativeGetPartialCatalogObject(thriftReq);
+  }
+
   /**
    * This function should be called explicitly by the FeSupport to ensure that
    * native functions are loaded. Tests that depend on JniCatalog or JniFrontend

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index fcb5ce2..e259631 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -87,6 +87,7 @@ import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.ImpaladCatalog;
 import org.apache.impala.catalog.Type;
+import org.apache.impala.catalog.local.InconsistentMetadataFetchException;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
@@ -1019,6 +1020,9 @@ public class Frontend {
    */
   public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
       throws ImpalaException {
+    // TODO(todd): wrap the planning in a retry loop which catches
+    // InconsistentMetadataFetchException.
+
     // Timeline of important events in the planning process, used for debugging
     // and profiling.
     EventSequence timeline = new EventSequence("Query Compilation");

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index daa57ab..955f48f 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -45,6 +45,7 @@ import org.apache.impala.thrift.TGetDbsParams;
 import org.apache.impala.thrift.TGetDbsResult;
 import org.apache.impala.thrift.TGetFunctionsRequest;
 import org.apache.impala.thrift.TGetFunctionsResponse;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.TGetTablesParams;
 import org.apache.impala.thrift.TGetTableMetricsParams;
 import org.apache.impala.thrift.TGetTablesResult;
@@ -216,6 +217,15 @@ public class JniCatalog {
     return serializer.serialize(catalog_.getTCatalogObject(objectDescription));
   }
 
+  public byte[] getPartialCatalogObject(byte[] thriftParams) throws ImpalaException,
+      TException {
+    TGetPartialCatalogObjectRequest req =
+        new TGetPartialCatalogObjectRequest();
+    JniUtil.deserializeThrift(protocolFactory_, req, thriftParams);
+    TSerializer serializer = new TSerializer(protocolFactory_);
+    return serializer.serialize(catalog_.getPartialCatalogObject(req));
+  }
+
   /**
    * See comment in CatalogServiceCatalog.
    */

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java b/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
index 100a513..2e093c1 100644
--- a/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
@@ -17,20 +17,38 @@
 
 package org.apache.impala.catalog;
 
-import static org.junit.Assert.assertTrue;
+import static org.apache.impala.catalog.HdfsPartition.comparePartitionKeyValues;
+import static org.junit.Assert.*;
 
 import java.math.BigDecimal;
+import java.util.ArrayList;
 import java.util.List;
-import java.lang.*;
 
-import org.apache.impala.analysis.*;
-import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.impala.analysis.BoolLiteral;
+import org.apache.impala.analysis.LiteralExpr;
+import org.apache.impala.analysis.NullLiteral;
+import org.apache.impala.analysis.NumericLiteral;
+import org.apache.impala.analysis.StringLiteral;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.HdfsTable.FileMetadataLoadStats;
+import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
 import org.junit.Test;
 
-import static org.apache.impala.catalog.HdfsPartition.comparePartitionKeyValues;
+import com.google.common.collect.Lists;
 
 public class HdfsPartitionTest {
 
+  static {
+    FeSupport.loadLibrary();
+  }
+
   private List<LiteralExpr> valuesNull_= Lists.newArrayList();
   private List<LiteralExpr> valuesDecimal_ = Lists.newArrayList();
   private List<LiteralExpr> valuesDecimal1_ = Lists.newArrayList();
@@ -112,4 +130,45 @@ public class HdfsPartitionTest {
           Integer.signum(comparePartitionKeyValues(o2, o3)));
     }
   }
+
+  /**
+   * Get the list of all locations of blocks from the given file descriptor.
+   */
+  private static List<TNetworkAddress> getAllReplicaAddresses(FileDescriptor fd,
+      ListMap<TNetworkAddress> hostIndex) {
+    List<TNetworkAddress> ret = new ArrayList<>();
+    for (int i = 0; i < fd.getNumFileBlocks(); i++) {
+      for (int j = 0; j < fd.getFbFileBlock(i).replicaHostIdxsLength(); j++) {
+        int idx = fd.getFbFileBlock(i).replicaHostIdxs(j);
+        ret.add(hostIndex.getEntry(idx));
+      }
+    }
+    return ret;
+  }
+
+  @Test
+  public void testCloneWithNewHostIndex() throws Exception {
+    // Fetch some metadata from a directory in HDFS.
+    Path p = new Path("hdfs://localhost:20500/test-warehouse/schemas");
+    FileSystem fs = p.getFileSystem(new Configuration());
+    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(p);
+    ListMap<TNetworkAddress> origIndex = new ListMap<>();
+    List<FileDescriptor> fileDescriptors = HdfsTable.createFileDescriptors(fs, iter,
+        origIndex, new FileMetadataLoadStats(p));
+    assertFalse(fileDescriptors.isEmpty());
+
+    FileDescriptor fd = fileDescriptors.get(0);
+    // Get the list of locations, using the original host index.
+    List<TNetworkAddress> origAddresses = getAllReplicaAddresses(fd, origIndex);
+
+    // Make a new host index with the hosts in the opposite order.
+    ListMap<TNetworkAddress> newIndex = new ListMap<>();
+    newIndex.populate(Lists.reverse(origIndex.getList()));
+
+    // Clone the FD over to the reversed index. The actual addresses should be the same.
+    FileDescriptor cloned = fd.cloneWithNewHostIndex(origIndex.getList(), newIndex);
+    List<TNetworkAddress> newAddresses = getAllReplicaAddresses(cloned, newIndex);
+
+    assertEquals(origAddresses, newAddresses);
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
new file mode 100644
index 0000000..2ff5015
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.thrift.TCatalogInfoSelector;
+import org.apache.impala.thrift.TCatalogObject;
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TDatabase;
+import org.apache.impala.thrift.TDbInfoSelector;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
+import org.apache.impala.thrift.TPartialPartitionInfo;
+import org.apache.impala.thrift.TTable;
+import org.apache.impala.thrift.TTableInfoSelector;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class PartialCatalogInfoTest {
+  private static CatalogServiceCatalog catalog_ =
+      CatalogServiceTestCatalog.create();
+
+  private TGetPartialCatalogObjectResponse sendRequest(
+      TGetPartialCatalogObjectRequest req)
+      throws CatalogException, InternalException, TException {
+    System.err.println("req: " + req);
+    TGetPartialCatalogObjectResponse resp;
+    resp = catalog_.getPartialCatalogObject(req);
+    // Round-trip the response through serialization, so if we accidentally forgot to
+    // set the "isset" flag for any fields, we'll catch that bug.
+    byte[] respBytes = new TSerializer().serialize(resp);
+    resp.clear();
+    new TDeserializer().deserialize(resp, respBytes);
+    System.err.println("resp: " + resp);
+    return resp;
+  }
+
+  @Test
+  public void testDbList() throws Exception {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.CATALOG);
+    req.catalog_info_selector = new TCatalogInfoSelector();
+    req.catalog_info_selector.want_db_names = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    assertTrue(resp.catalog_info.db_names.contains("functional"));
+  }
+
+  @Test
+  public void testDb() throws Exception {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.DATABASE);
+    req.object_desc.db = new TDatabase("functional");
+    req.db_info_selector = new TDbInfoSelector();
+    req.db_info_selector.want_hms_database = true;
+    req.db_info_selector.want_table_names = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    assertTrue(resp.isSetObject_version_number());
+    assertEquals("functional", resp.db_info.hms_database.getName());
+    assertTrue(resp.db_info.table_names.contains("alltypes"));
+  }
+
+  @Test
+  public void testTable() throws Exception {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.TABLE);
+    req.object_desc.table = new TTable("functional", "alltypes");
+    req.table_info_selector = new TTableInfoSelector();
+    req.table_info_selector.want_hms_table = true;
+    req.table_info_selector.want_partition_names = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    assertTrue(resp.isSetObject_version_number());
+    assertEquals("alltypes", resp.table_info.hms_table.getTableName());
+    assertTrue(resp.table_info.partitions.size() > 0);
+    TPartialPartitionInfo partInfo = resp.table_info.partitions.get(1);
+    assertTrue("bad part name: " + partInfo.name,
+        partInfo.name.matches("year=\\d+/month=\\d+"));
+
+    // Fetch again, but specify two specific partitions and ask for metadata.
+    req.table_info_selector.clear();
+    req.table_info_selector.want_partition_metadata = true;
+    req.table_info_selector.partition_ids = ImmutableList.of(
+        resp.table_info.partitions.get(1).id,
+        resp.table_info.partitions.get(3).id);
+    resp = sendRequest(req);
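+    // The HMS table and partition names were not re-requested after clear(),
+    // so they should be absent from this response.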
+    assertNull(resp.table_info.hms_table);
+    assertEquals(2, resp.table_info.partitions.size());
+    partInfo = resp.table_info.partitions.get(0);
+    assertNull(partInfo.name);
+    assertEquals(req.table_info_selector.partition_ids.get(0), (Long)partInfo.id);
+    assertTrue(partInfo.hms_partition.getSd().getLocation().startsWith(
+        "hdfs://localhost:20500/test-warehouse/alltypes/year="));
+    // TODO(todd): we should probably transfer a compressed descriptor instead
+    // and refactor the MetaProvider interface to expose those since there is
+    // a lot of redundant info in partition descriptors.
+    // TODO(todd): should also filter out the incremental stats.
+  }
+
+  @Test
+  public void testFetchMissingPartId() throws Exception {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.TABLE);
+    req.object_desc.table = new TTable("functional", "alltypes");
+    req.table_info_selector = new TTableInfoSelector();
+    req.table_info_selector.want_partition_metadata = true;
+    req.table_info_selector.partition_ids = ImmutableList.of(-12345L); // non-existent
+    try {
+      sendRequest(req);
+      fail("did not throw exception for missing partition");
+    } catch (IllegalArgumentException iae) {
+      assertEquals("Partition id -12345 does not exist", iae.getMessage());
+    }
+  }
+
+  @Test
+  public void testTableStats() throws Exception {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.TABLE);
+    req.object_desc.table = new TTable("functional", "alltypes");
+    req.table_info_selector = new TTableInfoSelector();
+    req.table_info_selector.want_stats_for_column_names = ImmutableList.of(
+        "year", "month", "id", "bool_col", "tinyint_col", "smallint_col",
+        "int_col", "bigint_col", "float_col", "double_col", "date_string_col",
+        "string_col", "timestamp_col");
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    List<ColumnStatisticsObj> stats = resp.table_info.column_stats;
+    // We have 13 columns, but 2 are the clustering columns which don't have stats.
+    assertEquals(11, stats.size());
+    assertEquals("ColumnStatisticsObj(colName:id, colType:INT, " +
+        "statsData:<ColumnStatisticsData longStats:LongColumnStatsData(" +
+        "numNulls:-1, numDVs:7300)>)", stats.get(0).toString());
+  }
+
+  @Test
+  public void testFetchErrorTable() throws Exception {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.TABLE);
+    req.object_desc.table = new TTable("functional", "bad_serde");
+    req.table_info_selector = new TTableInfoSelector();
+    req.table_info_selector.want_hms_table = true;
+    req.table_info_selector.want_partition_names = true;
+    try {
+      sendRequest(req);
+      fail("expected exception");
+    } catch (TableLoadingException tle) {
+      assertEquals("Failed to load metadata for table: functional.bad_serde",
+          tle.getMessage());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
index 4cb2b96..ac15fb2 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
@@ -157,6 +157,24 @@ public class LocalCatalogTest {
       }
     }
     assertEquals(24, totalFds);
+    assertTrue(t.getHostIndex().size() > 0);
+  }
+
+  @Test
+  public void testLoadFileDescriptorsUnpartitioned() throws Exception {
+    FeFsTable t = (FeFsTable) catalog_.getTable("tpch", "region");
+    int totalFds = 0;
+    for (FeFsPartition p: FeCatalogUtils.loadAllPartitions(t)) {
+      List<FileDescriptor> fds = p.getFileDescriptors();
+      totalFds += fds.size();
+      for (FileDescriptor fd : fds) {
+        assertTrue(fd.getFileLength() > 0);
+        assertEquals(1, fd.getNumFileBlocks());
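+        // Blocks are replicated three times in the test minicluster.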
+        assertEquals(3, fd.getFbFileBlock(0).diskIdsLength());
+      }
+    }
+    assertEquals(1, totalFds);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index cbb48ce..e13e51b 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -97,7 +97,7 @@ public class FrontendTestBase {
 
   @AfterClass
   public static void cleanUp() throws Exception {
-    RuntimeEnv.INSTANCE.setTestEnv(false);
+    RuntimeEnv.INSTANCE.reset();
   }
 
   // Adds a Udf: default.name(args) to the catalog.