You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/22 17:34:54 UTC

[impala] 11/13: IMPALA-7140 (part 4): support creating descriptors for FS tables

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e62b0ec49ca268c074f1c1e85c884b35e4dd7627
Author: Todd Lipcon <to...@cloudera.com>
AuthorDate: Wed Jun 13 15:39:20 2018 -0700

    IMPALA-7140 (part 4): support creating descriptors for FS tables
    
    This adds the methods needed to convert LocalFsTable and
    LocalFsPartition to Thrift descriptors for consumption by the backend.
    
    Unfortunately, we cannot enable the planner tests yet: they check file
    counts and sizes as part of the EXPLAIN output, and file info fetching is
    not yet implemented in the LocalCatalog.
    
    However, I was able to test this change manually by starting an impalad
    with --use_local_catalog, connecting to it from the shell, and running
    various EXPLAIN SELECT queries against the tpch and functional tables. The
    EXPLAIN output is largely as expected, apart from the missing file info.
    
    Change-Id: I4550612eb6d1e3a324f49a9c4d24b048e45d3738
    Reviewed-on: http://gerrit.cloudera.org:8080/10735
    Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 common/thrift/CatalogObjects.thrift                |  1 +
 .../org/apache/impala/catalog/FeCatalogUtils.java  | 82 ++++++++++++++++++++++
 .../org/apache/impala/catalog/FeFsPartition.java   | 16 +++++
 .../java/org/apache/impala/catalog/FeFsTable.java  |  8 +++
 .../org/apache/impala/catalog/HdfsPartition.java   | 73 +++++--------------
 .../java/org/apache/impala/catalog/HdfsTable.java  | 30 ++------
 .../main/java/org/apache/impala/catalog/Table.java |  6 +-
 .../impala/catalog/local/LocalFsPartition.java     | 25 ++++++-
 .../apache/impala/catalog/local/LocalFsTable.java  | 58 ++++++++++++++-
 .../apache/impala/catalog/local/LocalTable.java    | 13 ++--
 .../org/apache/impala/planner/HdfsScanNode.java    | 20 +++---
 11 files changed, 227 insertions(+), 105 deletions(-)

diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 23da91f..bc068d6 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -239,6 +239,7 @@ struct THdfsPartitionLocation {
 }
 
 // Represents an HDFS partition
+// TODO(vercegovac): rename to TFsPartition
 struct THdfsPartition {
   1: required byte lineDelim
   2: required byte fieldDelim
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index 80ab246..bf4d230 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -30,6 +30,10 @@ import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.analysis.PartitionKeyValue;
 import org.apache.impala.analysis.ToSqlUtils;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.thrift.TColumnDescriptor;
+import org.apache.impala.thrift.THdfsPartition;
+import org.apache.impala.thrift.TTableStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +42,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  * Static utility functions shared between FeCatalog implementations.
@@ -114,6 +119,15 @@ public abstract class FeCatalogUtils {
     }
   }
 
+  // TODO(todd): move to a default method in FeTable in Java8
+  public static List<TColumnDescriptor> getTColumnDescriptors(FeTable table) {
+    List<TColumnDescriptor> result = Lists.newArrayList();
+    for (Column column: table.getColumns()) {
+      result.add(new TColumnDescriptor(column.getName(), column.getType().toThrift()));
+    }
+    return result;
+  }
+
   /**
    * Returns the value of the ROW_COUNT constant, or -1 if not found.
    */
@@ -248,4 +262,72 @@ public abstract class FeCatalogUtils {
     }
     return "(" + Joiner.on(" AND " ).join(conjuncts) + ")";
   }
