Posted to commits@impala.apache.org by ta...@apache.org on 2017/05/10 20:42:19 UTC

[4/4] incubator-impala git commit: IMPALA-4029: Reduce memory requirements for storing file metadata

IMPALA-4029: Reduce memory requirements for storing file metadata

This commit reduces the memory required for storing file and block
metadata in the catalog and the impalad nodes by using the FlatBuffers
serialization library.

Testing:
Passed an exhaustive test run.

Benchmark:
Memory requirement for storing an HDFS table with 250K files is reduced
by 2.5X.

Change-Id: I483d3cadc9d459f71a310c35a130d073597b0983
Reviewed-on: http://gerrit.cloudera.org:8080/6406
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d6b5f82e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d6b5f82e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d6b5f82e

Branch: refs/heads/master
Commit: d6b5f82e31732c355af3f3d1a8e5da94ba9c1349
Parents: eb7ee67
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Thu Mar 2 16:51:42 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed May 10 09:23:05 2017 +0000

----------------------------------------------------------------------
 CMakeLists.txt                                  |   1 +
 common/fbs/CMakeLists.txt                       |  75 ++++
 common/fbs/CatalogObjects.fbs                   |  75 ++++
 common/thrift/CatalogObjects.thrift             |  38 +-
 fe/CMakeLists.txt                               |   2 +-
 fe/pom.xml                                      |   6 +
 .../org/apache/impala/catalog/DiskIdMapper.java |  44 ++-
 .../apache/impala/catalog/HdfsCompression.java  |  14 +
 .../apache/impala/catalog/HdfsPartition.java    | 384 ++++++++++++++-----
 .../org/apache/impala/catalog/HdfsTable.java    | 173 ++-------
 .../java/org/apache/impala/catalog/Table.java   |  15 +-
 .../org/apache/impala/planner/HdfsScanNode.java |  32 +-
 .../catalog/CatalogObjectToFromThriftTest.java  |   2 +-
 .../apache/impala/common/FrontendTestBase.java  |  12 +
 14 files changed, 561 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5e568aa..130eb5b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -371,6 +371,7 @@ message(STATUS "Breakpad library: " ${BREAKPAD_STATIC_LIB})
 # compile these subdirs using their own CMakeLists.txt
 add_subdirectory(common/function-registry)
 add_subdirectory(common/thrift)
+add_subdirectory(common/fbs)
 add_subdirectory(be)
 add_subdirectory(fe)
 add_subdirectory(ext-data-source)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/common/fbs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/common/fbs/CMakeLists.txt b/common/fbs/CMakeLists.txt
new file mode 100644
index 0000000..d0c4ef7
--- /dev/null
+++ b/common/fbs/CMakeLists.txt
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cmake_minimum_required(VERSION 2.6)
+
+# Helper function to generate build rules. For each input flatbuffer file (*.fbs), this
+# function will generate a rule that maps the input file to an output c++ file.
+# The flatbuffers compiler will generate multiple output files for each input file. In
+# particular, it will generate a '_generated.h' for c++ and one .java file per flatbuffer
+# table definition.
+#
+# To call this function, pass it the output file list followed by the input flatbuffers
+# files: e.g. FB_GEN(OUTPUT_FILES, ${FLATBUFFER_FILES})
+#
+function(FB_GEN VAR)
+  IF (NOT ARGN)
+    MESSAGE(SEND_ERROR "Error: FB_GEN called without any src files")
+    RETURN()
+  ENDIF(NOT ARGN)
+
+  set(${VAR})
+  foreach(FIL ${ARGN})
+    # Get full path
+    get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
+    # Get basename
+    get_filename_component(FIL_WE ${FIL} NAME_WE)
+
+    set(OUTPUT_BE_FILE "${BE_OUTPUT_DIR}/${FIL_WE}_generated.h")
+    list(APPEND ${VAR} ${OUTPUT_BE_FILE})
+
+    add_custom_command(
+      OUTPUT ${OUTPUT_BE_FILE}
+      COMMAND ${FLATBUFFERS_COMPILER} ${CPP_ARGS} ${FIL}
+      COMMAND ${FLATBUFFERS_COMPILER} ${JAVA_FE_ARGS} ${FIL}
+      DEPENDS ${ABS_FIL}
+      COMMENT "Running FlatBuffers compiler on ${FIL}"
+      VERBATIM
+    )
+  endforeach(FIL)
+
+  set(${VAR} ${${VAR}} PARENT_SCOPE)
+endfunction(FB_GEN)
+
+message("Using FlatBuffers compiler: ${FLATBUFFERS_COMPILER}")
+set(BE_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/be/generated-sources/gen-cpp)
+set(FE_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/fe/generated-sources/gen-java)
+file(MAKE_DIRECTORY ${FE_OUTPUT_DIR})
+file(MAKE_DIRECTORY ${BE_OUTPUT_DIR})
+set(JAVA_FE_ARGS --java -o ${FE_OUTPUT_DIR} -b)
+message(${JAVA_FE_ARGS})
+set(CPP_ARGS --cpp -o ${BE_OUTPUT_DIR} -b)
+message(${CPP_ARGS})
+
+# Add new FlatBuffer schema files here.
+set (SRC_FILES
+  CatalogObjects.fbs
+)
+
+FB_GEN(FB_ALL_FILES ${SRC_FILES})
+add_custom_target(fb-deps ALL DEPENDS ${FB_ALL_FILES})
+

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/common/fbs/CatalogObjects.fbs
----------------------------------------------------------------------
diff --git a/common/fbs/CatalogObjects.fbs b/common/fbs/CatalogObjects.fbs
new file mode 100644
index 0000000..bf44380
--- /dev/null
+++ b/common/fbs/CatalogObjects.fbs
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace org.apache.impala.fb;
+
+// Supported compression algorithms. This needs to match the values in
+// CatalogObjects.THdfsCompression enum.
+enum FbCompression: byte {
+  NONE,
+  DEFAULT,
+  GZIP,
+  DEFLATE,
+  BZIP2,
+  SNAPPY,
+  SNAPPY_BLOCKED,
+  LZO,
+  LZ4,
+  ZLIB
+}
+
+table FbFileBlock {
+  // Offset of this block within the file
+  // TODO: Remove this field if file blocks are retrieved by offset. Infer offset using
+  // the block length.
+  offset: long = 0 (id: 0);
+
+  // Total length of the block.
+  // TODO: Remove this field and compute the block length using the offsets, block size,
+  // and file length.
+  length: long = -1 (id: 1);
+
+  // Hosts that contain replicas of this block. Each value in the list is an index to
+  // the network_addresses list of THdfsTable. The most significant bit of each
+  // replica host index indicates if the replica is cached.
+  replica_host_idxs: [ushort] (id: 2);
+
+  // The list of disk ids for the file block. May not be set if disk ids are not
+  // supported.
+  disk_ids: [ushort] (id: 3);
+}
+
+table FbFileDesc {
+  // The name of the file (not the full path). The parent path is assumed to be the
+  // 'location' of the Partition this file resides within.
+  // TODO: Investigate the use of prefix-based compression for file names.
+  file_name: string (id: 0);
+
+  // The total length of the file, in bytes.
+  length: long (id: 1);
+
+  // The type of compression used for this file.
+  // TODO: Check if reordering these fields can produce some space savings by eliminating
+  // added padding.
+  compression: FbCompression (id: 2);
+
+  // The last modified time of the file.
+  last_modification_time: long (id: 3);
+
+  // List of FbFileBlocks that make up this file.
+  file_blocks: [FbFileBlock] (id: 4);
+}

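A note on reading the new format: the sketch below is not part of the patch; it is
a minimal, illustrative Java snippet showing how an FbFileDesc serialized with this
schema can be read back through the generated FlatBuffers accessors, and how the
cached-replica flag is recovered from the most significant bit of each replica host
index. The mask mirrors REPLICA_HOST_IDX_MASK added to HdfsPartition.java further
down; the class and method names and the source of the byte array are illustrative
only.

    import java.nio.ByteBuffer;

    import org.apache.impala.fb.FbFileBlock;
    import org.apache.impala.fb.FbFileDesc;

    public class FbFileDescReadSketch {
      // Same layout as HdfsPartition.FileBlock.REPLICA_HOST_IDX_MASK: the low 15 bits
      // hold the host index, the most significant bit holds the is-cached flag.
      private static final short REPLICA_HOST_IDX_MASK = (1 << 15) - 1;

      // 'serializedFd' is assumed to hold the bytes produced by a FlatBufferBuilder
      // for one file descriptor (e.g. the contents of THdfsFileDesc.file_desc_data).
      static void dump(byte[] serializedFd) {
        FbFileDesc fd = FbFileDesc.getRootAsFbFileDesc(ByteBuffer.wrap(serializedFd));
        System.out.println(fd.fileName() + " length=" + fd.length());
        for (int b = 0; b < fd.fileBlocksLength(); ++b) {
          FbFileBlock block = fd.fileBlocks(b);
          for (int r = 0; r < block.replicaHostIdxsLength(); ++r) {
            int encoded = block.replicaHostIdxs(r);
            int hostIdx = encoded & REPLICA_HOST_IDX_MASK;
            boolean isCached = (encoded & ~REPLICA_HOST_IDX_MASK) != 0;
            System.out.println("  block " + b + ": host=" + hostIdx
                + " cached=" + isCached);
          }
        }
      }
    }
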
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/common/thrift/CatalogObjects.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index f7ce00b..eeeae44 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -212,42 +212,12 @@ struct TColumn {
   16: optional i32 block_size
 }
 
-// Represents a block in an HDFS file
-struct THdfsFileBlock {
-  // Offset of this block within the file
-  1: required i64 offset
-
-  // Total length of the block
-  2: required i64 length
-
-  // Hosts that contain replicas of this block. Each value in the list is an index in to
-  // the network_addresses list of THdfsTable.
-  3: required list<i32> replica_host_idxs
-
-  // The list of disk ids for the file block. May not be set if disk ids are not supported
-  4: optional list<i32> disk_ids
-
-  // For each replica, specifies if the block is cached in memory.
-  5: optional list<bool> is_replica_cached
-}
-
 // Represents an HDFS file in a partition.
 struct THdfsFileDesc {
-  // The name of the file (not the full path). The parent path is assumed to be the
-  // 'location' of the THdfsPartition this file resides within.
-  1: required string file_name
-
-  // The total length of the file, in bytes.
-  2: required i64 length
-
-  // The type of compression used for this file.
-  3: required THdfsCompression compression
-
-  // The last modified time of the file.
-  4: required i64 last_modification_time
-
-  // List of THdfsFileBlocks that make up this file.
-  5: required list<THdfsFileBlock> file_blocks
+  // File descriptor metadata serialized into a FlatBuffer
+  // (defined in common/fbs/CatalogObjects.fbs).
+  // TODO: Put this in a KRPC sidecar to avoid serialization cost.
+  1: required binary file_desc_data
 }
 
 // Represents an HDFS partition's location in a compressed format. 'prefix_index'

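To make the new opaque 'file_desc_data' field concrete, the snippet below is an
illustrative (not committed) sketch of how a FlatBuffer-serialized descriptor is
built and wrapped into THdfsFileDesc. It uses the same FlatBufferBuilder calls as
FileDescriptor.createFbFileDesc() in HdfsPartition.java below; the file name,
length, empty block vector, and wrapper class name are made-up example values.

    import com.google.flatbuffers.FlatBufferBuilder;

    import org.apache.impala.fb.FbCompression;
    import org.apache.impala.fb.FbFileDesc;
    import org.apache.impala.thrift.THdfsFileDesc;

    public class FileDescSerializeSketch {
      static THdfsFileDesc buildExample() {
        FlatBufferBuilder fbb = new FlatBufferBuilder(1);
        int fileNameOffset = fbb.createString("000000_0");  // illustrative file name
        // The real code serializes one FbFileBlock per BlockLocation before starting
        // the FbFileDesc table; this toy example writes an empty block vector.
        int blockVectorOffset = FbFileDesc.createFileBlocksVector(fbb, new int[0]);
        FbFileDesc.startFbFileDesc(fbb);
        FbFileDesc.addFileName(fbb, fileNameOffset);
        FbFileDesc.addLength(fbb, 1024L);  // illustrative file length in bytes
        FbFileDesc.addLastModificationTime(fbb, 0L);
        FbFileDesc.addCompression(fbb, FbCompression.NONE);
        FbFileDesc.addFileBlocks(fbb, blockVectorOffset);
        fbb.finish(FbFileDesc.endFbFileDesc(fbb));
        // The serialized FlatBuffer becomes the opaque Thrift payload.
        THdfsFileDesc fd = new THdfsFileDesc();
        fd.setFile_desc_data(fbb.dataBuffer().slice());
        return fd;
      }
    }
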
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/fe/CMakeLists.txt b/fe/CMakeLists.txt
index e06f467..eee1601 100644
--- a/fe/CMakeLists.txt
+++ b/fe/CMakeLists.txt
@@ -15,6 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-add_custom_target(fe ALL DEPENDS thrift-deps function-registry ext-data-source
+add_custom_target(fe ALL DEPENDS thrift-deps fb-deps function-registry ext-data-source
   COMMAND $ENV{IMPALA_HOME}/bin/mvn-quiet.sh install -DskipTests
 )

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/pom.xml
----------------------------------------------------------------------
diff --git a/fe/pom.xml b/fe/pom.xml
index 3399544..c6aeaa3 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -352,6 +352,12 @@ under the License.
       <version>1.0.2</version>
     </dependency>
 
+    <dependency>
+      <groupId>com.github.davidmoten</groupId>
+      <artifactId>flatbuffers-java</artifactId>
+      <version>1.6.0.1</version>
+    </dependency>
+
   </dependencies>
 
   <reporting>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java b/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java
index 6fdcef0..87d7203 100644
--- a/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java
+++ b/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java
@@ -20,6 +20,7 @@ package org.apache.impala.catalog;
 import com.google.common.collect.Maps;
 import com.google.common.base.Strings;
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Shorts;
 
 import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -41,14 +42,14 @@ public class DiskIdMapper {
     private DiskIdMapper() {}
 
     // Maps each storage ID UUID string returned by the BlockLocation API, to a per-node
-    // sequential 0-based integer disk id used by the BE scanners. This assumes that
+    // sequential 0-based disk id used by the BE scanners. This assumes that
     // the storage ID of a particular disk is unique across all the nodes in the cluster.
-    private ConcurrentHashMap<String, Integer> storageUuidToDiskId =
-        new ConcurrentHashMap<String, Integer>();
+    private ConcurrentHashMap<String, Short> storageUuidToDiskId_ =
+        new ConcurrentHashMap<String, Short>();
 
-    // Per-host ID generator for storage UUID to integer ID mapping. This maps each host
-    // to the corresponding latest 0-based integer ID.
-    private HashMap<String, Integer> storageIdGenerator = Maps.newHashMap();
+    // Per-host ID generator for storage UUID to Short ID mapping. This maps each host
+    // to the corresponding latest 0-based ID stored in a short.
+    private HashMap<String, Short> storageIdGenerator_ = Maps.newHashMap();
 
     /**
      * Returns a disk id (0-based) index for storageUuid on host 'host'. Generates a
@@ -58,30 +59,35 @@ public class DiskIdMapper {
      * TODO: It is quite possible that there will be lock contention in this method during
      * the initial metadata load. Figure out ways to fix it using finer locking scheme.
      */
-    public int getDiskId(String host, String storageUuid) {
+    public short getDiskId(String host, String storageUuid) {
       Preconditions.checkState(!Strings.isNullOrEmpty(host));
       // Initialize the diskId as -1 to indicate it is unknown
-      int diskId = -1;
+      short diskId = -1;
       // Check if an existing mapping is already present. This is intentionally kept
       // out of the synchronized block to avoid contention for lookups. Once a reasonable
-      // amount of data loading is done and storageIdtoInt is populated with storage IDs
-      // across the cluster, we expect to have a good hit rate.
-      Integer intId = storageUuidToDiskId.get(storageUuid);
-      if (intId != null) return intId;
-      synchronized (storageIdGenerator) {
+      // amount of data loading is done and storageUuidToDiskId_ is populated with storage
+      // IDs across the cluster, we expect to have a good hit rate.
+      Short shortId = storageUuidToDiskId_.get(storageUuid);
+      if (shortId != null) return shortId;
+      synchronized (storageIdGenerator_) {
         // Mapping might have been added by another thread that entered the synchronized
         // block first.
-        intId = storageUuidToDiskId.get(storageUuid);
-        if (intId != null) return intId;
+        shortId = storageUuidToDiskId_.get(storageUuid);
+        if (shortId != null) return shortId;
         // No mapping exists, create a new disk ID for 'storageUuid'
-        if (storageIdGenerator.containsKey(host)) {
-          diskId = storageIdGenerator.get(host) + 1;
+        if (storageIdGenerator_.containsKey(host)) {
+          try {
+            diskId = Shorts.checkedCast(storageIdGenerator_.get(host) + 1);
+          } catch (IllegalArgumentException e) {
+            Preconditions.checkState(false,
+                "Number of disk IDs per host exceeded " + Short.MAX_VALUE);
+          }
         } else {
           // First diskId of this host.
           diskId = 0;
         }
-        storageIdGenerator.put(host, new Integer(diskId));
-        storageUuidToDiskId.put(storageUuid, new Integer(diskId));
+        storageIdGenerator_.put(host, Short.valueOf(diskId));
+        storageUuidToDiskId_.put(storageUuid, Short.valueOf(diskId));
       }
       return diskId;
     }

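As a quick illustration of the narrowed types above, callers now receive the
per-host disk id as a short. A minimal usage sketch follows; the host name and
storage UUID are made-up values, and in the real code they come from
BlockLocation.getHosts() and BlockLocation.getStorageIds().

    import org.apache.impala.catalog.DiskIdMapper;

    public class DiskIdMapperSketch {
      static void example() {
        // Hypothetical host name and HDFS storage UUID.
        String host = "datanode-01.example.com";
        String storageUuid = "DS-11111111-2222-3333-4444-555555555555";
        // Returns a per-host, 0-based disk id, now stored in a short.
        short diskId = DiskIdMapper.INSTANCE.getDiskId(host, storageUuid);
        System.out.println("disk id for " + host + ": " + diskId);
      }
    }
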
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java b/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
index 36a1e9e..dd81587 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
@@ -17,7 +17,9 @@
 
 package org.apache.impala.catalog;
 
+import org.apache.impala.fb.FbCompression;
 import org.apache.impala.thrift.THdfsCompression;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 
@@ -73,6 +75,18 @@ public enum HdfsCompression {
     }
   }
 
+  public byte toFb() {
+    switch (this) {
+      case NONE: return FbCompression.NONE;
+      case DEFLATE: return FbCompression.DEFLATE;
+      case GZIP: return FbCompression.GZIP;
+      case BZIP2: return FbCompression.BZIP2;
+      case SNAPPY: return FbCompression.SNAPPY;
+      case LZO: return FbCompression.LZO;
+      default: throw new IllegalStateException("Unexpected codec: " + this);
+    }
+  }
+
   /* Returns a compression type based on (Hive's) input format. Special case for LZO. */
   public static HdfsCompression fromHdfsInputFormatClass(String inputFormatClass) {
     // TODO: Remove when we have the native LZO writer.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index de1a948..76fddcf 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -17,15 +17,15 @@
 
 package org.apache.impala.catalog;
 
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,27 +37,40 @@ import org.apache.impala.analysis.ToSqlUtils;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.Reference;
+import org.apache.impala.fb.FbCompression;
+import org.apache.impala.fb.FbFileBlock;
+import org.apache.impala.fb.FbFileDesc;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.impala.thrift.ImpalaInternalServiceConstants;
 import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TExprNode;
-import org.apache.impala.thrift.THdfsCompression;
-import org.apache.impala.thrift.THdfsFileBlock;
 import org.apache.impala.thrift.THdfsFileDesc;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPartitionStats;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.HdfsCachingUtil;
+import org.apache.impala.util.ListMap;
+
+import com.google.flatbuffers.FlatBufferBuilder;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Shorts;
 
 /**
  * Query-relevant information for one table partition. Partitions are comparable
@@ -68,67 +81,155 @@ import com.google.common.collect.Maps;
 public class HdfsPartition implements Comparable<HdfsPartition> {
   /**
    * Metadata for a single file in this partition.
-   * TODO: Do we even need this class? Just get rid of it and use the Thrift version?
    */
   static public class FileDescriptor implements Comparable<FileDescriptor> {
-    private final THdfsFileDesc fileDescriptor_;
+    // An invalid network address, which will always be treated as remote.
+    private final static TNetworkAddress REMOTE_NETWORK_ADDRESS =
+        new TNetworkAddress("remote*addr", 0);
 
-    public String getFileName() { return fileDescriptor_.getFile_name(); }
-    public long getFileLength() { return fileDescriptor_.getLength(); }
-    public THdfsCompression getFileCompression() {
-      return fileDescriptor_.getCompression();
-    }
-    public long getModificationTime() {
-      return fileDescriptor_.getLast_modification_time();
+    // Minimum block size in bytes allowed for synthetic file blocks (other than the last
+    // block, which may be shorter).
+    private final static long MIN_SYNTHETIC_BLOCK_SIZE = 1024 * 1024;
+
+    // Internal representation of a file descriptor using a FlatBuffer.
+    private final FbFileDesc fbFileDescriptor_;
+
+    private FileDescriptor(FbFileDesc fileDescData) { fbFileDescriptor_ = fileDescData; }
+
+    public static FileDescriptor fromThrift(THdfsFileDesc desc) {
+      ByteBuffer bb = ByteBuffer.wrap(desc.getFile_desc_data());
+      return new FileDescriptor(FbFileDesc.getRootAsFbFileDesc(bb));
     }
-    public List<THdfsFileBlock> getFileBlocks() {
-      return fileDescriptor_.getFile_blocks();
+
+    /**
+     * Creates the file descriptor of a file represented by 'fileStatus' with blocks
+     * stored in 'blockLocations'. 'fileSystem' is the filesystem where the
+     * file resides and 'hostIndex' stores the network addresses of the hosts that store
+     * blocks of the parent HdfsTable. Populates 'numUnknownDiskIds' with the number of
+     * unknown disk ids.
+     */
+    public static FileDescriptor create(FileStatus fileStatus,
+        BlockLocation[] blockLocations, FileSystem fileSystem,
+        ListMap<TNetworkAddress> hostIndex, Reference<Long> numUnknownDiskIds)
+        throws IOException {
+      Preconditions.checkState(FileSystemUtil.supportsStorageIds(fileSystem));
+      FlatBufferBuilder fbb = new FlatBufferBuilder(1);
+      int[] fbFileBlockOffsets = new int[blockLocations.length];
+      int blockIdx = 0;
+      for (BlockLocation loc: blockLocations) {
+        fbFileBlockOffsets[blockIdx++] = FileBlock.createFbFileBlock(fbb, loc, hostIndex,
+            numUnknownDiskIds);
+      }
+      return new FileDescriptor(createFbFileDesc(fbb, fileStatus, fbFileBlockOffsets));
     }
 
-    public THdfsFileDesc toThrift() { return fileDescriptor_; }
+    /**
+     * Creates the file descriptor of a file represented by 'fileStatus' that
+     * resides in a filesystem that doesn't support the BlockLocation API (e.g. S3).
+     * 'fileFormat' is the file format of the partition where this file resides and
+     * 'hostIndex' stores the network addresses of the hosts that store blocks of
+     * the parent HdfsTable.
+     */
+    public static FileDescriptor createWithSynthesizedBlockMd(FileStatus fileStatus,
+        HdfsFileFormat fileFormat, ListMap<TNetworkAddress> hostIndex) {
+      FlatBufferBuilder fbb = new FlatBufferBuilder(1);
+      int[] fbFileBlockOffets =
+          synthesizeFbBlockMd(fbb, fileStatus, fileFormat, hostIndex);
+      return new FileDescriptor(createFbFileDesc(fbb, fileStatus, fbFileBlockOffets));
+    }
 
-    public FileDescriptor(String fileName, long fileLength, long modificationTime) {
-      Preconditions.checkNotNull(fileName);
-      Preconditions.checkArgument(fileLength >= 0);
-      fileDescriptor_ = new THdfsFileDesc();
-      fileDescriptor_.setFile_name(fileName);
-      fileDescriptor_.setLength(fileLength);
-      fileDescriptor_.setLast_modification_time(modificationTime);
-      fileDescriptor_.setCompression(
-          HdfsCompression.fromFileName(fileName).toThrift());
-      List<THdfsFileBlock> emptyFileBlockList = Lists.newArrayList();
-      fileDescriptor_.setFile_blocks(emptyFileBlockList);
+    /**
+     * Serializes the metadata of a file descriptor represented by 'fileStatus' into a
+     * FlatBuffer using 'fbb' and returns the associated FbFileDesc object.
+     * 'fbFileBlockOffets' are the offsets of the serialized block metadata of this file
+     * in the underlying buffer.
+     */
+    private static FbFileDesc createFbFileDesc(FlatBufferBuilder fbb,
+        FileStatus fileStatus, int[] fbFileBlockOffets) {
+      int fileNameOffset = fbb.createString(fileStatus.getPath().getName());
+      int blockVectorOffset = FbFileDesc.createFileBlocksVector(fbb, fbFileBlockOffets);
+      FbFileDesc.startFbFileDesc(fbb);
+      FbFileDesc.addFileName(fbb, fileNameOffset);
+      FbFileDesc.addLength(fbb, fileStatus.getLen());
+      FbFileDesc.addLastModificationTime(fbb, fileStatus.getModificationTime());
+      HdfsCompression comp = HdfsCompression.fromFileName(fileStatus.getPath().getName());
+      FbFileDesc.addCompression(fbb, comp.toFb());
+      FbFileDesc.addFileBlocks(fbb, blockVectorOffset);
+      fbb.finish(FbFileDesc.endFbFileDesc(fbb));
+      // To eliminate memory fragmentation, copy the contents of the FlatBuffer to the
+      // smallest possible ByteBuffer.
+      ByteBuffer bb = fbb.dataBuffer().slice();
+      ByteBuffer compressedBb = ByteBuffer.allocate(bb.capacity());
+      compressedBb.put(bb);
+      return FbFileDesc.getRootAsFbFileDesc((ByteBuffer)compressedBb.flip());
     }
 
-    private FileDescriptor(THdfsFileDesc fileDesc) {
-      this(fileDesc.getFile_name(), fileDesc.length, fileDesc.last_modification_time);
-      for (THdfsFileBlock block: fileDesc.getFile_blocks()) {
-        fileDescriptor_.addToFile_blocks(block);
+    /**
+     * Synthesizes the block metadata of a file represented by 'fileStatus' that resides
+     * in a filesystem that doesn't support the BlockLocation API. The block metadata
+     * consist of the length and offset of each file block. It serializes the
+     * block metadata into a FlatBuffer using 'fbb' and returns their offsets in the
+     * underlying buffer. 'fileFormat' is the file format of the underlying partition and
+     * 'hostIndex' stores the network addresses of the hosts that store the blocks of the
+     * parent HdfsTable.
+     */
+    private static int[] synthesizeFbBlockMd(FlatBufferBuilder fbb, FileStatus fileStatus,
+        HdfsFileFormat fileFormat, ListMap<TNetworkAddress> hostIndex) {
+      long start = 0;
+      long remaining = fileStatus.getLen();
+      long blockSize = fileStatus.getBlockSize();
+      if (blockSize < MIN_SYNTHETIC_BLOCK_SIZE) blockSize = MIN_SYNTHETIC_BLOCK_SIZE;
+      if (!fileFormat.isSplittable(HdfsCompression.fromFileName(
+          fileStatus.getPath().getName()))) {
+        blockSize = remaining;
       }
+      List<Integer> fbFileBlockOffets = Lists.newArrayList();
+      while (remaining > 0) {
+        long len = Math.min(remaining, blockSize);
+        fbFileBlockOffets.add(FileBlock.createFbFileBlock(fbb, start, len,
+            (short) hostIndex.getIndex(REMOTE_NETWORK_ADDRESS)));
+        remaining -= len;
+        start += len;
+      }
+      return Ints.toArray(fbFileBlockOffets);
     }
 
-    public void addFileBlock(FileBlock blockMd) {
-      addThriftFileBlock(blockMd.toThrift());
+    public String getFileName() { return fbFileDescriptor_.fileName(); }
+    public long getFileLength() { return fbFileDescriptor_.length(); }
+
+    public HdfsCompression getFileCompression() {
+      return HdfsCompression.valueOf(FbCompression.name(fbFileDescriptor_.compression()));
     }
 
-    public void addThriftFileBlock(THdfsFileBlock block) {
-      fileDescriptor_.addToFile_blocks(block);
+    public long getModificationTime() { return fbFileDescriptor_.lastModificationTime(); }
+    public int getNumFileBlocks() { return fbFileDescriptor_.fileBlocksLength(); }
+
+    public FbFileBlock getFbFileBlock(int idx) {
+      return fbFileDescriptor_.fileBlocks(idx);
     }
 
-    public static FileDescriptor fromThrift(THdfsFileDesc desc) {
-      return new FileDescriptor(desc);
+    public THdfsFileDesc toThrift() {
+      THdfsFileDesc fd = new THdfsFileDesc();
+      ByteBuffer bb = fbFileDescriptor_.getByteBuffer();
+      fd.setFile_desc_data(bb);
+      return fd;
     }
 
     @Override
     public String toString() {
+      int numFileBlocks = getNumFileBlocks();
+      List<String> blocks = Lists.newArrayListWithCapacity(numFileBlocks);
+      for (int i = 0; i < numFileBlocks; ++i) {
+        blocks.add(FileBlock.debugString(getFbFileBlock(i)));
+      }
       return Objects.toStringHelper(this)
           .add("FileName", getFileName())
-          .add("Length", getFileLength()).toString();
+          .add("Length", getFileLength())
+          .add("Compression", getFileCompression())
+          .add("ModificationTime", getModificationTime())
+          .add("Blocks", Joiner.on(", ").join(blocks)).toString();
     }
 
-    /**
-     * Orders file descriptors lexicographically by file name.
-     */
     @Override
     public int compareTo(FileDescriptor otherFd) {
       return getFileName().compareTo(otherFd.getFileName());
@@ -140,14 +241,14 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
    */
   public static class BlockReplica {
     private final boolean isCached_;
-    private final int hostIdx_;
+    private final short hostIdx_;
 
     /**
      * Creates a BlockReplica given a host ID/index and a flag specifying whether this
      * replica is cached. Host IDs are assigned when loading the block metadata in
      * HdfsTable.
      */
-    public BlockReplica(int hostIdx, boolean isCached) {
+    public BlockReplica(short hostIdx, boolean isCached) {
       hostIdx_ = hostIdx;
       isCached_ = isCached;
     }
@@ -168,88 +269,167 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
     }
 
     public boolean isCached() { return isCached_; }
-    public int getHostIdx() { return hostIdx_; }
+    public short getHostIdx() { return hostIdx_; }
   }
 
   /**
-   * File Block metadata
+   * Static utility methods to serialize and access file block metadata from FlatBuffers.
    */
   public static class FileBlock {
-    private final THdfsFileBlock fileBlock_;
-    private boolean isCached_; // Set to true if there is at least one cached replica.
-
-    private FileBlock(THdfsFileBlock fileBlock) {
-      fileBlock_ = fileBlock;
-      isCached_ = false;
-      for (boolean isCached: fileBlock.getIs_replica_cached()) {
-        isCached_ |= isCached;
-      }
-    }
+    // Bit mask used to extract the replica host id and cache info of a file block.
+    // Use ~REPLICA_HOST_IDX_MASK to extract the cache info (stored in MSB).
+    private static short REPLICA_HOST_IDX_MASK = (1 << 15) - 1;
 
     /**
-     * Construct a FileBlock given the start offset (in bytes) of the file associated
-     * with this block, the length of the block (in bytes), and a list of BlockReplicas.
-     * Does not fill diskIds.
+     * Constructs an FbFileBlock object from the block location metadata
+     * 'loc'. Serializes the file block metadata into a FlatBuffer using 'fbb' and
+     * returns the offset in the underlying buffer where the encoded file block starts.
+     * 'hostIndex' stores the network addresses of the datanodes that store the files of
+     * the parent HdfsTable. Populates 'numUnknownDiskIds' with the number of unknown disk
+     * ids.
      */
-    public FileBlock(long offset, long blockLength,
-        List<BlockReplica> replicaHostIdxs) {
-      Preconditions.checkNotNull(replicaHostIdxs);
-      fileBlock_ = new THdfsFileBlock();
-      fileBlock_.setOffset(offset);
-      fileBlock_.setLength(blockLength);
-
-      fileBlock_.setReplica_host_idxs(new ArrayList<Integer>(replicaHostIdxs.size()));
-      fileBlock_.setIs_replica_cached(new ArrayList<Boolean>(replicaHostIdxs.size()));
-      isCached_ = false;
-      for (BlockReplica replica: replicaHostIdxs) {
-        fileBlock_.addToReplica_host_idxs(replica.getHostIdx());
-        fileBlock_.addToIs_replica_cached(replica.isCached());
-        isCached_ |= replica.isCached();
+    public static int createFbFileBlock(FlatBufferBuilder fbb, BlockLocation loc,
+        ListMap<TNetworkAddress> hostIndex, Reference<Long> numUnknownDiskIds)
+        throws IOException {
+      Preconditions.checkNotNull(fbb);
+      Preconditions.checkNotNull(loc);
+      Preconditions.checkNotNull(hostIndex);
+      // replica host ids
+      FbFileBlock.startReplicaHostIdxsVector(fbb, loc.getNames().length);
+      Set<String> cachedHosts = Sets.newHashSet(loc.getCachedHosts());
+      // Enumerate all replicas of the block, adding any unknown hosts
+      // to hostIndex. We pick the network address from getNames() and
+      // map it to the corresponding hostname from getHosts().
+      for (int i = 0; i < loc.getNames().length; ++i) {
+        TNetworkAddress networkAddress = BlockReplica.parseLocation(loc.getNames()[i]);
+        short replicaIdx = (short) hostIndex.getIndex(networkAddress);
+        boolean isReplicaCached = cachedHosts.contains(loc.getHosts()[i]);
+        replicaIdx = isReplicaCached ?
+            (short) (replicaIdx | ~REPLICA_HOST_IDX_MASK) : replicaIdx;
+        fbb.addShort(replicaIdx);
       }
-    }
+      int fbReplicaHostIdxOffset = fbb.endVector();
 
-    public long getOffset() { return fileBlock_.getOffset(); }
-    public long getLength() { return fileBlock_.getLength(); }
-    public List<Integer> getReplicaHostIdxs() {
-      return fileBlock_.getReplica_host_idxs();
+      // disk ids
+      short[] diskIds = createDiskIds(loc, numUnknownDiskIds);
+      Preconditions.checkState(diskIds.length != 0);
+      int fbDiskIdsOffset = FbFileBlock.createDiskIdsVector(fbb, diskIds);
+      FbFileBlock.startFbFileBlock(fbb);
+      FbFileBlock.addOffset(fbb, loc.getOffset());
+      FbFileBlock.addLength(fbb, loc.getLength());
+      FbFileBlock.addReplicaHostIdxs(fbb, fbReplicaHostIdxOffset);
+      FbFileBlock.addDiskIds(fbb, fbDiskIdsOffset);
+      return FbFileBlock.endFbFileBlock(fbb);
     }
 
     /**
-     * Populates the given THdfsFileBlock's list of disk ids with the given disk id
-     * values. The number of disk ids must match the number of network addresses
-     * set in the file block.
+     * Constructs an FbFileBlock object from the block's 'offset', 'length', and replica
+     * index 'replicaIdx'. Serializes the file block
+     * metadata into a FlatBuffer using 'fbb' and returns the offset in the underlying
+     * buffer where the encoded file block starts.
      */
-    public static void setDiskIds(int[] diskIds, THdfsFileBlock fileBlock) {
-      Preconditions.checkArgument(
-          diskIds.length == fileBlock.getReplica_host_idxs().size());
-      fileBlock.setDisk_ids(Arrays.asList(ArrayUtils.toObject(diskIds)));
+    public static int createFbFileBlock(FlatBufferBuilder fbb, long offset, long length,
+        short replicaIdx) {
+      Preconditions.checkNotNull(fbb);
+      FbFileBlock.startReplicaHostIdxsVector(fbb, 1);
+      fbb.addShort(replicaIdx);
+      int fbReplicaHostIdxOffset = fbb.endVector();
+      FbFileBlock.startFbFileBlock(fbb);
+      FbFileBlock.addOffset(fbb, offset);
+      FbFileBlock.addLength(fbb, length);
+      FbFileBlock.addReplicaHostIdxs(fbb, fbReplicaHostIdxOffset);
+      return FbFileBlock.endFbFileBlock(fbb);
     }
 
     /**
-     * Return the disk id of the block in BlockLocation.getNames()[hostIndex]; -1 if
-     * disk id is not supported.
+     * Creates the disk ids of a block from its BlockLocation 'location'. Returns the
+     * disk ids and populates 'numUnknownDiskIds' with the number of unknown disk ids.
      */
-    public int getDiskId(int hostIndex) {
-      if (fileBlock_.disk_ids == null) return -1;
-      return fileBlock_.getDisk_ids().get(hostIndex);
+    private static short[] createDiskIds(BlockLocation location,
+        Reference<Long> numUnknownDiskIds) {
+      long unknownDiskIdCount = 0;
+      String[] storageIds = location.getStorageIds();
+      String[] hosts;
+      try {
+        hosts = location.getHosts();
+      } catch (IOException e) {
+        LOG.error("Couldn't get hosts for block: " + location.toString(), e);
+        return new short[0];
+      }
+      if (storageIds.length != hosts.length) {
+        LOG.error("Number of storage IDs and number of hosts for block: " + location
+            .toString() + " mismatch. Skipping disk ID loading for this block.");
+        return Shorts.toArray(Collections.<Short>emptyList());
+      }
+      short[] diskIDs = new short[storageIds.length];
+      for (int i = 0; i < storageIds.length; ++i) {
+        if (Strings.isNullOrEmpty(storageIds[i])) {
+          diskIDs[i] = (short) -1;
+          ++unknownDiskIdCount;
+        } else {
+          diskIDs[i] = DiskIdMapper.INSTANCE.getDiskId(hosts[i], storageIds[i]);
+        }
+      }
+      long count = numUnknownDiskIds.getRef() + unknownDiskIdCount;
+      numUnknownDiskIds.setRef(Long.valueOf(count));
+      return diskIDs;
+    }
+
+    public static long getOffset(FbFileBlock fbFileBlock) { return fbFileBlock.offset(); }
+    public static long getLength(FbFileBlock fbFileBlock) { return fbFileBlock.length(); }
+    // Returns true if there is at least one cached replica.
+    public static boolean hasCachedReplica(FbFileBlock fbFileBlock) {
+      boolean hasCachedReplica = false;
+      for (int i = 0; i < fbFileBlock.replicaHostIdxsLength(); ++i) {
+        hasCachedReplica |= isReplicaCached(fbFileBlock, i);
+      }
+      return hasCachedReplica;
     }
 
-    public boolean isCached(int hostIndex) {
-      return fileBlock_.getIs_replica_cached().get(hostIndex);
+    public static int getNumReplicaHosts(FbFileBlock fbFileBlock) {
+      return fbFileBlock.replicaHostIdxsLength();
     }
 
-    public THdfsFileBlock toThrift() { return fileBlock_; }
+    public static int getReplicaHostIdx(FbFileBlock fbFileBlock, int pos) {
+      int idx = fbFileBlock.replicaHostIdxs(pos);
+      return idx & REPLICA_HOST_IDX_MASK;
+    }
 
-    public static FileBlock fromThrift(THdfsFileBlock thriftFileBlock) {
-      return new FileBlock(thriftFileBlock);
+    // Returns true if the block replica 'replicaIdx' is cached.
+    public static boolean isReplicaCached(FbFileBlock fbFileBlock, int replicaIdx) {
+      int idx = fbFileBlock.replicaHostIdxs(replicaIdx);
+      return (idx & ~REPLICA_HOST_IDX_MASK) != 0;
     }
 
-    @Override
-    public String toString() {
-      return Objects.toStringHelper(this)
-          .add("offset", fileBlock_.offset)
-          .add("length", fileBlock_.length)
-          .add("#disks", fileBlock_.getDisk_idsSize())
+    /**
+     * Return the disk id of the block in BlockLocation.getNames()[hostIndex]; -1 if
+     * disk id is not supported.
+     */
+    public static int getDiskId(FbFileBlock fbFileBlock, int hostIndex) {
+      if (fbFileBlock.diskIdsLength() == 0) return -1;
+      return fbFileBlock.diskIds(hostIndex);
+    }
+
+    /**
+     * Returns a string representation of a FbFileBlock.
+     */
+    public static String debugString(FbFileBlock fbFileBlock) {
+      int numReplicaHosts = getNumReplicaHosts(fbFileBlock);
+      List<Integer> diskIds = Lists.newArrayListWithCapacity(numReplicaHosts);
+      List<Integer> replicaHosts = Lists.newArrayListWithCapacity(numReplicaHosts);
+      List<Boolean> isBlockCached = Lists.newArrayListWithCapacity(numReplicaHosts);
+      for (int i = 0; i < numReplicaHosts; ++i) {
+        diskIds.add(getDiskId(fbFileBlock, i));
+        replicaHosts.add(getReplicaHostIdx(fbFileBlock, i));
+        isBlockCached.add(isReplicaCached(fbFileBlock, i));
+      }
+      StringBuilder builder = new StringBuilder();
+      return builder.append("Offset: " + getOffset(fbFileBlock))
+          .append("Length: " + getLength(fbFileBlock))
+          .append("IsCached: " + hasCachedReplica(fbFileBlock))
+          .append("ReplicaHosts: " + Joiner.on(", ").join(replicaHosts))
+          .append("DiskIds: " + Joiner.on(", ").join(diskIds))
+          .append("Caching: " + Joiner.on(", ").join(isBlockCached))
           .toString();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index b6df167..e387d37 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -47,18 +47,18 @@ import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.analysis.NumericLiteral;
 import org.apache.impala.analysis.PartitionKeyValue;
-import org.apache.impala.catalog.HdfsPartition.BlockReplica;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
+import org.apache.impala.common.Reference;
+import org.apache.impala.fb.FbFileBlock;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.ImpalaInternalServiceConstants;
 import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumn;
-import org.apache.impala.thrift.THdfsFileBlock;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TNetworkAddress;
@@ -83,7 +83,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -119,14 +118,6 @@ public class HdfsTable extends Table {
   public static final String TBL_PROP_PARQUET_MR_WRITE_ZONE =
       "parquet.mr.int96.write.zone";
 
-  // An invalid network address, which will always be treated as remote.
-  private final static TNetworkAddress REMOTE_NETWORK_ADDRESS =
-      new TNetworkAddress("remote*addr", 0);
-
-  // Minimum block size in bytes allowed for synthetic file blocks (other than the last
-  // block, which may be shorter).
-  private final static long MIN_SYNTHETIC_BLOCK_SIZE = 1024 * 1024;
-
   // string to indicate NULL. set in load() from table properties
   private String nullColumnValue_;
 
@@ -144,14 +135,14 @@ public class HdfsTable extends Table {
   private boolean isMarkedCached_ = false;
 
   // Array of sorted maps storing the association between partition values and
-  // partition ids. There is one sorted map per partition key.
-  // TODO: We should not populate this for HdfsTable objects stored in the catalog
-  // server.
+  // partition ids. There is one sorted map per partition key. It is only populated if
+  // this table object is stored in ImpaladCatalog.
   private ArrayList<TreeMap<LiteralExpr, HashSet<Long>>> partitionValuesMap_ =
       Lists.newArrayList();
 
   // Array of partition id sets that correspond to partitions with null values
-  // in the partition keys; one set per partition key.
+  // in the partition keys; one set per partition key. It is not populated if the table is
+  // stored in the catalog server.
   private ArrayList<HashSet<Long>> nullPartitionIds_ = Lists.newArrayList();
 
   // Map of partition ids to HdfsPartitions.
@@ -293,7 +284,7 @@ public class HdfsTable extends Table {
         return;
       }
 
-      int unknownDiskIdCount = 0;
+      Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
       RemoteIterator<LocatedFileStatus> fileStatusIter = fs.listFiles(dirPath, true);
       while (fileStatusIter.hasNext()) {
         LocatedFileStatus fileStatus = fileStatusIter.next();
@@ -311,15 +302,9 @@ public class HdfsTable extends Table {
           }
           continue;
         }
-        String fileName = fileStatus.getPath().getName();
-        FileDescriptor fd = new FileDescriptor(fileName, fileStatus.getLen(),
-            fileStatus.getModificationTime());
-        BlockLocation[] locations = fileStatus.getBlockLocations();
-        unknownDiskIdCount += setFdBlockMetadata(fd, locations);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Adding file md dir: " + partPathDir.toString() + " file: " +
-              fileName);
-        }
+
+        FileDescriptor fd = FileDescriptor.create(fileStatus,
+            fileStatus.getBlockLocations(), fs, hostIndex_, numUnknownDiskIds);
         // Update the partitions' metadata that this file belongs to.
         for (HdfsPartition partition: partitions) {
           partition.getFileDescriptors().add(fd);
@@ -327,6 +312,8 @@ public class HdfsTable extends Table {
           totalHdfsBytes_ += fd.getFileLength();
         }
       }
+
+      long unknownDiskIdCount = numUnknownDiskIds.getRef();
       if (unknownDiskIdCount > 0) {
         if (LOG.isWarnEnabled()) {
           LOG.warn("Unknown disk id count for filesystem " + fs + ":" +
@@ -340,69 +327,6 @@ public class HdfsTable extends Table {
   }
 
   /**
-   * Sets the block metadata for FileDescriptor 'fd' using block location metadata
-   * from 'locations'.
-   */
-  private int setFdBlockMetadata(FileDescriptor fd, BlockLocation[] locations)
-      throws IOException {
-    int unknownFdDiskIds = 0;
-    for (BlockLocation loc: locations) {
-      Set<String> cachedHosts = Sets.newHashSet(loc.getCachedHosts());
-      // Enumerate all replicas of the block, adding any unknown hosts
-      // to hostIndex_. We pick the network address from getNames() and
-      // map it to the corresponding hostname from getHosts().
-      List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize(
-          loc.getNames().length);
-      for (int i = 0; i < loc.getNames().length; ++i) {
-        TNetworkAddress networkAddress =
-            BlockReplica.parseLocation(loc.getNames()[i]);
-        replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress),
-            cachedHosts.contains(loc.getHosts()[i])));
-      }
-      FileBlock currentBlock =
-          new FileBlock(loc.getOffset(), loc.getLength(), replicas);
-      THdfsFileBlock tHdfsFileBlock = currentBlock.toThrift();
-      fd.addThriftFileBlock(tHdfsFileBlock);
-      unknownFdDiskIds += loadDiskIds(loc, tHdfsFileBlock);
-    }
-    return unknownFdDiskIds;
-  }
-
-  /**
-   * Loads the disk IDs for BlockLocation 'location' and its corresponding file block.
-   * HDFS API for BlockLocation returns a storageID UUID string for each disk
-   * hosting the block, which is then mapped to a 0-based integer id called disk ID.
-   * Returns the number of unknown disk IDs encountered in this process.
-   */
-  private int loadDiskIds(BlockLocation location, THdfsFileBlock fileBlock) {
-    int unknownDiskIdCount = 0;
-    String[] storageIds = location.getStorageIds();
-    String[] hosts;
-    try {
-      hosts = location.getHosts();
-    } catch (IOException e) {
-      LOG.error("Couldn't get hosts for block: " + location.toString(), e);
-      return unknownDiskIdCount;
-    }
-    if (storageIds.length != hosts.length) {
-      LOG.error("Number of storage IDs and number of hosts for block: " + location
-          .toString() + " mismatch. Skipping disk ID loading for this block.");
-      return unknownDiskIdCount;
-    }
-    int[] diskIDs = new int[storageIds.length];
-    for (int i = 0; i < storageIds.length; ++i) {
-      if (Strings.isNullOrEmpty(storageIds[i])) {
-        diskIDs[i] = -1;
-        ++unknownDiskIdCount;
-      } else {
-        diskIDs[i] = DiskIdMapper.INSTANCE.getDiskId(hosts[i], storageIds[i]);
-      }
-    }
-    FileBlock.setDiskIds(diskIDs, fileBlock);
-    return unknownDiskIdCount;
-  }
-
-  /**
    * Synthesize the block metadata for a given HdfsPartition object. Should only
    * be called for FileSystems that do not support storage IDs.
    */
@@ -439,14 +363,12 @@ public class HdfsTable extends Table {
         }
         continue;
       }
-      String fileName = fileStatus.getPath().getName();
-      FileDescriptor fd = new FileDescriptor(fileName, fileStatus.getLen(),
-          fileStatus.getModificationTime());
+
       Preconditions.checkState(partitions.size() > 0);
       // For the purpose of synthesizing block metadata, we assume that all partitions
       // with the same location have the same file format.
-      HdfsFileFormat fileFormat = partitions.get(0).getFileFormat();
-      synthesizeFdBlockMetadata(fs, fd, fileFormat);
+      FileDescriptor fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
+          partitions.get(0).getFileFormat(), hostIndex_);
       // Update the partitions' metadata that this file belongs to.
       for (HdfsPartition partition: partitions) {
         partition.getFileDescriptors().add(fd);
@@ -456,31 +378,6 @@ public class HdfsTable extends Table {
     }
   }
 
-  /**
-   * Helper method to synthesize block metadata for file descriptor fd.
-   */
-  private void synthesizeFdBlockMetadata(FileSystem fs, FileDescriptor fd,
-      HdfsFileFormat fileFormat) {
-    long start = 0;
-    long remaining = fd.getFileLength();
-    // Workaround HADOOP-11584 by using the filesystem default block size rather than
-    // the block size from the FileStatus.
-    // TODO: after HADOOP-11584 is resolved, get the block size from the FileStatus.
-    long blockSize = fs.getDefaultBlockSize();
-    if (blockSize < MIN_SYNTHETIC_BLOCK_SIZE) blockSize = MIN_SYNTHETIC_BLOCK_SIZE;
-    if (!fileFormat.isSplittable(HdfsCompression.fromFileName(fd.getFileName()))) {
-      blockSize = remaining;
-    }
-    while (remaining > 0) {
-      long len = Math.min(remaining, blockSize);
-      List<BlockReplica> replicas = Lists.newArrayList(
-          new BlockReplica(hostIndex_.getIndex(REMOTE_NETWORK_ADDRESS), false));
-      fd.addFileBlock(new FileBlock(start, len, replicas));
-      remaining -= len;
-      start += len;
-    }
-  }
-
   @Override
   public TCatalogObjectType getCatalogObjectType() {
     return TCatalogObjectType.TABLE;
@@ -662,12 +559,14 @@ public class HdfsTable extends Table {
     nameToPartitionMap_.clear();
     partitionValuesMap_.clear();
     nullPartitionIds_.clear();
-    // Initialize partitionValuesMap_ and nullPartitionIds_. Also reset column stats.
-    for (int i = 0; i < numClusteringCols_; ++i) {
-      getColumns().get(i).getStats().setNumNulls(0);
-      getColumns().get(i).getStats().setNumDistinctValues(0);
-      partitionValuesMap_.add(Maps.<LiteralExpr, HashSet<Long>>newTreeMap());
-      nullPartitionIds_.add(Sets.<Long>newHashSet());
+    if (isStoredInImpaladCatalogCache()) {
+      // Initialize partitionValuesMap_ and nullPartitionIds_. Also reset column stats.
+      for (int i = 0; i < numClusteringCols_; ++i) {
+        getColumns().get(i).getStats().setNumNulls(0);
+        getColumns().get(i).getStats().setNumDistinctValues(0);
+        partitionValuesMap_.add(Maps.<LiteralExpr, HashSet<Long>>newTreeMap());
+        nullPartitionIds_.add(Sets.<Long>newHashSet());
+      }
     }
     numHdfsFiles_ = 0;
     totalHdfsBytes_ = 0;
@@ -821,10 +720,10 @@ public class HdfsTable extends Table {
         if (fd == null || partition.isMarkedCached() ||
             fd.getFileLength() != fileStatus.getLen() ||
             fd.getModificationTime() != fileStatus.getModificationTime()) {
-          fd = new FileDescriptor(fileName, fileStatus.getLen(),
-              fileStatus.getModificationTime());
-          setFdBlockMetadata(fd,
-              fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()));
+          BlockLocation[] locations =
+              fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+          fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
+              new Reference<Long>(Long.valueOf(0)));
         }
         newFileDescs.add(fd);
         newPartSizeBytes += fileStatus.getLen();
@@ -996,6 +895,8 @@ public class HdfsTable extends Table {
   private void updatePartitionMdAndColStats(HdfsPartition partition) {
     if (partition.getPartitionValues().size() != numClusteringCols_) return;
     partitionIds_.add(partition.getId());
+    nameToPartitionMap_.put(partition.getPartitionName(), partition);
+    if (!isStoredInImpaladCatalogCache()) return;
     for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
       ColumnStats stats = getColumns().get(i).getStats();
       LiteralExpr literal = partition.getPartitionValues().get(i);
@@ -1016,7 +917,6 @@ public class HdfsTable extends Table {
       }
       partitionIds.add(partition.getId());
     }
-    nameToPartitionMap_.put(partition.getPartitionName(), partition);
   }
 
   /**
@@ -1046,6 +946,7 @@ public class HdfsTable extends Table {
     partitionIds_.remove(partitionId);
     partitionMap_.remove(partitionId);
     nameToPartitionMap_.remove(partition.getPartitionName());
+    if (!isStoredInImpaladCatalogCache()) return partition;
     for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
       ColumnStats stats = getColumns().get(i).getStats();
       LiteralExpr literal = partition.getPartitionValues().get(i);
@@ -1341,7 +1242,6 @@ public class HdfsTable extends Table {
     String key = TBL_PROP_SKIP_HEADER_LINE_COUNT;
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     if (msTbl == null) return false;
-    String inputFormat = msTbl.getSd().getInputFormat();
     return msTbl.getParameters().containsKey(key);
   }
 
@@ -1555,8 +1455,6 @@ public class HdfsTable extends Table {
     }
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     Preconditions.checkNotNull(msTbl);
-    HdfsStorageDescriptor fileFormatDescriptor =
-        HdfsStorageDescriptor.fromStorageDescriptor(this.name_, msTbl.getSd());
     for (HdfsPartition partition: partitions) {
       org.apache.hadoop.hive.metastore.api.Partition msPart =
           partition.toHmsPartition();
@@ -1585,8 +1483,6 @@ public class HdfsTable extends Table {
       HdfsPartition partition) throws Exception {
     Preconditions.checkNotNull(storageDescriptor);
     Preconditions.checkNotNull(partition);
-    org.apache.hadoop.hive.metastore.api.Partition msPart =
-        partition.toHmsPartition();
     Path partDirPath = new Path(storageDescriptor.getLocation());
     FileSystem fs = partDirPath.getFileSystem(CONF);
     if (!fs.exists(partDirPath)) return;
@@ -1622,7 +1518,6 @@ public class HdfsTable extends Table {
     Preconditions.checkState(hdfsTable.getNetwork_addresses() instanceof ArrayList<?>);
     hostIndex_.populate((ArrayList<TNetworkAddress>)hdfsTable.getNetwork_addresses());
     resetPartitions();
-
     try {
       for (Map.Entry<Long, THdfsPartition> part: hdfsTable.getPartitions().entrySet()) {
         HdfsPartition hdfsPart =
@@ -1923,9 +1818,11 @@ public class HdfsTable extends Table {
         // Calculate the number of bytes that are cached.
         long cachedBytes = 0L;
         for (FileDescriptor fd: p.getFileDescriptors()) {
-          for (THdfsFileBlock fb: fd.getFileBlocks()) {
-            if (fb.getIs_replica_cached().contains(true)) {
-              cachedBytes += fb.getLength();
+          int numBlocks = fd.getNumFileBlocks();
+          for (int i = 0; i < numBlocks; ++i) {
+            FbFileBlock block = fd.getFbFileBlock(i);
+            if (FileBlock.hasCachedReplica(block)) {
+              cachedBytes += FileBlock.getLength(block);
             }
           }
         }
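
The loop above shows the general pattern for reading block metadata after this change: callers walk the FlatBuffer-backed FbFileBlock entries through the static helpers on HdfsPartition.FileBlock instead of materializing THdfsFileBlock Thrift objects. A minimal standalone sketch of that pattern, assuming the HdfsPartition.FileDescriptor, HdfsPartition.FileBlock, and generated org.apache.impala.fb.FbFileBlock classes from this patch are on the classpath (countCachedBytes and CachedBytesExample are illustrative names, not part of the patch):

    import java.util.List;

    import org.apache.impala.catalog.HdfsPartition.FileBlock;
    import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
    import org.apache.impala.fb.FbFileBlock;

    // Illustrative helper mirroring the loop in the hunk above.
    class CachedBytesExample {
      static long countCachedBytes(List<FileDescriptor> fileDescs) {
        long cachedBytes = 0L;
        for (FileDescriptor fd: fileDescs) {
          for (int i = 0; i < fd.getNumFileBlocks(); ++i) {
            FbFileBlock block = fd.getFbFileBlock(i);
            // The FileBlock helpers read straight out of the FlatBuffer; no Thrift
            // THdfsFileBlock object is created along the way.
            if (FileBlock.hasCachedReplica(block)) {
              cachedBytes += FileBlock.getLength(block);
            }
          }
        }
        return cachedBytes;
      }
    }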

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index f70186e..881841c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -18,7 +18,6 @@
 package org.apache.impala.catalog;
 
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -32,6 +31,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
@@ -59,6 +59,7 @@ import com.google.common.collect.Maps;
 public abstract class Table implements CatalogObject {
   private static final Logger LOG = Logger.getLogger(Table.class);
 
+  // Catalog version assigned to this table
   private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
   protected org.apache.hadoop.hive.metastore.api.Table msTable_;
 
@@ -88,6 +89,9 @@ public abstract class Table implements CatalogObject {
   // The lastDdlTime for this table; -1 if not set
   protected long lastDdlTime_;
 
+  // True if this object is stored in an Impalad catalog cache.
+  protected boolean storedInImpaladCatalogCache_ = false;
+
   protected Table(org.apache.hadoop.hive.metastore.api.Table msTable, Db db,
       String name, String owner) {
     msTable_ = msTable;
@@ -103,6 +107,13 @@ public abstract class Table implements CatalogObject {
       int tableId, Set<Long> referencedPartitions);
   public abstract TCatalogObjectType getCatalogObjectType();
 
+  // Returns true if this table reference comes from the impalad catalog cache or if it
+  // was loaded by the test framework. Returns false if this table reference points to
+  // a table stored in the catalog server.
+  public boolean isStoredInImpaladCatalogCache() {
+    return storedInImpaladCatalogCache_ || RuntimeEnv.INSTANCE.isTestEnv();
+  }
+
   /**
    * Populate members of 'this' from metastore info. If 'reuseMetadata' is true, reuse
    * valid existing metadata.
@@ -276,6 +287,8 @@ public abstract class Table implements CatalogObject {
     // Default to READ_WRITE access if the field is not set.
     accessLevel_ = thriftTable.isSetAccess_level() ? thriftTable.getAccess_level() :
         TAccessLevel.READ_WRITE;
+
+    storedInImpaladCatalogCache_ = true;
   }
 
   /**
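
The storedInImpaladCatalogCache_ flag added above is what drives the early returns in the HdfsTable hunks earlier in this patch: catalogd never reads the per-partition column stats, so it can skip building them. A rough sketch of that gating pattern; registerPartition() and updateColumnStats() are hypothetical names used only for illustration and do not appear in the patch:

    // Sketch of the early-return pattern from updatePartitionMdAndColStats() in the
    // HdfsTable hunk above. updateColumnStats() is a hypothetical placeholder for the
    // real per-clustering-column stats update.
    private void registerPartition(HdfsPartition partition) {
      nameToPartitionMap_.put(partition.getPartitionName(), partition);
      // Per-partition column stats are only consulted by impalads (and in tests),
      // so the catalog server skips them to save memory.
      if (!isStoredInImpaladCatalogCache()) return;
      updateColumnStats(partition);  // hypothetical
    }

Note that isStoredInImpaladCatalogCache() also returns true when RuntimeEnv.INSTANCE.isTestEnv() is set, which is why FrontendTestBase (further down) toggles the test environment flag in @BeforeClass/@AfterClass.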

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index bd260e0..40d750b 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -54,10 +54,10 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.fb.FbFileBlock;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TExpr;
-import org.apache.impala.thrift.THdfsFileBlock;
 import org.apache.impala.thrift.THdfsFileSplit;
 import org.apache.impala.thrift.THdfsScanNode;
 import org.apache.impala.thrift.TNetworkAddress;
@@ -540,47 +540,47 @@ public class HdfsScanNode extends ScanNode {
       boolean partitionMissingDiskIds = false;
       for (HdfsPartition.FileDescriptor fileDesc: partition.getFileDescriptors()) {
         boolean fileDescMissingDiskIds = false;
-        for (THdfsFileBlock thriftBlock: fileDesc.getFileBlocks()) {
-          HdfsPartition.FileBlock block = FileBlock.fromThrift(thriftBlock);
-          List<Integer> replicaHostIdxs = block.getReplicaHostIdxs();
-          if (replicaHostIdxs.size() == 0) {
+        for (int j = 0; j < fileDesc.getNumFileBlocks(); ++j) {
+          FbFileBlock block = fileDesc.getFbFileBlock(j);
+          int replicaHostCount = FileBlock.getNumReplicaHosts(block);
+          if (replicaHostCount == 0) {
             // we didn't get locations for this block; for now, just ignore the block
             // TODO: do something meaningful with that
             continue;
           }
           // Collect the network address and volume ID of all replicas of this block.
           List<TScanRangeLocation> locations = Lists.newArrayList();
-          for (int i = 0; i < replicaHostIdxs.size(); ++i) {
+          for (int i = 0; i < replicaHostCount; ++i) {
             TScanRangeLocation location = new TScanRangeLocation();
             // Translate from the host index (local to the HdfsTable) to network address.
-            Integer tableHostIdx = replicaHostIdxs.get(i);
+            int replicaHostIdx = FileBlock.getReplicaHostIdx(block, i);
             TNetworkAddress networkAddress =
-                partition.getTable().getHostIndex().getEntry(tableHostIdx);
+                partition.getTable().getHostIndex().getEntry(replicaHostIdx);
             Preconditions.checkNotNull(networkAddress);
             // Translate from network address to the global (to this request) host index.
             Integer globalHostIdx = analyzer.getHostIndex().getIndex(networkAddress);
             location.setHost_idx(globalHostIdx);
-            if (checkMissingDiskIds && block.getDiskId(i) == -1) {
+            if (checkMissingDiskIds && FileBlock.getDiskId(block, i) == -1) {
               ++numScanRangesNoDiskIds_;
               partitionMissingDiskIds = true;
               fileDescMissingDiskIds = true;
             }
-            location.setVolume_id(block.getDiskId(i));
-            location.setIs_cached(block.isCached(i));
+            location.setVolume_id(FileBlock.getDiskId(block, i));
+            location.setIs_cached(FileBlock.isReplicaCached(block, i));
             locations.add(location);
           }
           // create scan ranges, taking into account maxScanRangeLength
-          long currentOffset = block.getOffset();
-          long remainingLength = block.getLength();
+          long currentOffset = FileBlock.getOffset(block);
+          long remainingLength = FileBlock.getLength(block);
           while (remainingLength > 0) {
             long currentLength = remainingLength;
             if (maxScanRangeLength > 0 && remainingLength > maxScanRangeLength) {
               currentLength = maxScanRangeLength;
             }
             TScanRange scanRange = new TScanRange();
-            scanRange.setHdfs_file_split(new THdfsFileSplit(
-                fileDesc.getFileName(), currentOffset, currentLength, partition.getId(),
-                fileDesc.getFileLength(), fileDesc.getFileCompression(),
+            scanRange.setHdfs_file_split(new THdfsFileSplit(fileDesc.getFileName(),
+                currentOffset, currentLength, partition.getId(), fileDesc.getFileLength(),
+                fileDesc.getFileCompression().toThrift(),
                 fileDesc.getModificationTime()));
             TScanRangeLocationList scanRangeLocations = new TScanRangeLocationList();
             scanRangeLocations.scan_range = scanRange;
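
The scan-range construction at the end of this hunk is existing logic that the patch leaves intact: each block is carved into consecutive ranges of at most maxScanRangeLength bytes, with the final range taking the remainder (a non-positive maxScanRangeLength means no splitting). A self-contained sketch of that arithmetic, using plain offsets and lengths rather than catalog or Thrift types:

    import java.util.ArrayList;
    import java.util.List;

    public class ScanRangeSplitter {
      /**
       * Splits [offset, offset + length) into ranges of at most maxScanRangeLength
       * bytes. A maxScanRangeLength <= 0 means "do not split". Each returned long[]
       * holds {offset, length}.
       */
      static List<long[]> split(long offset, long length, long maxScanRangeLength) {
        List<long[]> ranges = new ArrayList<>();
        long currentOffset = offset;
        long remainingLength = length;
        while (remainingLength > 0) {
          long currentLength = remainingLength;
          if (maxScanRangeLength > 0 && remainingLength > maxScanRangeLength) {
            currentLength = maxScanRangeLength;
          }
          ranges.add(new long[] {currentOffset, currentLength});
          remainingLength -= currentLength;
          currentOffset += currentLength;
        }
        return ranges;
      }

      public static void main(String[] args) {
        // A 300 MiB block with a 128 MiB limit yields ranges of 128, 128, and 44 MiB.
        for (long[] r : split(0, 300L << 20, 128L << 20)) {
          System.out.println("offset=" + r[0] + " length=" + r[1]);
        }
      }
    }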

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
index 7d80ce0..67005be 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
@@ -99,7 +99,7 @@ public class CatalogObjectToFromThriftTest {
         } else {
           Assert.assertEquals(hdfsPart.getFileDescriptors().size(), 1);
           Assert.assertTrue(
-              hdfsPart.getFileDescriptors().get(0).getFileBlocks().size() > 0);
+              hdfsPart.getFileDescriptors().get(0).getNumFileBlocks() > 0);
 
           // Verify the partition access level is getting set properly. The alltypes_seq
           // table has two partitions that are read_only.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index 297666f..6202991 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -56,6 +56,8 @@ import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryOptions;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -75,6 +77,16 @@ public class FrontendTestBase {
   protected final List<Db> testDbs_ = Lists.newArrayList();
   protected final List<Table> testTables_ = Lists.newArrayList();
 
+  @BeforeClass
+  public static void setUp() throws Exception {
+    RuntimeEnv.INSTANCE.setTestEnv(true);
+  }
+
+  @AfterClass
+  public static void cleanUp() throws Exception {
+    RuntimeEnv.INSTANCE.setTestEnv(false);
+  }
+
   protected Analyzer createAnalyzer(String defaultDb) {
     TQueryCtx queryCtx =
         TestUtils.createQueryContext(defaultDb, System.getProperty("user.name"));