You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2018/08/21 16:39:48 UTC

[2/4] impala git commit: IMPALA-7436: initial fetch-from-catalogd implementation

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index f19adfd..da784d6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -44,6 +44,9 @@ import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.PrunablePartition;
+import org.apache.impala.catalog.local.MetaProvider.PartitionMetadata;
+import org.apache.impala.catalog.local.MetaProvider.PartitionRef;
+import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.CatalogObjectsConstants;
 import org.apache.impala.thrift.THdfsPartition;
@@ -108,7 +111,7 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
    */
   private final String avroSchema_;
 
-  public static LocalFsTable load(LocalDb db, Table msTbl) {
+  public static LocalFsTable load(LocalDb db, Table msTbl, TableMetaRef ref) {
     String fullName = msTbl.getDbName() + "." + msTbl.getTableName();
 
     // Set Avro schema if necessary.
@@ -141,16 +144,16 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
         cmap = ColumnMap.fromMsTable(msTbl);
       }
 
-      return new LocalFsTable(db, msTbl, cmap, avroSchema);
+      return new LocalFsTable(db, msTbl, ref, cmap, avroSchema);
     } catch (AnalysisException e) {
       throw new LocalCatalogException("Failed to load Avro schema for table "
           + fullName);
     }
   }
 
-  private LocalFsTable(LocalDb db, Table msTbl, ColumnMap cmap,
+  private LocalFsTable(LocalDb db, Table msTbl, TableMetaRef ref, ColumnMap cmap,
       String explicitAvroSchema) {
-    super(db, msTbl, cmap);
+    super(db, msTbl, ref, cmap);
 
     // set NULL indicator string from table properties
     String tableNullFormat =
@@ -178,11 +181,7 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
    */
   public static LocalFsTable createCtasTarget(LocalDb db,
       Table msTbl) throws CatalogException {
-    // TODO(todd): set a member variable indicating this is a CTAS target
-    // so we can checkState() against it in various other methods and make
-    // sure we don't try to do something like load partitions for a not-yet-created
-    // table.
-    return new LocalFsTable(db, msTbl, ColumnMap.fromMsTable(msTbl),
+    return new LocalFsTable(db, msTbl, /*ref=*/null, ColumnMap.fromMsTable(msTbl),
         /*explicitAvroSchema=*/null);
   }
 
@@ -240,7 +239,13 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   public Set<HdfsFileFormat> getFileFormats() {
     // TODO(todd): can we avoid loading all partitions here? this is called
     // for any INSERT query, even if the partition is specified.
-    Collection<? extends FeFsPartition> parts = FeCatalogUtils.loadAllPartitions(this);
+    Collection<? extends FeFsPartition> parts;
+    if (ref_ != null) {
+      parts = FeCatalogUtils.loadAllPartitions(this);
+    } else {
+      // If this is a CTAS target, we don't want to try to load the partition list.
+      parts = Collections.emptyList();
+    }
     // In the case that we have no partitions added to the table yet, it's
     // important to add the "prototype" partition as a fallback.
     Iterable<FeFsPartition> partitionsToConsider = Iterables.concat(
@@ -329,9 +334,9 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
 
     protoMsPartition.setParameters(Collections.<String, String>emptyMap());
     LocalPartitionSpec spec = new LocalPartitionSpec(
-        this, "", CatalogObjectsConstants.PROTOTYPE_PARTITION_ID);
+        this, CatalogObjectsConstants.PROTOTYPE_PARTITION_ID);
     LocalFsPartition prototypePartition = new LocalFsPartition(
-        this, spec, protoMsPartition);
+        this, spec, protoMsPartition, /*fileDescriptors=*/null);
     return prototypePartition;
   }
 
@@ -374,29 +379,17 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     // Possible in the case that all partitions were pruned.
     if (ids.isEmpty()) return Collections.emptyList();
 
-    List<String> names = Lists.newArrayList();
+    List<PartitionRef> refs = Lists.newArrayList();
     for (Long id : ids) {
       LocalPartitionSpec spec = partitionSpecs_.get(id);
       Preconditions.checkArgument(spec != null, "Invalid partition ID for table %s: %s",
           getFullName(), id);
-      String name = spec.getName();
-      if (name.isEmpty()) {
-        // Unpartitioned tables don't need to fetch partitions from the metadata
-        // provider. Rather, we just create a partition on the fly.
-        Preconditions.checkState(getNumClusteringCols() == 0,
-            "Cannot fetch empty partition name from a partitioned table");
-        Preconditions.checkArgument(ids.size() == 1,
-            "Expected to only fetch one partition for unpartitioned table %s",
-            getFullName());
-        return Lists.newArrayList(createUnpartitionedPartition(spec));
-      } else {
-        names.add(name);
-      }
+      refs.add(Preconditions.checkNotNull(spec.getRef()));
     }
-    Map<String, Partition> partsByName;
+    Map<String, PartitionMetadata> partsByName;
     try {
-      partsByName = db_.getCatalog().getMetaProvider().loadPartitionsByNames(
-          db_.getName(), name_, getClusteringColumnNames(), names);
+      partsByName = db_.getCatalog().getMetaProvider().loadPartitionsByRefs(
+          ref_, getClusteringColumnNames(), hostIndex_, refs);
     } catch (TException e) {
       throw new LocalCatalogException(
           "Could not load partitions for table " + getFullName(), e);
@@ -404,16 +397,19 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     List<FeFsPartition> ret = Lists.newArrayListWithCapacity(ids.size());
     for (Long id : ids) {
       LocalPartitionSpec spec = partitionSpecs_.get(id);
-      Partition p = partsByName.get(spec.getName());
+      PartitionMetadata p = partsByName.get(spec.getRef().getName());
       if (p == null) {
         // TODO(todd): concurrent drop partition could result in this error.
         // Should we recover in a more graceful way from such an unexpected event?
         throw new LocalCatalogException(
             "Could not load expected partitions for table " + getFullName() +
-            ": missing expected partition with name '" + spec.getName() +
+            ": missing expected partition with name '" + spec.getRef().getName() +
             "' (perhaps it was concurrently dropped by another process)");
       }
-      ret.add(new LocalFsPartition(this, spec, p));
+
+      LocalFsPartition part = new LocalFsPartition(this, spec, p.getHmsPartition(),
+          p.getFileDescriptors());
+      ret.add(part);
     }
     return ret;
   }
@@ -426,23 +422,6 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     return names;
   }
 
-  /**
-   * Create a partition which represents the main partition of an unpartitioned
-   * table.
-   */
-  private LocalFsPartition createUnpartitionedPartition(LocalPartitionSpec spec) {
-    Preconditions.checkArgument(spec.getName().isEmpty());
-    Partition msp = new Partition();
-    msp.setSd(getMetaStoreTable().getSd());
-    msp.setParameters(getMetaStoreTable().getParameters());
-    msp.setValues(Collections.<String>emptyList());
-    return new LocalFsPartition(this, spec, msp);
-  }
-
-  private LocalPartitionSpec createUnpartitionedPartitionSpec() {
-    return new LocalPartitionSpec(this, "", /*id=*/0);
-  }
-
   private void loadPartitionValueMap() {
     if (partitionValueMap_ != null) return;
 
@@ -478,28 +457,23 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
 
   private void loadPartitionSpecs() {
     if (partitionSpecs_ != null) return;
-
-    if (getNumClusteringCols() == 0) {
-      // Unpartitioned table.
-      // This table has no partition key, which means it has no declared partitions.
-      // We model partitions slightly differently to Hive - every file must exist in a
-      // partition, so add a single partition with no keys which will get all the
-      // files in the table's root directory.
-      partitionSpecs_ = ImmutableMap.of(0L, createUnpartitionedPartitionSpec());
+    if (ref_ == null) {
+      // This is a CTAS target. Don't try to load metadata.
+      partitionSpecs_ = ImmutableMap.of();
       return;
     }
-    List<String> partNames;
+
+    List<PartitionRef> partList;
     try {
-      partNames = db_.getCatalog().getMetaProvider().loadPartitionNames(
-          db_.getName(), name_);
+      partList = db_.getCatalog().getMetaProvider().loadPartitionList(ref_);
     } catch (TException e) {
       throw new LocalCatalogException("Could not load partition names for table " +
           getFullName(), e);
     }
     ImmutableMap.Builder<Long, LocalPartitionSpec> b = new ImmutableMap.Builder<>();
     long id = 0;
-    for (String partName : partNames) {
-      b.put(id, new LocalPartitionSpec(this, partName, id));
+    for (PartitionRef part: partList) {
+      b.put(id, new LocalPartitionSpec(this, part, id));
       id++;
     }
     partitionSpecs_ = b.build();

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java
index 4ad2c14..8480500 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalHbaseTable.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeHBaseTable;
+import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TTableDescriptor;
@@ -44,19 +45,20 @@ public class LocalHbaseTable extends LocalTable implements FeHBaseTable {
   // TODO: revisit after caching is implemented for local catalog
   private HColumnDescriptor[] columnFamilies_ = null;
 
-  private LocalHbaseTable(LocalDb db, Table msTbl, ColumnMap cols) {
-    super(db, msTbl, cols);
+  private LocalHbaseTable(LocalDb db, Table msTbl, TableMetaRef ref, ColumnMap cols) {
+    super(db, msTbl, ref, cols);
     hbaseTableName_ = Util.getHBaseTableName(msTbl);
   }
 
-  static LocalHbaseTable loadFromHbase(LocalDb db, Table msTable) {
+  static LocalHbaseTable loadFromHbase(LocalDb db, Table msTable, TableMetaRef ref) {
     try {
       // Warm up the connection and verify the table exists.
       Util.getHBaseTable(Util.getHBaseTableName(msTable)).close();
       // since we don't support composite hbase rowkeys yet, all hbase tables have a
       // single clustering col
-      return new LocalHbaseTable(db, msTable, new ColumnMap(Util.loadColumns(msTable), 1,
-          msTable.getDbName() + "." + msTable.getTableName()));
+      ColumnMap cmap = new ColumnMap(Util.loadColumns(msTable), 1,
+          msTable.getDbName() + "." + msTable.getTableName());
+      return new LocalHbaseTable(db, msTable, ref, cmap);
     } catch (IOException | MetaException | SerDeException e) {
       throw new LocalCatalogException(e);
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
index 2095449..fc48ca1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalKuduTable.java
@@ -31,6 +31,7 @@ import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
+import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TKuduTable;
 import org.apache.impala.thrift.TTableDescriptor;
@@ -55,7 +56,7 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
    * Create a new instance based on the table metadata 'msTable' stored
    * in the metastore.
    */
-  static LocalTable loadFromKudu(LocalDb db, Table msTable) {
+  static LocalTable loadFromKudu(LocalDb db, Table msTable, TableMetaRef ref) {
     Preconditions.checkNotNull(db);
     Preconditions.checkNotNull(msTable);
     String fullTableName = msTable.getDbName() + "." + msTable.getTableName();
@@ -82,7 +83,7 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
     List<KuduPartitionParam> partitionBy = Utils.loadPartitionByParams(kuduTable);
 
     ColumnMap cmap = new ColumnMap(cols, /*numClusteringCols=*/0, fullTableName);
-    return new LocalKuduTable(db, msTable, cmap, pkNames, partitionBy);
+    return new LocalKuduTable(db, msTable, ref, cmap, pkNames, partitionBy);
   }
 
 
@@ -104,7 +105,9 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
     }
 
     ColumnMap cmap = new ColumnMap(columns, /*numClusteringCols=*/0, fullTableName);
-    return new LocalKuduTable(db, msTable, cmap, pkNames, kuduPartitionParams);
+
+    return new LocalKuduTable(db, msTable, /*ref=*/null, cmap, pkNames,
+        kuduPartitionParams);
   }
 
   private static void convertColsFromKudu(Schema schema, List<Column> cols,
@@ -128,10 +131,10 @@ public class LocalKuduTable extends LocalTable implements FeKuduTable {
     }
   }
 
-  private LocalKuduTable(LocalDb db, Table msTable, ColumnMap cmap,
+  private LocalKuduTable(LocalDb db, Table msTable, TableMetaRef ref, ColumnMap cmap,
       List<String> primaryKeyColumnNames,
       List<KuduPartitionParam> partitionBy)  {
-    super(db, msTable, cmap);
+    super(db, msTable, ref, cmap);
     tableParams_ = new TableParams(msTable);
     partitionBy_ = ImmutableList.copyOf(partitionBy);
     primaryKeyColumnNames_ = ImmutableList.copyOf(primaryKeyColumnNames);

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/LocalPartitionSpec.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalPartitionSpec.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalPartitionSpec.java
index 690a4bf..c635158 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalPartitionSpec.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalPartitionSpec.java
@@ -19,11 +19,15 @@ package org.apache.impala.catalog.local;
 
 import java.util.List;
 
+import javax.annotation.Nullable;
+
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.PrunablePartition;
+import org.apache.impala.catalog.local.MetaProvider.PartitionRef;
+import org.apache.impala.thrift.CatalogObjectsConstants;
 import org.apache.impala.util.MetaStoreUtil;
 
 import com.google.common.base.Preconditions;
@@ -36,31 +40,43 @@ import com.google.errorprone.annotations.Immutable;
  */
 @Immutable
 class LocalPartitionSpec implements PrunablePartition {
+  static final long UNPARTITIONED_ID = 0;
   private final long id_;
-  private final String name_;
+
+  @Nullable
+  private final PartitionRef ref_;
 
   // LiteralExprs are technically mutable prior to analysis.
   @SuppressWarnings("Immutable")
   private final ImmutableList<LiteralExpr> partitionValues_;
 
-  LocalPartitionSpec(LocalFsTable table, String partName, long id) {
+  LocalPartitionSpec(LocalFsTable table, PartitionRef ref, long id) {
     id_ = id;
-    name_ = Preconditions.checkNotNull(partName);
-    if (!partName.isEmpty()) {
-      try {
-        List<String> partValues = MetaStoreUtil.getPartValsFromName(
-            table.getMetaStoreTable(), partName);
-        partitionValues_ = ImmutableList.copyOf(FeCatalogUtils.parsePartitionKeyValues(
-            table, partValues));
-      } catch (CatalogException | MetaException e) {
-        throw new LocalCatalogException(String.format(
-            "Failed to parse partition name '%s' for table %s",
-            partName, table.getFullName()), e);
-      }
-    } else {
-      // Unpartitioned tables have a single partition with empty name.
-      partitionValues_= ImmutableList.of();
+    ref_ = Preconditions.checkNotNull(ref);
+    if (ref.getName().isEmpty()) {
+      // "unpartitioned" partition
+      partitionValues_ = ImmutableList.of();
+      return;
     }
+    try {
+      List<String> partValues = MetaStoreUtil.getPartValsFromName(
+          table.getMetaStoreTable(), ref_.getName());
+      partitionValues_ = ImmutableList.copyOf(FeCatalogUtils.parsePartitionKeyValues(
+          table, partValues));
+    } catch (CatalogException | MetaException e) {
+      throw new LocalCatalogException(String.format(
+          "Failed to parse partition name '%s' for table %s",
+          ref.getName(), table.getFullName()), e);
+    }
+  }
+
+  LocalPartitionSpec(LocalFsTable table, long id) {
+    // Spec without a PartitionRef: only the prototype and unpartitioned IDs are allowed.
+    Preconditions.checkArgument(id == CatalogObjectsConstants.PROTOTYPE_PARTITION_ID ||
+        id == UNPARTITIONED_ID);
+    this.id_ = id;
+    this.ref_ = null;
+    partitionValues_= ImmutableList.of();
   }
 
   @Override
@@ -69,5 +85,17 @@ class LocalPartitionSpec implements PrunablePartition {
   @Override
   public List<LiteralExpr> getPartitionValues() { return partitionValues_; }
 
-  String getName() { return name_; }
+  PartitionRef getRef() { return ref_; }
+
+  @Override
+  public String toString() {
+    if (ref_ != null) {
+      return ref_.getName();
+    } else if (id_ == CatalogObjectsConstants.PROTOTYPE_PARTITION_ID) {
+      return "<prototype>";
+    } else {
+      return "<default>";
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
index 1a14831..81a0741 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
@@ -20,6 +20,8 @@ package org.apache.impala.catalog.local;
 import java.util.ArrayList;
 import java.util.List;
 
+import javax.annotation.Nullable;
+
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -36,6 +38,8 @@ import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.TableLoadingException;
+import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
+import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.log4j.Logger;
@@ -66,22 +70,36 @@ abstract class LocalTable implements FeTable {
 
   private final TTableStats tableStats_;
 
+  /**
+   * Table reference as provided by the initial call to the metadata provider.
+   * This must be passed back to any further calls to the metadata provider
+   * in order to verify consistency.
+   *
+   * In the case of CTAS target tables, this may be null. Since the tables don't
+   * exist yet in any metadata storage, it would be invalid to try to load any metadata
+   * about them.
+   */
+  @Nullable
+  protected final TableMetaRef ref_;
+
   public static LocalTable load(LocalDb db, String tblName) {
     // In order to know which kind of table subclass to instantiate, we need
     // to eagerly grab and parse the top-level Table object from the HMS.
     LocalTable t = null;
-    Table msTbl = loadMsTable(db, tblName);
+    Pair<Table, TableMetaRef> tableMeta = loadTableMetadata(db, tblName);
+    Table msTbl = tableMeta.first;
+    TableMetaRef ref = tableMeta.second;
     if (TableType.valueOf(msTbl.getTableType()) == TableType.VIRTUAL_VIEW) {
-      t = new LocalView(db, msTbl);
+      t = new LocalView(db, msTbl, ref);
     } else if (HBaseTable.isHBaseTable(msTbl)) {
-      t = LocalHbaseTable.loadFromHbase(db, msTbl);
+      t = LocalHbaseTable.loadFromHbase(db, msTbl, ref);
     } else if (KuduTable.isKuduTable(msTbl)) {
-      t = LocalKuduTable.loadFromKudu(db, msTbl);
+      t = LocalKuduTable.loadFromKudu(db, msTbl, ref);
     } else if (DataSourceTable.isDataSourceTable(msTbl)) {
       // TODO(todd) support datasource table
     } else if (HdfsFileFormat.isHdfsInputFormatClass(
         msTbl.getSd().getInputFormat())) {
-      t = LocalFsTable.load(db, msTbl);
+      t = LocalFsTable.load(db, msTbl, ref);
     }
 
     if (t == null) {
@@ -101,7 +119,7 @@ abstract class LocalTable implements FeTable {
   /**
    * Load the Table instance from the metastore.
    */
-  private static Table loadMsTable(LocalDb db, String tblName) {
+  private static Pair<Table, TableMetaRef> loadTableMetadata(LocalDb db, String tblName) {
     Preconditions.checkArgument(tblName.toLowerCase().equals(tblName));
 
     try {
@@ -113,11 +131,11 @@ abstract class LocalTable implements FeTable {
     }
   }
 
-  public LocalTable(LocalDb db, Table msTbl, ColumnMap cols) {
+  public LocalTable(LocalDb db, Table msTbl, TableMetaRef ref, ColumnMap cols) {
     this.db_ = Preconditions.checkNotNull(db);
     this.name_ = msTbl.getTableName();
     this.cols_ = cols;
-
+    this.ref_ = ref;
     this.msTable_ = msTbl;
 
     tableStats_ = new TTableStats(
@@ -126,8 +144,8 @@ abstract class LocalTable implements FeTable {
         FeCatalogUtils.getTotalSize(msTable_.getParameters()));
   }
 
-  public LocalTable(LocalDb db, Table msTbl) {
-    this(db, msTbl, ColumnMap.fromMsTable(msTbl));
+  public LocalTable(LocalDb db, Table msTbl, TableMetaRef ref) {
+    this(db, msTbl, ref, ColumnMap.fromMsTable(msTbl));
   }
 
   @Override
@@ -232,7 +250,7 @@ abstract class LocalTable implements FeTable {
   protected void loadColumnStats() {
     try {
       List<ColumnStatisticsObj> stats = db_.getCatalog().getMetaProvider()
-          .loadTableColumnStatistics(db_.getName(), getName(), getColumnNames());
+          .loadTableColumnStatistics(ref_, getColumnNames());
       FeCatalogUtils.injectColumnStats(stats, this);
     } catch (TException e) {
       LOG.warn("Could not load column statistics for: " + getFullName(), e);

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/LocalView.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalView.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalView.java
index 1aecdd2..d5b0796 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalView.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalView.java
@@ -25,6 +25,7 @@ import org.apache.impala.analysis.QueryStmt;
 import org.apache.impala.catalog.FeView;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.View;
+import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TTableDescriptor;
 
@@ -37,8 +38,8 @@ import org.apache.impala.thrift.TTableDescriptor;
 public class LocalView extends LocalTable implements FeView {
   private final QueryStmt queryStmt_;
 
-  public LocalView(LocalDb db, Table msTbl) {
-    super(db, msTbl);
+  public LocalView(LocalDb db, Table msTbl, TableMetaRef ref) {
+    super(db, msTbl, ref);
 
     try {
       queryStmt_ = View.parseViewDef(this);

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index 75d389e..0a217da 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -17,12 +17,9 @@
 
 package org.apache.impala.catalog.local;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Function;
@@ -31,9 +28,14 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
 import org.apache.thrift.TException;
 
 import com.google.common.collect.ImmutableList;
+import com.google.errorprone.annotations.Immutable;
 
 /**
  * Interface for loading metadata. See {@link LocalCatalog} for an example.
@@ -52,13 +54,13 @@ interface MetaProvider {
   ImmutableList<String> loadTableNames(String dbName)
       throws MetaException, UnknownDBException, TException;
 
-  Table loadTable(String dbName, String tableName)
+  Pair<Table, TableMetaRef> loadTable(String dbName, String tableName)
       throws NoSuchObjectException, MetaException, TException;
 
   String loadNullPartitionKeyValue()
       throws MetaException, TException;
 
-  List<String> loadPartitionNames(String dbName, String tableName)
+  List<PartitionRef> loadPartitionList(TableMetaRef table)
       throws MetaException, TException;
 
   /**
@@ -77,19 +79,40 @@ interface MetaProvider {
    * If a requested partition does not exist, no exception will be thrown.
    * Instead, the resulting map will contain no entry for that partition.
    */
-  Map<String, Partition> loadPartitionsByNames(String dbName, String tableName,
-      List<String> partitionColumnNames, List<String> partitionNames)
+  Map<String, PartitionMetadata> loadPartitionsByRefs(TableMetaRef table,
+      List<String> partitionColumnNames, ListMap<TNetworkAddress> hostIndex,
+      List<PartitionRef> partitionRefs)
       throws MetaException, TException;
 
   /**
    * Load statistics for the given columns from the given table.
    */
-  List<ColumnStatisticsObj> loadTableColumnStatistics(String dbName,
-      String tblName, List<String> colNames) throws TException;
+  List<ColumnStatisticsObj> loadTableColumnStatistics(TableMetaRef table,
+      List<String> colNames) throws TException;
 
   /**
-   * Load file metadata and block locations for the files in the given
-   * partition directory.
+   * Reference to a table as returned by loadTable(). This reference must be passed
+   * back to other functions to fetch more details about the table. Implementations
+   * may use this reference to store internal information such as version numbers
+   * in order to perform concurrency control checks, etc.
    */
-  List<LocatedFileStatus> loadFileMetadata(Path dir) throws IOException;
+  interface TableMetaRef {
+  }
+
+  /**
+   * Reference to a partition as returned from loadPartitionList(). These references
+   * may be passed back into loadPartitionsByRefs() to load detailed partition metadata.
+   */
+  @Immutable
+  interface PartitionRef {
+    String getName();
+  }
+
+  /**
+   * Partition metadata as returned by loadPartitionsByRefs().
+   */
+  interface PartitionMetadata {
+    Partition getHmsPartition();
+    ImmutableList<FileDescriptor> getFileDescriptors();
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java b/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
index 3c33bf1..6299dd4 100644
--- a/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
+++ b/fe/src/main/java/org/apache/impala/common/RuntimeEnv.java
@@ -43,6 +43,7 @@ public class RuntimeEnv {
    */
   public void reset() {
     numCores_ = Runtime.getRuntime().availableProcessors();
+    isTestEnv_ = false;
   }
 
   public int getNumCores() { return numCores_; }

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index cb33556..8c10e5f 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -49,7 +49,6 @@ import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnStats;
-import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 20d47bf..ad3add6 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -58,6 +58,7 @@ import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnNotFoundException;
+import org.apache.impala.catalog.ColumnStats;
 import org.apache.impala.catalog.DataSource;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.FeCatalogUtils;
@@ -901,8 +902,10 @@ public class CatalogOpExecutor {
       Column tableCol = table.getColumn(entry.getKey());
       // Ignore columns that were dropped in the meantime.
       if (tableCol == null) continue;
-      ColumnStatisticsData colStatsData =
-          createHiveColStatsData(params, entry.getValue(), tableCol.getType());
+      // If we know the number of rows in the table, cap NDV of the column appropriately.
+      long ndvCap = params.isSetTable_stats() ? params.table_stats.num_rows : -1;
+      ColumnStatisticsData colStatsData = ColumnStats.createHiveColStatsData(
+              ndvCap, entry.getValue(), tableCol.getType());
       if (colStatsData == null) continue;
       if (LOG.isTraceEnabled()) {
         LOG.trace(String.format("Updating column stats for %s: numDVs=%s numNulls=%s " +
@@ -917,57 +920,6 @@ public class CatalogOpExecutor {
     return colStats;
   }
 
-  private static ColumnStatisticsData createHiveColStatsData(
-      TAlterTableUpdateStatsParams params, TColumnStats colStats, Type colType) {
-    ColumnStatisticsData colStatsData = new ColumnStatisticsData();
-    long ndv = colStats.getNum_distinct_values();
-    // Cap NDV at row count if available.
-    if (params.isSetTable_stats()) ndv = Math.min(ndv, params.table_stats.num_rows);
-
-    long numNulls = colStats.getNum_nulls();
-    switch(colType.getPrimitiveType()) {
-      case BOOLEAN:
-        colStatsData.setBooleanStats(new BooleanColumnStatsData(1, -1, numNulls));
-        break;
-      case TINYINT:
-        ndv = Math.min(ndv, LongMath.pow(2, Byte.SIZE));
-        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
-        break;
-      case SMALLINT:
-        ndv = Math.min(ndv, LongMath.pow(2, Short.SIZE));
-        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
-        break;
-      case INT:
-        ndv = Math.min(ndv, LongMath.pow(2, Integer.SIZE));
-        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
-        break;
-      case BIGINT:
-      case TIMESTAMP: // Hive and Impala use LongColumnStatsData for timestamps.
-        colStatsData.setLongStats(new LongColumnStatsData(numNulls, ndv));
-        break;
-      case FLOAT:
-      case DOUBLE:
-        colStatsData.setDoubleStats(new DoubleColumnStatsData(numNulls, ndv));
-        break;
-      case CHAR:
-      case VARCHAR:
-      case STRING:
-        long maxStrLen = colStats.getMax_size();
-        double avgStrLen = colStats.getAvg_size();
-        colStatsData.setStringStats(
-            new StringColumnStatsData(maxStrLen, avgStrLen, numNulls, ndv));
-        break;
-      case DECIMAL:
-        double decMaxNdv = Math.pow(10, colType.getPrecision());
-        ndv = (long) Math.min(ndv, decMaxNdv);
-        colStatsData.setDecimalStats(new DecimalColumnStatsData(numNulls, ndv));
-        break;
-      default:
-        return null;
-    }
-    return colStatsData;
-  }
-
   /**
    * Creates a new database in the metastore and adds the db name to the internal
    * metadata cache, marking its metadata to be lazily loaded on the next access.

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/service/FeSupport.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java
index 052d496..d64a554 100644
--- a/fe/src/main/java/org/apache/impala/service/FeSupport.java
+++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java
@@ -103,6 +103,9 @@ public class FeSupport {
   // using Java Thrift bindings.
   public native static byte[] NativePrioritizeLoad(byte[] thriftReq);
 
+  public native static byte[] NativeGetPartialCatalogObject(byte[] thriftReq)
+      throws InternalException;
+
   // Parses a string of comma-separated key=value query options ('csvQueryOptions'),
   // updates the existing query options ('queryOptions') with them and returns the
   // resulting serialized TQueryOptions object.
@@ -349,6 +352,17 @@ public class FeSupport {
     return MinLogSpaceForBloomFilter(ndv, fpp);
   }
 
+  // Calls the native method; on UnsatisfiedLinkError, calls loadLibrary() and retries.
+  public static byte[] GetPartialCatalogObject(byte[] thriftReq)
+      throws InternalException {
+    try {
+      return NativeGetPartialCatalogObject(thriftReq);
+    } catch (UnsatisfiedLinkError e) {
+      loadLibrary();
+    }
+    return NativeGetPartialCatalogObject(thriftReq);
+  }
+
   /**
    * This function should be called explicitly by the FeSupport to ensure that
    * native functions are loaded. Tests that depend on JniCatalog or JniFrontend

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index fcb5ce2..e259631 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -87,6 +87,7 @@ import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.ImpaladCatalog;
 import org.apache.impala.catalog.Type;
+import org.apache.impala.catalog.local.InconsistentMetadataFetchException;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
@@ -1019,6 +1020,9 @@ public class Frontend {
    */
   public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
       throws ImpalaException {
+    // TODO(todd): wrap the planning in a retry loop which catches
+    // InconsistentMetadataFetchException.
+
     // Timeline of important events in the planning process, used for debugging
     // and profiling.
     EventSequence timeline = new EventSequence("Query Compilation");

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index daa57ab..955f48f 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -45,6 +45,7 @@ import org.apache.impala.thrift.TGetDbsParams;
 import org.apache.impala.thrift.TGetDbsResult;
 import org.apache.impala.thrift.TGetFunctionsRequest;
 import org.apache.impala.thrift.TGetFunctionsResponse;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.TGetTablesParams;
 import org.apache.impala.thrift.TGetTableMetricsParams;
 import org.apache.impala.thrift.TGetTablesResult;
@@ -216,6 +217,15 @@ public class JniCatalog {
     return serializer.serialize(catalog_.getTCatalogObject(objectDescription));
   }
 
+  /** Serialized-Thrift wrapper around catalog_.getPartialCatalogObject(). */
+  public byte[] getPartialCatalogObject(byte[] thriftParams) throws ImpalaException,
+      TException {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    JniUtil.deserializeThrift(protocolFactory_, req, thriftParams);
+    TSerializer serializer = new TSerializer(protocolFactory_);
+    return serializer.serialize(catalog_.getPartialCatalogObject(req));
+  }
+
   /**
    * See comment in CatalogServiceCatalog.
    */

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java b/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
index 100a513..2e093c1 100644
--- a/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/HdfsPartitionTest.java
@@ -17,20 +17,38 @@
 
 package org.apache.impala.catalog;
 
-import static org.junit.Assert.assertTrue;
+import static org.apache.impala.catalog.HdfsPartition.comparePartitionKeyValues;
+import static org.junit.Assert.*;
 
 import java.math.BigDecimal;
+import java.util.ArrayList;
 import java.util.List;
-import java.lang.*;
 
-import org.apache.impala.analysis.*;
-import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.impala.analysis.BoolLiteral;
+import org.apache.impala.analysis.LiteralExpr;
+import org.apache.impala.analysis.NullLiteral;
+import org.apache.impala.analysis.NumericLiteral;
+import org.apache.impala.analysis.StringLiteral;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.HdfsTable.FileMetadataLoadStats;
+import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
 import org.junit.Test;
 
-import static org.apache.impala.catalog.HdfsPartition.comparePartitionKeyValues;
+import com.google.common.collect.Lists;
 
 public class HdfsPartitionTest {
 
+  static {
+    FeSupport.loadLibrary();
+  }
+
   private List<LiteralExpr> valuesNull_= Lists.newArrayList();
   private List<LiteralExpr> valuesDecimal_ = Lists.newArrayList();
   private List<LiteralExpr> valuesDecimal1_ = Lists.newArrayList();
@@ -112,4 +130,45 @@ public class HdfsPartitionTest {
           Integer.signum(comparePartitionKeyValues(o2, o3)));
     }
   }
+
+  /**
+   * Returns the address of every block replica in 'fd', looked up in 'hostIndex'.
+   */
+  private static List<TNetworkAddress> getAllReplicaAddresses(FileDescriptor fd,
+      ListMap<TNetworkAddress> hostIndex) {
+    List<TNetworkAddress> ret = new ArrayList<>();
+    for (int i = 0; i < fd.getNumFileBlocks(); i++) {
+      for (int j = 0; j < fd.getFbFileBlock(i).replicaHostIdxsLength(); j++) {
+        int idx = fd.getFbFileBlock(i).replicaHostIdxs(j);
+        ret.add(hostIndex.getEntry(idx));
+      }
+    }
+    return ret;
+  }
+
+  @Test
+  public void testCloneWithNewHostIndex() throws Exception {
+    // Fetch some metadata from a directory in HDFS.
+    Path p = new Path("hdfs://localhost:20500/test-warehouse/schemas");
+    FileSystem fs = p.getFileSystem(new Configuration());
+    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(p);
+    ListMap<TNetworkAddress> origIndex = new ListMap<>();
+    List<FileDescriptor> fileDescriptors = HdfsTable.createFileDescriptors(fs, iter,
+        origIndex, new FileMetadataLoadStats(p));
+    assertFalse(fileDescriptors.isEmpty());
+
+    FileDescriptor fd = fileDescriptors.get(0);
+    // Get the list of locations, using the original host index.
+    List<TNetworkAddress> origAddresses = getAllReplicaAddresses(fd, origIndex);
+
+    // Make a new host index with the hosts in the opposite order.
+    ListMap<TNetworkAddress> newIndex = new ListMap<>();
+    newIndex.populate(Lists.reverse(origIndex.getList()));
+
+    // Clone the FD over to the reversed index. The actual addresses should be the same.
+    FileDescriptor cloned = fd.cloneWithNewHostIndex(origIndex.getList(), newIndex);
+    List<TNetworkAddress> newAddresses = getAllReplicaAddresses(cloned, newIndex);
+
+    assertEquals(origAddresses, newAddresses);
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
new file mode 100644
index 0000000..2ff5015
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.impala.common.InternalException;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.thrift.TCatalogInfoSelector;
+import org.apache.impala.thrift.TCatalogObject;
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TDatabase;
+import org.apache.impala.thrift.TDbInfoSelector;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
+import org.apache.impala.thrift.TPartialPartitionInfo;
+import org.apache.impala.thrift.TTable;
+import org.apache.impala.thrift.TTableInfoSelector;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/** Tests CatalogServiceCatalog.getPartialCatalogObject() against the test catalog. */
+public class PartialCatalogInfoTest {
+  private static CatalogServiceCatalog catalog_ = CatalogServiceTestCatalog.create();
+
+  /** Sends 'req' to the catalog and returns the response after a Thrift round-trip. */
+  private TGetPartialCatalogObjectResponse sendRequest(
+      TGetPartialCatalogObjectRequest req)
+      throws CatalogException, InternalException, TException {
+    System.err.println("req: " + req);
+    TGetPartialCatalogObjectResponse resp = catalog_.getPartialCatalogObject(req);
+    // Round-trip the response through serialization, so if we accidentally forgot to
+    // set the "isset" flag for any fields, we'll catch that bug.
+    byte[] respBytes = new TSerializer().serialize(resp);
+    resp.clear();
+    new TDeserializer().deserialize(resp, respBytes);
+    System.err.println("resp: " + resp);
+    return resp;
+  }
+
+  @Test
+  public void testDbList() throws Exception {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.CATALOG);
+    req.catalog_info_selector = new TCatalogInfoSelector();
+    req.catalog_info_selector.want_db_names = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    assertTrue(resp.catalog_info.db_names.contains("functional"));
+  }
+
+  @Test
+  public void testDb() throws Exception {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.DATABASE);
+    req.object_desc.db = new TDatabase("functional");
+    req.db_info_selector = new TDbInfoSelector();
+    req.db_info_selector.want_hms_database = true;
+    req.db_info_selector.want_table_names = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    assertTrue(resp.isSetObject_version_number());
+    assertEquals("functional", resp.db_info.hms_database.getName());
+    assertTrue(resp.db_info.table_names.contains("alltypes"));
+  }
+
+  @Test
+  public void testTable() throws Exception {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.TABLE);
+    req.object_desc.table = new TTable("functional", "alltypes");
+    req.table_info_selector = new TTableInfoSelector();
+    req.table_info_selector.want_hms_table = true;
+    req.table_info_selector.want_partition_names = true;
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    assertTrue(resp.isSetObject_version_number());
+    assertEquals("alltypes", resp.table_info.hms_table.getTableName());
+    // Partitions at index 1 and 3 are dereferenced below, so require at least four.
+    assertTrue(resp.table_info.partitions.size() >= 4);
+    TPartialPartitionInfo partInfo = resp.table_info.partitions.get(1);
+    assertTrue("bad part name: " + partInfo.name,
+        partInfo.name.matches("year=\\d+/month=\\d+"));
+
+    // Fetch again, but specify two specific partitions and ask for metadata.
+    req.table_info_selector.clear();
+    req.table_info_selector.want_partition_metadata = true;
+    req.table_info_selector.partition_ids = ImmutableList.of(
+        resp.table_info.partitions.get(1).id, resp.table_info.partitions.get(3).id);
+    resp = sendRequest(req);
+    assertNull(resp.table_info.hms_table);
+    assertEquals(2, resp.table_info.partitions.size());
+    partInfo = resp.table_info.partitions.get(0);
+    assertNull(partInfo.name);
+    assertEquals(req.table_info_selector.partition_ids.get(0), (Long)partInfo.id);
+    assertTrue(partInfo.hms_partition.getSd().getLocation().startsWith(
+        "hdfs://localhost:20500/test-warehouse/alltypes/year="));
+    // TODO(todd): we should probably transfer a compressed descriptor instead
+    // and refactor the MetaProvider interface to expose those since there is
+    // a lot of redundant info in partition descriptors.
+    // TODO(todd): should also filter out the incremental stats.
+  }
+
+  @Test
+  public void testFetchMissingPartId() throws Exception {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.TABLE);
+    req.object_desc.table = new TTable("functional", "alltypes");
+    req.table_info_selector = new TTableInfoSelector();
+    req.table_info_selector.want_partition_metadata = true;
+    req.table_info_selector.partition_ids = ImmutableList.of(-12345L); // non-existent
+    try {
+      sendRequest(req);
+      fail("did not throw exception for missing partition");
+    } catch (IllegalArgumentException iae) {
+      assertEquals("Partition id -12345 does not exist", iae.getMessage());
+    }
+  }
+
+  @Test
+  public void testTableStats() throws Exception {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.TABLE);
+    req.object_desc.table = new TTable("functional", "alltypes");
+    req.table_info_selector = new TTableInfoSelector();
+    req.table_info_selector.want_stats_for_column_names = ImmutableList.of(
+        "year", "month", "id", "bool_col", "tinyint_col", "smallint_col",
+        "int_col", "bigint_col", "float_col", "double_col", "date_string_col",
+        "string_col", "timestamp_col");
+    TGetPartialCatalogObjectResponse resp = sendRequest(req);
+    List<ColumnStatisticsObj> stats = resp.table_info.column_stats;
+    // We have 13 columns, but 2 are the clustering columns which don't have stats.
+    assertEquals(11, stats.size());
+    assertEquals("ColumnStatisticsObj(colName:id, colType:INT, " +
+        "statsData:<ColumnStatisticsData longStats:LongColumnStatsData(" +
+        "numNulls:-1, numDVs:7300)>)", stats.get(0).toString());
+  }
+
+  @Test
+  public void testFetchErrorTable() throws Exception {
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.TABLE);
+    req.object_desc.table = new TTable("functional", "bad_serde");
+    req.table_info_selector = new TTableInfoSelector();
+    req.table_info_selector.want_hms_table = true;
+    req.table_info_selector.want_partition_names = true;
+    try {
+      sendRequest(req);
+      fail("expected exception");
+    } catch (TableLoadingException tle) {
+      assertEquals("Failed to load metadata for table: functional.bad_serde",
+          tle.getMessage());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
index 4cb2b96..ac15fb2 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/LocalCatalogTest.java
@@ -157,6 +157,24 @@ public class LocalCatalogTest {
       }
     }
     assertEquals(24, totalFds);
+    assertTrue(t.getHostIndex().size() > 0);
+  }
+
+  // Unpartitioned table: expects a single file with one block and three disk IDs.
+  @Test
+  public void testLoadFileDescriptorsUnpartitioned() throws Exception {
+    FeFsTable t = (FeFsTable) catalog_.getTable("tpch", "region");
+    int totalFds = 0;
+    for (FeFsPartition p: FeCatalogUtils.loadAllPartitions(t)) {
+      List<FileDescriptor> fds = p.getFileDescriptors();
+      totalFds += fds.size();
+      for (FileDescriptor fd : fds) {
+        assertTrue(fd.getFileLength() > 0);
+        assertEquals(1, fd.getNumFileBlocks());
+        assertEquals(3, fd.getFbFileBlock(0).diskIdsLength());
+      }
+    }
+    assertEquals(1, totalFds);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/impala/blob/ef15da08/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index cbb48ce..e13e51b 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -97,7 +97,7 @@ public class FrontendTestBase {
 
   @AfterClass
   public static void cleanUp() throws Exception {
-    RuntimeEnv.INSTANCE.setTestEnv(false);
+    RuntimeEnv.INSTANCE.reset();
   }
 
   // Adds a Udf: default.name(args) to the catalog.