+
+  /**
+   * Return the most commonly-used file format within a set of partitions. Ties
+   * are broken in favor of the format declared first in HdfsFileFormat, since
+   * an EnumMap iterates in declaration order (HashMap order is unspecified).
+   * Throws NullPointerException if 'partitions' is empty.
+   */
+  public static HdfsFileFormat getMajorityFormat(
+      Iterable<? extends FeFsPartition> partitions) {
+    Map<HdfsFileFormat, Integer> numPartitionsByFormat =
+        Maps.newEnumMap(HdfsFileFormat.class);
+    for (FeFsPartition partition: partitions) {
+      HdfsFileFormat format = partition.getInputFormatDescriptor().getFileFormat();
+      Integer numPartitions = numPartitionsByFormat.get(format);
+      numPartitionsByFormat.put(format, numPartitions == null ? 1 : numPartitions + 1);
+    }
+
+    int maxNumPartitions = Integer.MIN_VALUE;
+    HdfsFileFormat majorityFormat = null;
+    for (Map.Entry<HdfsFileFormat, Integer> entry: numPartitionsByFormat.entrySet()) {
+      // Strict '>' keeps the earliest-declared format when counts are tied.
+      if (entry.getValue().intValue() > maxNumPartitions) {
+        majorityFormat = entry.getKey();
+        maxNumPartitions = entry.getValue().intValue();
+      }
+    }
+    Preconditions.checkNotNull(majorityFormat);
+    return majorityFormat;
+  }
+
+  /**
+   * Converts 'part' to a THdfsPartition. File descriptors (with the total block
+   * count and file size) are included only if 'includeFileDesc' is set, and the
+   * incremental-stats HMS parameters are kept only if 'includeIncrementalStats' is.
+   */
+  public static THdfsPartition fsPartitionToThrift(FeFsPartition part,
+      boolean includeFileDesc, boolean includeIncrementalStats) {
+    HdfsStorageDescriptor sd = part.getInputFormatDescriptor();
+    THdfsPartition thriftHdfsPart = new THdfsPartition(sd.getLineDelim(),
+        sd.getFieldDelim(), sd.getCollectionDelim(), sd.getMapKeyDelim(),
+        sd.getEscapeChar(), sd.getFileFormat().toThrift(),
+        Expr.treesToThrift(part.getPartitionValues()), sd.getBlockSize());
+    thriftHdfsPart.setLocation(part.getLocationAsThrift());
+    thriftHdfsPart.setStats(new TTableStats(part.getNumRows()));
+    thriftHdfsPart.setAccess_level(part.getAccessLevel());
+    thriftHdfsPart.setIs_marked_cached(part.isMarkedCached());
+    thriftHdfsPart.setId(part.getId());
+    thriftHdfsPart.setHas_incremental_stats(part.hasIncrementalStats());
+    // IMPALA-4902: Shallow-clone the map to avoid concurrent modifications. One thread
+    // may try to serialize the returned THdfsPartition after releasing the table's lock,
+    // and another thread doing DDL may modify the map.
+    thriftHdfsPart.setHms_parameters(Maps.newHashMap(
+        includeIncrementalStats ? part.getParameters() :
+          part.getFilteredHmsParameters()));
+    if (includeFileDesc) {
+      // Add block location information
+      long numBlocks = 0;
+      long totalFileBytes = 0;
+      for (FileDescriptor fd: part.getFileDescriptors()) {
+        thriftHdfsPart.addToFile_desc(fd.toThrift());
+        numBlocks += fd.getNumFileBlocks();
+        totalFileBytes += fd.getFileLength();
+      }
+      thriftHdfsPart.setNum_blocks(numBlocks);
+      thriftHdfsPart.setTotal_file_size_bytes(totalFileBytes);
+    }
+    return thriftHdfsPart;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
index 52edeb8..ab6a3eb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.thrift.TAccessLevel;
+import org.apache.impala.thrift.THdfsPartitionLocation;
 import org.apache.impala.thrift.TPartitionStats;
 
 /**
@@ -68,6 +69,11 @@ public interface FeFsPartition {
   String getLocation();
 
   /**
+   * Return the location of this partition, serialized in Thrift.
+   */
+  THdfsPartitionLocation getLocationAsThrift();
+
+  /**
    * @return the location of this partition as a Path
    */
   Path getLocationPath();
@@ -146,4 +152,14 @@ public interface FeFsPartition {
    * @return the HMS parameters stored for this partition
    */
   Map<String, String> getParameters();
+
+  /**
+   * Returns the HMS partition parameters after filtering out all the partition
+   * incremental stats information.
+   *
+   * TODO(todd): consider _always_ filtering the parameters to remove incremental
+   * stats, and having a different getter which returns the already-decoded stats
+   * which are more memory-efficient anyway.
+   */
+  Map<String, String> getFilteredHmsParameters();
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 86bf2eb..508a0e5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -100,6 +100,14 @@ public interface FeFsTable extends FeTable {
   boolean isAvroTable();
 
   /**
+   * @return the format that the majority of partitions in this table use
+   *
+   * TODO(todd): this API needs to be removed, since it depends on loading all
+   * partitions even when scanning few.
+   */
+  public HdfsFileFormat getMajorityFormat();
+
+  /**
    * @param totalBytes_ the known number of bytes in the table
    * @return Returns an estimated row count for the given number of file bytes
    */
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 26f8c30..640aaa1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -32,9 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.LiteralExpr;
-import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.analysis.PartitionKeyValue;
-import org.apache.impala.analysis.ToSqlUtils;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.Reference;
@@ -47,9 +45,9 @@ import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TExprNode;
 import org.apache.impala.thrift.THdfsFileDesc;
 import org.apache.impala.thrift.THdfsPartition;
+import org.apache.impala.thrift.THdfsPartitionLocation;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPartitionStats;
-import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.HdfsCachingUtil;
 import org.apache.impala.util.ListMap;
 import org.slf4j.Logger;
@@ -485,9 +483,16 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
    * Returns the storage location (HDFS path) of this partition. Should only be called
    * for partitioned tables.
    */
+  @Override
   public String getLocation() {
     return (location_ != null) ? location_.toString() : null;
   }
+
+  @Override
+  public THdfsPartitionLocation getLocationAsThrift() {
+    return location_ != null ? location_.toThrift() : null;
+  }
+
   @Override // FeFsPartition
   public Path getLocationPath() { return new Path(getLocation()); }
   @Override // FeFsPartition
@@ -733,20 +738,18 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
       .toString();
   }
 
-  private static Predicate<String> isIncrementalStatsKey = new Predicate<String>() {
-    @Override
-    public boolean apply(String key) {
-      return !(key.startsWith(PartitionStatsUtil.INCREMENTAL_STATS_NUM_CHUNKS)
-          || key.startsWith(PartitionStatsUtil.INCREMENTAL_STATS_CHUNK_PREFIX));
-    }
-  };
+  // True for HMS parameter keys that do not start with an incremental-stats prefix.
+  public static final Predicate<String> IS_NOT_INCREMENTAL_STATS_KEY =
+      new Predicate<String>() {
+        @Override public boolean apply(String key) {
+          return !(key.startsWith(PartitionStatsUtil.INCREMENTAL_STATS_NUM_CHUNKS)
+              || key.startsWith(PartitionStatsUtil.INCREMENTAL_STATS_CHUNK_PREFIX));
+        }
+      };
 
-  /**
-   * Returns hmsParameters_ after filtering out all the partition
-   * incremental stats information.
-   */
-  private Map<String, String> getFilteredHmsParameters() {
-    return Maps.filterKeys(hmsParameters_, isIncrementalStatsKey);
+  @Override
+  public Map<String, String> getFilteredHmsParameters() {
+    return Maps.filterKeys(hmsParameters_, IS_NOT_INCREMENTAL_STATS_KEY);
   }
 
   public static HdfsPartition fromThrift(HdfsTable table,
@@ -831,44 +834,6 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition {
     }
   }
 
-  public THdfsPartition toThrift(boolean includeFileDesc,
-      boolean includeIncrementalStats) {
-    List<TExpr> thriftExprs = Expr.treesToThrift(getPartitionValues());
-
-    THdfsPartition thriftHdfsPart = new THdfsPartition(
-        fileFormatDescriptor_.getLineDelim(),
-        fileFormatDescriptor_.getFieldDelim(),
-        fileFormatDescriptor_.getCollectionDelim(),
-        fileFormatDescriptor_.getMapKeyDelim(),
-        fileFormatDescriptor_.getEscapeChar(),
-        fileFormatDescriptor_.getFileFormat().toThrift(), thriftExprs,
-        fileFormatDescriptor_.getBlockSize());
-    if (location_ != null) thriftHdfsPart.setLocation(location_.toThrift());
-    thriftHdfsPart.setStats(new TTableStats(numRows_));
-    thriftHdfsPart.setAccess_level(accessLevel_);
-    thriftHdfsPart.setIs_marked_cached(isMarkedCached_);
-    thriftHdfsPart.setId(getId());
-    thriftHdfsPart.setHas_incremental_stats(hasIncrementalStats());
-    // IMPALA-4902: Shallow-clone the map to avoid concurrent modifications. One thread
-    // may try to serialize the returned THdfsPartition after releasing the table's lock,
-    // and another thread doing DDL may modify the map.
-    thriftHdfsPart.setHms_parameters(Maps.newHashMap(
-        includeIncrementalStats ? hmsParameters_ : getFilteredHmsParameters()));
-    if (includeFileDesc) {
-      // Add block location information
-      long numBlocks = 0;
-      long totalFileBytes = 0;
-      for (FileDescriptor fd: fileDescriptors_) {
-        thriftHdfsPart.addToFile_desc(fd.toThrift());
-        numBlocks += fd.getNumFileBlocks();
-        totalFileBytes += fd.getFileLength();
-      }
-      thriftHdfsPart.setNum_blocks(numBlocks);
-      thriftHdfsPart.setTotal_file_size_bytes(totalFileBytes);
-    }
-    return thriftHdfsPart;
-  }
-
   /**
    * Comparator to allow ordering of partitions by their partition-key values.
    */
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index e6c34bc..13521df 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1716,8 +1716,8 @@ public class HdfsTable extends Table implements FeFsTable {
     for (HdfsPartition partition: partitionMap_.values()) {
       long id = partition.getId();
       if (refPartitions == null || refPartitions.contains(id)) {
-        THdfsPartition tHdfsPartition =
-            partition.toThrift(includeFileDesc, includeIncrementalStats);
+        THdfsPartition tHdfsPartition = FeCatalogUtils.fsPartitionToThrift(
+            partition, includeFileDesc, includeIncrementalStats);
         if (tHdfsPartition.isSetHas_incremental_stats() &&
             tHdfsPartition.isHas_incremental_stats()) {
           memUsageEstimate += getColumns().size() * STATS_SIZE_PER_COLUMN_BYTES;
@@ -1736,7 +1736,8 @@ public class HdfsTable extends Table implements FeFsTable {
     }
     if (includeFileDesc) fileMetadataStats_.set(stats);
 
-    THdfsPartition prototypePartition = prototypePartition_.toThrift(false, false);
+    THdfsPartition prototypePartition = FeCatalogUtils.fsPartitionToThrift(
+        prototypePartition_, false, false);
 
     memUsageEstimate += fileMetadataStats_.numFiles * PER_FD_MEM_USAGE_BYTES +
         fileMetadataStats_.numBlocks * PER_BLOCK_MEM_USAGE_BYTES;
@@ -1777,28 +1778,7 @@ public class HdfsTable extends Table implements FeFsTable {
     // important to add the "prototype" partition as a fallback.
     Iterable<HdfsPartition> partitionsToConsider = Iterables.concat(
         partitionMap_.values(), Collections.singleton(prototypePartition_));
-    Map<HdfsFileFormat, Integer> numPartitionsByFormat = Maps.newHashMap();
-    for (HdfsPartition partition: partitionsToConsider) {
-      HdfsFileFormat format = partition.getInputFormatDescriptor().getFileFormat();
-      Integer numPartitions = numPartitionsByFormat.get(format);
-      if (numPartitions == null) {
-        numPartitions = Integer.valueOf(1);
-      } else {
-        numPartitions = Integer.valueOf(numPartitions.intValue() + 1);
-      }
-      numPartitionsByFormat.put(format, numPartitions);
-    }
-
-    int maxNumPartitions = Integer.MIN_VALUE;
-    HdfsFileFormat majorityFormat = null;
-    for (Map.Entry<HdfsFileFormat, Integer> entry: numPartitionsByFormat.entrySet()) {
-      if (entry.getValue().intValue() > maxNumPartitions) {
-        majorityFormat = entry.getKey();
-        maxNumPartitions = entry.getValue().intValue();
-      }
-    }
-    Preconditions.checkNotNull(majorityFormat);
-    return majorityFormat;
+    return FeCatalogUtils.getMajorityFormat(partitionsToConsider);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index cdeffbf..7fdc3c9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -420,11 +420,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
    * Returns a list of thrift column descriptors ordered by position.
    */
   public List<TColumnDescriptor> getTColumnDescriptors() {
-    List<TColumnDescriptor> colDescs = Lists.<TColumnDescriptor>newArrayList();
-    for (Column col: colsByPos_) {
-      colDescs.add(new TColumnDescriptor(col.getName(), col.getType().toThrift()));
-    }
-    return colDescs;
+    return FeCatalogUtils.getTColumnDescriptors(this);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
index c33b8a2..effe7c2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
@@ -28,14 +28,17 @@ import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.HdfsStorageDescriptor;
 import org.apache.impala.catalog.HdfsStorageDescriptor.InvalidStorageDescriptorException;
 import org.apache.impala.catalog.PartitionStatsUtil;
 import org.apache.impala.thrift.TAccessLevel;
+import org.apache.impala.thrift.THdfsPartitionLocation;
 import org.apache.impala.thrift.TPartitionStats;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 
 public class LocalFsPartition implements FeFsPartition {
   private final LocalFsTable table_;
@@ -72,8 +75,10 @@ public class LocalFsPartition implements FeFsPartition {
 
   @Override
   public boolean hasFileDescriptors() {
-    // TODO Auto-generated method stub
-    return false;
+    // TODO(todd): implement file fetching. Return true for now
+    // so that partition pruning can be tested -- if we return false
+    // then all partitions would be pruned.
+    return true;
   }
 
   @Override
@@ -88,6 +93,13 @@ public class LocalFsPartition implements FeFsPartition {
   }
 
   @Override
+  public THdfsPartitionLocation getLocationAsThrift() {
+    // TODO(todd): support prefix-compressed partition locations. For now,
+    // using -1 indicates that the location is a full path string.
+    return new THdfsPartitionLocation(/*prefix_index=*/-1, getLocation());
+  }
+
+  @Override
   public Path getLocationPath() {
     return new Path(getLocation());
   }
@@ -182,4 +194,13 @@ public class LocalFsPartition implements FeFsPartition {
   public Map<String, String> getParameters() {
     return msPartition_.getParameters();
   }
+
+  @Override
+  public Map<String, String> getFilteredHmsParameters() {
+    // TODO(todd): for now, copied from HdfsPartition. Eventually we would want to
+    // lazy-fetch these parameters separately for only the cases that require them,
+    // since they are quite large.
+    return Maps.filterKeys(getParameters(),
+        HdfsPartition.IS_NOT_INCREMENTAL_STATS_KEY);
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 352655c..f648ac8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -31,20 +31,28 @@ import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
+import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.PrunablePartition;
+import org.apache.impala.thrift.CatalogObjectsConstants;
+import org.apache.impala.thrift.THdfsPartition;
+import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TResultSet;
+import org.apache.impala.thrift.TTableDescriptor;
+import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.ListMap;
 import org.apache.thrift.TException;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 public class LocalFsTable extends LocalTable implements FeFsTable {
   /**
@@ -134,6 +142,12 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   }
 
   @Override
+  public HdfsFileFormat getMajorityFormat() {
+    // Needed by HdfsTableSink.
+    throw new UnsupportedOperationException("TODO: implement me");
+  }
+
+  @Override
   public long getExtrapolatedNumRows(long totalBytes) {
     // TODO Auto-generated method stub
     return 0;
@@ -151,6 +165,47 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   }
 
   @Override
+  public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) {
+    Preconditions.checkNotNull(referencedPartitions);
+    Map<Long, THdfsPartition> idToPartition = Maps.newHashMap();
+    List<? extends FeFsPartition> partitions = loadPartitions(referencedPartitions);
+    for (FeFsPartition partition : partitions) {
+      idToPartition.put(partition.getId(),
+          FeCatalogUtils.fsPartitionToThrift(partition,
+              /*includeFileDesc=*/false,
+              /*includeIncrementalStats=*/false));
+    }
+
+    THdfsPartition tPrototypePartition = FeCatalogUtils.fsPartitionToThrift(
+        createPrototypePartition(),
+        /*includeFileDesc=*/false,
+        /*includeIncrementalStats=*/false);
+
+    // TODO(todd): implement avro schema support
+    // TODO(todd): set multiple_filesystems member?
+    THdfsTable hdfsTable = new THdfsTable(getHdfsBaseDir(), getColumnNames(),
+        getNullPartitionKeyValue(), schemaInfo_.getNullColumnValue(), idToPartition,
+        tPrototypePartition);
+
+    TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
+        FeCatalogUtils.getTColumnDescriptors(this),
+        getNumClusteringCols(), name_, db_.getName());
+    tableDesc.setHdfsTable(hdfsTable);
+    return tableDesc;
+  }
+
+  // Creates the prototype partition for toThriftDescriptor(): it uses the table's own
+  // storage descriptor, has the prototype partition ID, and has no HMS parameters.
+  private LocalFsPartition createPrototypePartition() {
+    Partition protoMsPartition = new Partition();
+    protoMsPartition.setSd(getMetaStoreTable().getSd());
+    protoMsPartition.setParameters(Collections.<String, String>emptyMap());
+    LocalPartitionSpec spec = new LocalPartitionSpec(
+        this, "", CatalogObjectsConstants.PROTOTYPE_PARTITION_ID);
+    return new LocalFsPartition(this, spec, protoMsPartition);
+  }
+
+  @Override
   public Collection<? extends PrunablePartition> getPartitions() {
     loadPartitionSpecs();
     return partitionSpecs_.values();
@@ -258,7 +313,8 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
     if (partitionValueMap_ != null) return;
 
     loadPartitionSpecs();
-    ArrayList<TreeMap<LiteralExpr, HashSet<Long>>> valMapByCol = new ArrayList<>();
+    ArrayList<TreeMap<LiteralExpr, HashSet<Long>>> valMapByCol =
+        new ArrayList<>();
     ArrayList<HashSet<Long>> nullParts = new ArrayList<>();
 
     for (int i = 0; i < getNumClusteringCols(); i++) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
index 246b691..a0a7bd7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
@@ -19,7 +19,6 @@ package org.apache.impala.catalog.local;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 
 import javax.annotation.concurrent.Immutable;
 
@@ -37,7 +36,6 @@ import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.thrift.TCatalogObjectType;
-import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.thrift.TException;
 
@@ -53,7 +51,7 @@ import com.google.common.collect.Lists;
  * This class is not thread-safe. A new instance is created for
  * each catalog instance.
  */
-class LocalTable implements FeTable {
+abstract class LocalTable implements FeTable {
   protected final LocalDb db_;
   /** The lower-case name of the table. */
   protected final String name_;
@@ -181,11 +179,6 @@ class LocalTable implements FeTable {
     return schemaInfo_.tableStats_;
   }
 
-  @Override
-  public TTableDescriptor toThriftDescriptor(int tableId, Set<Long> referencedPartitions) {
-    throw new UnsupportedOperationException("TODO");
-  }
-
   /**
    * The table schema, loaded from the HMS Table object. This is common
    * to all Table implementations and includes the column definitions and
@@ -275,5 +268,9 @@ class LocalTable implements FeTable {
       Preconditions.checkArgument(colsByPos_.get(c.getPosition()) == c);
       return c.getPosition() < numClusteringCols_;
     }
+
+    protected String getNullColumnValue() {
+      return nullColumnValue_;
+    }
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 378ad40..546052e 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -49,13 +49,13 @@ import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.analysis.TupleId;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnStats;
+import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.HdfsFileFormat;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
-import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
@@ -162,7 +162,7 @@ public class HdfsScanNode extends ScanNode {
   // Read size for Parquet and ORC footers. Matches HdfsScanner::FOOTER_SIZE in backend.
   private static final long FOOTER_SIZE = 100L * 1024L;
 
-  private final HdfsTable tbl_;
+  private final FeFsTable tbl_;
 
   // List of partitions to be scanned. Partitions have been pruned.
   private final List<? extends FeFsPartition> partitions_;
@@ -281,13 +281,13 @@ public class HdfsScanNode extends ScanNode {
       List<? extends FeFsPartition> partitions, TableRef hdfsTblRef, AggregateInfo aggInfo) {
     super(id, desc, "SCAN HDFS");
     Preconditions.checkState(desc.getTable() instanceof FeFsTable);
-    tbl_ = (HdfsTable)desc.getTable();
+    tbl_ = (FeFsTable)desc.getTable();
     conjuncts_ = conjuncts;
     partitions_ = partitions;
     sampleParams_ = hdfsTblRef.getSampleParams();
     replicaPreference_ = hdfsTblRef.getReplicaPreference();
     randomReplica_ = hdfsTblRef.getRandomReplica();
-    HdfsTable hdfsTable = (HdfsTable)hdfsTblRef.getTable();
+    FeFsTable hdfsTable = (FeFsTable)hdfsTblRef.getTable();
     Preconditions.checkState(tbl_ == hdfsTable);
     StringBuilder error = new StringBuilder();
     aggInfo_ = aggInfo;
@@ -1193,7 +1193,7 @@ public class HdfsScanNode extends ScanNode {
   protected String getNodeExplainString(String prefix, String detailPrefix,
       TExplainLevel detailLevel) {
     StringBuilder output = new StringBuilder();
-    HdfsTable table = (HdfsTable) desc_.getTable();
+    FeFsTable table = (FeFsTable) desc_.getTable();
     output.append(String.format("%s%s [%s", prefix, getDisplayLabel(),
         getDisplayLabelDetail()));
     if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal() &&
@@ -1345,8 +1345,7 @@ public class HdfsScanNode extends ScanNode {
 
     Preconditions.checkState(0 < numNodes_ && numNodes_ <= scanRangeSize);
     Preconditions.checkNotNull(desc_);
-    Preconditions.checkState(desc_.getTable() instanceof HdfsTable);
-    HdfsTable table = (HdfsTable) desc_.getTable();
+    Preconditions.checkState(desc_.getTable() instanceof FeFsTable);
     List<Long> columnReservations = null;
     if (fileFormats_.contains(HdfsFileFormat.PARQUET)
         || fileFormats_.contains(HdfsFileFormat.ORC)) {
@@ -1354,8 +1353,9 @@ public class HdfsScanNode extends ScanNode {
     }
 
     int perHostScanRanges;
-    if (table.getMajorityFormat() == HdfsFileFormat.PARQUET
-        || table.getMajorityFormat() == HdfsFileFormat.ORC) {
+    HdfsFileFormat majorityFormat = FeCatalogUtils.getMajorityFormat(partitions_);
+    if (majorityFormat == HdfsFileFormat.PARQUET
+        || majorityFormat == HdfsFileFormat.ORC) {
       Preconditions.checkNotNull(columnReservations);
       // For the purpose of this estimation, the number of per-host scan ranges for
       // Parquet/ORC files are equal to the number of columns read from the file. I.e.
@@ -1475,7 +1475,7 @@ public class HdfsScanNode extends ScanNode {
    */
   private List<Long> computeMinColumnMemReservations() {
     List<Long> columnByteSizes = Lists.newArrayList();
-    HdfsTable table = (HdfsTable) desc_.getTable();
+    FeFsTable table = (FeFsTable) desc_.getTable();
     boolean havePosSlot = false;
     for (SlotDescriptor slot: desc_.getSlots()) {
       if (!slot.isMaterialized() || slot == countStarSlot_) continue;