Posted to commits@impala.apache.org by ta...@apache.org on 2017/05/10 20:42:16 UTC

[1/4] incubator-impala git commit: IMPALA-5108: [DOCS] Explain 50% margin for idle_* settings

Repository: incubator-impala
Updated Branches:
  refs/heads/master 927034682 -> d6b5f82e3


IMPALA-5108: [DOCS] Explain 50% margin for idle_* settings

Add a note that idle_session_timeout and idle_query_timeout
might take up to 50% longer than the specified interval
before cancellation occurs.

Also rearrange the text a little to put the QUERY_TIMEOUT_S
explanation adjacent to the idle_query_timeout explanation.
This results in an indentation change that makes the diff
look bigger than it really is. The <note> element is the
only actual added content.

Change-Id: I0b19912dac1df13bfcbcb67f0bd4ed0064ad8db9
Reviewed-on: http://gerrit.cloudera.org:8080/6463
Tested-by: Impala Public Jenkins
Reviewed-by: Michael Brown <mi...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9efed591
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9efed591
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9efed591

Branch: refs/heads/master
Commit: 9efed59163fd7c970614cb31d72e89cf0954a32f
Parents: 9270346
Author: John Russell <jr...@cloudera.com>
Authored: Thu Mar 23 10:36:45 2017 -0700
Committer: Michael Brown <mi...@cloudera.com>
Committed: Tue May 9 22:08:16 2017 +0000

----------------------------------------------------------------------
 docs/topics/impala_timeouts.xml | 52 +++++++++++++++++++++++-------------
 1 file changed, 33 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9efed591/docs/topics/impala_timeouts.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_timeouts.xml b/docs/topics/impala_timeouts.xml
index 5b843d4..80e5c9b 100644
--- a/docs/topics/impala_timeouts.xml
+++ b/docs/topics/impala_timeouts.xml
@@ -101,22 +101,34 @@ Trying to re-register with state-store</codeblock>
 
       <ul>
         <li>
-          The <codeph>--idle_query_timeout</codeph> option specifies the time in seconds after
-          which an idle query is cancelled. This could be a query whose results were all fetched
-          but was never closed, or one whose results were partially fetched and then the client
-          program stopped requesting further results. This condition is most likely to occur in
-          a client program using the JDBC or ODBC interfaces, rather than in the interactive
-          <cmdname>impala-shell</cmdname> interpreter. Once the query is cancelled, the client
-          program cannot retrieve any further results.
+          <p>
+            The <codeph>--idle_query_timeout</codeph> option specifies the time in seconds after
+            which an idle query is cancelled. This could be a query whose results were all fetched
+            but was never closed, or one whose results were partially fetched and then the client
+            program stopped requesting further results. This condition is most likely to occur in
+            a client program using the JDBC or ODBC interfaces, rather than in the interactive
+            <cmdname>impala-shell</cmdname> interpreter. Once the query is cancelled, the client
+            program cannot retrieve any further results.
+          </p>
+
+          <p rev="2.0.0">
+            You can reduce the idle query timeout by using the <codeph>QUERY_TIMEOUT_S</codeph>
+            query option. Any non-zero value specified for the <codeph>--idle_query_timeout</codeph> startup
+            option serves as an upper limit for the <codeph>QUERY_TIMEOUT_S</codeph> query option.
+            A zero value for <codeph>--idle_query_timeout</codeph> disables query timeouts.
+            See <xref href="impala_query_timeout_s.xml#query_timeout_s"/> for details.
+          </p>
         </li>
 
         <li>
-          The <codeph>--idle_session_timeout</codeph> option specifies the time in seconds after
-          which an idle session is expired. A session is idle when no activity is occurring for
-          any of the queries in that session, and the session has not started any new queries.
-          Once a session is expired, you cannot issue any new query requests to it. The session
-          remains open, but the only operation you can perform is to close it. The default value
-          of 0 means that sessions never expire.
+          <p>
+            The <codeph>--idle_session_timeout</codeph> option specifies the time in seconds after
+            which an idle session is expired. A session is idle when no activity is occurring for
+            any of the queries in that session, and the session has not started any new queries.
+            Once a session is expired, you cannot issue any new query requests to it. The session
+            remains open, but the only operation you can perform is to close it. The default value
+            of 0 means that sessions never expire.
+          </p>
         </li>
       </ul>
 
@@ -125,12 +137,14 @@ Trying to re-register with state-store</codeblock>
         <xref href="impala_config_options.xml#config_options"/>.
       </p>
 
-      <p rev="2.0.0">
-        You can reduce the idle query timeout by using the <codeph>QUERY_TIMEOUT_S</codeph>
-        query option. Any value specified for the <codeph>--idle_query_timeout</codeph> startup
-        option serves as an upper limit for the <codeph>QUERY_TIMEOUT_S</codeph> query option.
-        See <xref href="impala_query_timeout_s.xml#query_timeout_s"/> for details.
-      </p>
+      <note>
+        <p rev="IMPALA-5108">
+          Impala checks periodically for idle sessions and queries
+          to cancel. The actual idle time before cancellation might be up to 50% greater than
+          the specified configuration setting. For example, if the timeout setting was 60, the
+          session or query might be cancelled after being idle between 60 and 90 seconds.
+        </p>
+      </note>
 
     </conbody>
 

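To make the interaction between QUERY_TIMEOUT_S and the --idle_query_timeout
upper bound concrete, here is a minimal JDBC sketch. It is not part of the
commit: the class name, connection URL, table name, and driver setup are
placeholder assumptions, and it presumes an Impala-compatible JDBC driver that
passes SET statements through to the server.

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.ResultSet;
  import java.sql.Statement;

  public class QueryTimeoutDemo {
    public static void main(String[] args) throws Exception {
      // Placeholder URL; adjust host/port/driver for your deployment.
      String jdbcUrl = "jdbc:impala://impalad-host:21050/default";
      try (Connection conn = DriverManager.getConnection(jdbcUrl);
           Statement stmt = conn.createStatement()) {
        // Tighten the idle query timeout for this session. A non-zero
        // --idle_query_timeout startup flag caps this value.
        stmt.execute("SET QUERY_TIMEOUT_S=60");
        ResultSet rs = stmt.executeQuery("SELECT count(*) FROM some_table");
        // If the client stops fetching, the periodic idle check cancels the
        // query after roughly 60-90 seconds (up to 50% beyond the setting).
        while (rs.next()) System.out.println(rs.getLong(1));
      }
    }
  }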

[2/4] incubator-impala git commit: IMPALA-5258: Pass CMAKE_BUILD_TYPE to Impala-lzo

Posted by ta...@apache.org.
IMPALA-5258: Pass CMAKE_BUILD_TYPE to Impala-lzo

Impala-lzo contained a buffer overrun that was only
present when compiled in optimized mode. To avoid this
(IMPALA-5172), Impala-lzo was compiled only in DEBUG mode.
Now that the underlying buffer overrun has been resolved,
re-enable passing CMAKE_BUILD_TYPE down to Impala-lzo.

Change-Id: I190d732a8244dd79bc422cc0c563fb09ba2ed561
Reviewed-on: http://gerrit.cloudera.org:8080/6751
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ff5b0469
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ff5b0469
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ff5b0469

Branch: refs/heads/master
Commit: ff5b0469b1a718d8e84976c9946752cfce0e357f
Parents: 9efed59
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Thu Apr 27 14:20:07 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue May 9 22:18:11 2017 +0000

----------------------------------------------------------------------
 CMakeLists.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ff5b0469/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3fa8456..5e568aa 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -387,7 +387,7 @@ add_custom_target(cscope ALL
 
 if (DEFINED ENV{IMPALA_LZO} AND EXISTS $ENV{IMPALA_LZO})
   add_custom_target(impala-lzo ALL DEPENDS thrift-deps
-    COMMAND $ENV{IMPALA_LZO}/build.sh DEBUG ${CMAKE_SOURCE_DIR}
+    COMMAND $ENV{IMPALA_LZO}/build.sh ${CMAKE_BUILD_TYPE} ${CMAKE_SOURCE_DIR}
     $ENV{IMPALA_TOOLCHAIN}
   )
 endif()


[4/4] incubator-impala git commit: IMPALA-4029: Reduce memory requirements for storing file metadata

Posted by ta...@apache.org.
IMPALA-4029: Reduce memory requirements for storing file metadata

This commit reduces the memory required for storing file and block
metadata in the catalog and the impalad nodes by using the FlatBuffers
serialization library.

Testing:
Passed an exhaustive test run.

Benchmark:
The memory required to store an HDFS table with 250K files is reduced
by 2.5X.

Change-Id: I483d3cadc9d459f71a310c35a130d073597b0983
Reviewed-on: http://gerrit.cloudera.org:8080/6406
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d6b5f82e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d6b5f82e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d6b5f82e

Branch: refs/heads/master
Commit: d6b5f82e31732c355af3f3d1a8e5da94ba9c1349
Parents: eb7ee67
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Thu Mar 2 16:51:42 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed May 10 09:23:05 2017 +0000

----------------------------------------------------------------------
 CMakeLists.txt                                  |   1 +
 common/fbs/CMakeLists.txt                       |  75 ++++
 common/fbs/CatalogObjects.fbs                   |  75 ++++
 common/thrift/CatalogObjects.thrift             |  38 +-
 fe/CMakeLists.txt                               |   2 +-
 fe/pom.xml                                      |   6 +
 .../org/apache/impala/catalog/DiskIdMapper.java |  44 ++-
 .../apache/impala/catalog/HdfsCompression.java  |  14 +
 .../apache/impala/catalog/HdfsPartition.java    | 384 ++++++++++++++-----
 .../org/apache/impala/catalog/HdfsTable.java    | 173 ++-------
 .../java/org/apache/impala/catalog/Table.java   |  15 +-
 .../org/apache/impala/planner/HdfsScanNode.java |  32 +-
 .../catalog/CatalogObjectToFromThriftTest.java  |   2 +-
 .../apache/impala/common/FrontendTestBase.java  |  12 +
 14 files changed, 561 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5e568aa..130eb5b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -371,6 +371,7 @@ message(STATUS "Breakpad library: " ${BREAKPAD_STATIC_LIB})
 # compile these subdirs using their own CMakeLists.txt
 add_subdirectory(common/function-registry)
 add_subdirectory(common/thrift)
+add_subdirectory(common/fbs)
 add_subdirectory(be)
 add_subdirectory(fe)
 add_subdirectory(ext-data-source)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/common/fbs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/common/fbs/CMakeLists.txt b/common/fbs/CMakeLists.txt
new file mode 100644
index 0000000..d0c4ef7
--- /dev/null
+++ b/common/fbs/CMakeLists.txt
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cmake_minimum_required(VERSION 2.6)
+
+# Helper function to generate build rules. For each input flatbuffer file (*.fbs), this
+# function will generate a rule that maps the input file to an output c++ file.
+# The flatbuffers compiler will generate multiple output files for each input file. In
+# particular, it will generate a '_generated.h' for c++ and one .java file per flatbuffer
+# table definition.
+#
+# To call this function, pass it the output file list followed by the input flatbuffers
+# files: e.g. FB_GEN(OUTPUT_FILES, ${FLATBUFFER_FILES})
+#
+function(FB_GEN VAR)
+  IF (NOT ARGN)
+    MESSAGE(SEND_ERROR "Error: FB_GEN called without any src files")
+    RETURN()
+  ENDIF(NOT ARGN)
+
+  set(${VAR})
+  foreach(FIL ${ARGN})
+    # Get full path
+    get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
+    # Get basename
+    get_filename_component(FIL_WE ${FIL} NAME_WE)
+
+    set(OUTPUT_BE_FILE "${BE_OUTPUT_DIR}/${FIL_WE}_generated.h")
+    list(APPEND ${VAR} ${OUTPUT_BE_FILE})
+
+    add_custom_command(
+      OUTPUT ${OUTPUT_BE_FILE}
+      COMMAND ${FLATBUFFERS_COMPILER} ${CPP_ARGS} ${FIL}
+      COMMAND ${FLATBUFFERS_COMPILER} ${JAVA_FE_ARGS} ${FIL}
+      DEPENDS ${ABS_FIL}
+      COMMENT "Running FlatBuffers compiler on ${FIL}"
+      VERBATIM
+    )
+  endforeach(FIL)
+
+  set(${VAR} ${${VAR}} PARENT_SCOPE)
+endfunction(FB_GEN)
+
+message("Using FlatBuffers compiler: ${FLATBUFFERS_COMPILER}")
+set(BE_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/be/generated-sources/gen-cpp)
+set(FE_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/fe/generated-sources/gen-java)
+file(MAKE_DIRECTORY ${FE_OUTPUT_DIR})
+file(MAKE_DIRECTORY ${BE_OUTPUT_DIR})
+set(JAVA_FE_ARGS --java -o ${FE_OUTPUT_DIR} -b)
+message(${JAVA_FE_ARGS})
+set(CPP_ARGS --cpp -o ${BE_OUTPUT_DIR} -b)
+message(${CPP_ARGS})
+
+# Add new FlatBuffer schema files here.
+set (SRC_FILES
+  CatalogObjects.fbs
+)
+
+FB_GEN(FB_ALL_FILES ${SRC_FILES})
+add_custom_target(fb-deps ALL DEPENDS ${FB_ALL_FILES})
+

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/common/fbs/CatalogObjects.fbs
----------------------------------------------------------------------
diff --git a/common/fbs/CatalogObjects.fbs b/common/fbs/CatalogObjects.fbs
new file mode 100644
index 0000000..bf44380
--- /dev/null
+++ b/common/fbs/CatalogObjects.fbs
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace org.apache.impala.fb;
+
+// Supported compression algorithms. This needs to match the values in
+// CatalogObjects.THdfsCompression enum.
+enum FbCompression: byte {
+  NONE,
+  DEFAULT,
+  GZIP,
+  DEFLATE,
+  BZIP2,
+  SNAPPY,
+  SNAPPY_BLOCKED,
+  LZO,
+  LZ4,
+  ZLIB
+}
+
+table FbFileBlock {
+  // Offset of this block within the file
+  // TODO: Remove this field if file blocks are retrieved by offset. Infer offset using
+  // the block length.
+  offset: long = 0 (id: 0);
+
+  // Total length of the block.
+  // TODO: Remove this field and compute the block length using the offsets, block size,
+  // and file length.
+  length: long = -1 (id: 1);
+
+  // Hosts that contain replicas of this block. Each value in the list is an index to
+  // the network_addresses list of THdfsTable. The most significant bit of each
+  // replica host index indicates if the replica is cached.
+  replica_host_idxs: [ushort] (id: 2);
+
+  // The list of disk ids for the file block. May not be set if disk ids are not
+  // supported.
+  disk_ids: [ushort] (id: 3);
+}
+
+table FbFileDesc {
+  // The name of the file (not the full path). The parent path is assumed to be the
+  // 'location' of the Partition this file resides within.
+  // TODO: Investigate the use of prefix-based compression for file names.
+  file_name: string (id: 0);
+
+  // The total length of the file, in bytes.
+  length: long (id: 1);
+
+  // The type of compression used for this file.
+  // TODO: Check if reordering these fields can produce some space savings by eliminating
+  // added padding.
+  compression: FbCompression (id: 2);
+
+  // The last modified time of the file.
+  last_modification_time: long (id: 3);
+
+  // List of FbFileBlocks that make up this file.
+  file_blocks: [FbFileBlock] (id: 4);
+}
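For orientation, the FlatBuffer produced from this schema can be built and read
back through the classes that the FlatBuffers compiler generates, which is what
the reworked HdfsPartition code below does. The following sketch is illustrative
only: it assumes the generated org.apache.impala.fb classes and the
flatbuffers-java runtime are on the classpath, and the file name, sizes, and
host index are made up.

  import com.google.flatbuffers.FlatBufferBuilder;
  import org.apache.impala.fb.FbCompression;
  import org.apache.impala.fb.FbFileBlock;
  import org.apache.impala.fb.FbFileDesc;

  public class FbFileDescDemo {
    public static void main(String[] args) {
      FlatBufferBuilder fbb = new FlatBufferBuilder(1);

      // One block with a single (made-up) replica host index.
      FbFileBlock.startReplicaHostIdxsVector(fbb, 1);
      fbb.addShort((short) 0);
      int replicaVec = fbb.endVector();
      FbFileBlock.startFbFileBlock(fbb);
      FbFileBlock.addOffset(fbb, 0L);
      FbFileBlock.addLength(fbb, 4096L);
      FbFileBlock.addReplicaHostIdxs(fbb, replicaVec);
      int block = FbFileBlock.endFbFileBlock(fbb);

      // The file descriptor that owns the block.
      int fileName = fbb.createString("part-00000.parq");
      int blocks = FbFileDesc.createFileBlocksVector(fbb, new int[] {block});
      FbFileDesc.startFbFileDesc(fbb);
      FbFileDesc.addFileName(fbb, fileName);
      FbFileDesc.addLength(fbb, 4096L);
      FbFileDesc.addLastModificationTime(fbb, 0L);
      FbFileDesc.addCompression(fbb, FbCompression.NONE);
      FbFileDesc.addFileBlocks(fbb, blocks);
      fbb.finish(FbFileDesc.endFbFileDesc(fbb));

      // Accessors read straight from the serialized buffer; nothing is unpacked.
      FbFileDesc fd = FbFileDesc.getRootAsFbFileDesc(fbb.dataBuffer());
      System.out.println(fd.fileName() + ": " + fd.length() + " bytes, "
          + fd.fileBlocksLength() + " block(s)");
    }
  }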

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/common/thrift/CatalogObjects.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index f7ce00b..eeeae44 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -212,42 +212,12 @@ struct TColumn {
   16: optional i32 block_size
 }
 
-// Represents a block in an HDFS file
-struct THdfsFileBlock {
-  // Offset of this block within the file
-  1: required i64 offset
-
-  // Total length of the block
-  2: required i64 length
-
-  // Hosts that contain replicas of this block. Each value in the list is an index in to
-  // the network_addresses list of THdfsTable.
-  3: required list<i32> replica_host_idxs
-
-  // The list of disk ids for the file block. May not be set if disk ids are not supported
-  4: optional list<i32> disk_ids
-
-  // For each replica, specifies if the block is cached in memory.
-  5: optional list<bool> is_replica_cached
-}
-
 // Represents an HDFS file in a partition.
 struct THdfsFileDesc {
-  // The name of the file (not the full path). The parent path is assumed to be the
-  // 'location' of the THdfsPartition this file resides within.
-  1: required string file_name
-
-  // The total length of the file, in bytes.
-  2: required i64 length
-
-  // The type of compression used for this file.
-  3: required THdfsCompression compression
-
-  // The last modified time of the file.
-  4: required i64 last_modification_time
-
-  // List of THdfsFileBlocks that make up this file.
-  5: required list<THdfsFileBlock> file_blocks
+  // File descriptor metadata serialized into a FlatBuffer
+  // (defined in common/fbs/CatalogObjects.fbs).
+  // TODO: Put this in a KRPC sidecar to avoid serialization cost.
+  1: required binary file_desc_data
 }
 
 // Represents an HDFS partition's location in a compressed format. 'prefix_index'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/fe/CMakeLists.txt b/fe/CMakeLists.txt
index e06f467..eee1601 100644
--- a/fe/CMakeLists.txt
+++ b/fe/CMakeLists.txt
@@ -15,6 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-add_custom_target(fe ALL DEPENDS thrift-deps function-registry ext-data-source
+add_custom_target(fe ALL DEPENDS thrift-deps fb-deps function-registry ext-data-source
   COMMAND $ENV{IMPALA_HOME}/bin/mvn-quiet.sh install -DskipTests
 )

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/pom.xml
----------------------------------------------------------------------
diff --git a/fe/pom.xml b/fe/pom.xml
index 3399544..c6aeaa3 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -352,6 +352,12 @@ under the License.
       <version>1.0.2</version>
     </dependency>
 
+    <dependency>
+      <groupId>com.github.davidmoten</groupId>
+      <artifactId>flatbuffers-java</artifactId>
+      <version>1.6.0.1</version>
+    </dependency>
+
   </dependencies>
 
   <reporting>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java b/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java
index 6fdcef0..87d7203 100644
--- a/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java
+++ b/fe/src/main/java/org/apache/impala/catalog/DiskIdMapper.java
@@ -20,6 +20,7 @@ package org.apache.impala.catalog;
 import com.google.common.collect.Maps;
 import com.google.common.base.Strings;
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Shorts;
 
 import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -41,14 +42,14 @@ public class DiskIdMapper {
     private DiskIdMapper() {}
 
     // Maps each storage ID UUID string returned by the BlockLocation API, to a per-node
-    // sequential 0-based integer disk id used by the BE scanners. This assumes that
+    // sequential 0-based disk id used by the BE scanners. This assumes that
     // the storage ID of a particular disk is unique across all the nodes in the cluster.
-    private ConcurrentHashMap<String, Integer> storageUuidToDiskId =
-        new ConcurrentHashMap<String, Integer>();
+    private ConcurrentHashMap<String, Short> storageUuidToDiskId_ =
+        new ConcurrentHashMap<String, Short>();
 
-    // Per-host ID generator for storage UUID to integer ID mapping. This maps each host
-    // to the corresponding latest 0-based integer ID.
-    private HashMap<String, Integer> storageIdGenerator = Maps.newHashMap();
+    // Per-host ID generator for storage UUID to Short ID mapping. This maps each host
+    // to the corresponding latest 0-based ID stored in a short.
+    private HashMap<String, Short> storageIdGenerator_ = Maps.newHashMap();
 
     /**
      * Returns a disk id (0-based) index for storageUuid on host 'host'. Generates a
@@ -58,30 +59,35 @@ public class DiskIdMapper {
      * TODO: It is quite possible that there will be lock contention in this method during
      * the initial metadata load. Figure out ways to fix it using finer locking scheme.
      */
-    public int getDiskId(String host, String storageUuid) {
+    public short getDiskId(String host, String storageUuid) {
       Preconditions.checkState(!Strings.isNullOrEmpty(host));
       // Initialize the diskId as -1 to indicate it is unknown
-      int diskId = -1;
+      short diskId = -1;
       // Check if an existing mapping is already present. This is intentionally kept
       // out of the synchronized block to avoid contention for lookups. Once a reasonable
-      // amount of data loading is done and storageIdtoInt is populated with storage IDs
-      // across the cluster, we expect to have a good hit rate.
-      Integer intId = storageUuidToDiskId.get(storageUuid);
-      if (intId != null) return intId;
-      synchronized (storageIdGenerator) {
+      // amount of data loading is done and storageUuidToDiskId_ is populated with storage
+      // IDs across the cluster, we expect to have a good hit rate.
+      Short shortId = storageUuidToDiskId_.get(storageUuid);
+      if (shortId != null) return shortId;
+      synchronized (storageIdGenerator_) {
         // Mapping might have been added by another thread that entered the synchronized
         // block first.
-        intId = storageUuidToDiskId.get(storageUuid);
-        if (intId != null) return intId;
+        shortId = storageUuidToDiskId_.get(storageUuid);
+        if (shortId != null) return shortId;
         // No mapping exists, create a new disk ID for 'storageUuid'
-        if (storageIdGenerator.containsKey(host)) {
-          diskId = storageIdGenerator.get(host) + 1;
+        if (storageIdGenerator_.containsKey(host)) {
+          try {
+            diskId = Shorts.checkedCast(storageIdGenerator_.get(host) + 1);
+          } catch (IllegalStateException e) {
+            Preconditions.checkState(false,
+                "Number of hosts exceeded " + Short.MAX_VALUE);
+          }
         } else {
           // First diskId of this host.
           diskId = 0;
         }
-        storageIdGenerator.put(host, new Integer(diskId));
-        storageUuidToDiskId.put(storageUuid, new Integer(diskId));
+        storageIdGenerator_.put(host, Short.valueOf(diskId));
+        storageUuidToDiskId_.put(storageUuid, Short.valueOf(diskId));
       }
       return diskId;
     }
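The reworked getDiskId() above keeps the common-case lookup outside the
synchronized block. A condensed, self-contained sketch of that lookup-then-lock
idiom follows; the class and method names are illustrative, not Impala's.

  import java.util.concurrent.ConcurrentHashMap;

  // Illustrative only: assigns a compact sequential id per key, taking the lock
  // only on first sight of a key, much like DiskIdMapper.getDiskId() does.
  public class IdAssigner {
    private final ConcurrentHashMap<String, Short> ids = new ConcurrentHashMap<>();
    private short nextId = 0;

    public short idFor(String key) {
      Short id = ids.get(key);        // fast path: lock-free lookup
      if (id != null) return id;
      synchronized (this) {
        id = ids.get(key);            // re-check; another thread may have won
        if (id != null) return id;
        short assigned = nextId++;    // the real code range-checks with Shorts.checkedCast
        ids.put(key, assigned);
        return assigned;
      }
    }
  }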

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java b/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
index 36a1e9e..dd81587 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsCompression.java
@@ -17,7 +17,9 @@
 
 package org.apache.impala.catalog;
 
+import org.apache.impala.fb.FbCompression;
 import org.apache.impala.thrift.THdfsCompression;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 
@@ -73,6 +75,18 @@ public enum HdfsCompression {
     }
   }
 
+  public byte toFb() {
+    switch (this) {
+      case NONE: return FbCompression.NONE;
+      case DEFLATE: return FbCompression.DEFLATE;
+      case GZIP: return FbCompression.GZIP;
+      case BZIP2: return FbCompression.BZIP2;
+      case SNAPPY: return FbCompression.SNAPPY;
+      case LZO: return FbCompression.LZO;
+      default: throw new IllegalStateException("Unexpected codec: " + this);
+    }
+  }
+
   /* Returns a compression type based on (Hive's) input format. Special case for LZO. */
   public static HdfsCompression fromHdfsInputFormatClass(String inputFormatClass) {
     // TODO: Remove when we have the native LZO writer.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index de1a948..76fddcf 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -17,15 +17,15 @@
 
 package org.apache.impala.catalog;
 
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,27 +37,40 @@ import org.apache.impala.analysis.ToSqlUtils;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.Reference;
+import org.apache.impala.fb.FbCompression;
+import org.apache.impala.fb.FbFileBlock;
+import org.apache.impala.fb.FbFileDesc;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.impala.thrift.ImpalaInternalServiceConstants;
 import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TExprNode;
-import org.apache.impala.thrift.THdfsCompression;
-import org.apache.impala.thrift.THdfsFileBlock;
 import org.apache.impala.thrift.THdfsFileDesc;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPartitionStats;
 import org.apache.impala.thrift.TTableStats;
 import org.apache.impala.util.HdfsCachingUtil;
+import org.apache.impala.util.ListMap;
+
+import com.google.flatbuffers.FlatBufferBuilder;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Shorts;
 
 /**
  * Query-relevant information for one table partition. Partitions are comparable
@@ -68,67 +81,155 @@ import com.google.common.collect.Maps;
 public class HdfsPartition implements Comparable<HdfsPartition> {
   /**
    * Metadata for a single file in this partition.
-   * TODO: Do we even need this class? Just get rid of it and use the Thrift version?
    */
   static public class FileDescriptor implements Comparable<FileDescriptor> {
-    private final THdfsFileDesc fileDescriptor_;
+    // An invalid network address, which will always be treated as remote.
+    private final static TNetworkAddress REMOTE_NETWORK_ADDRESS =
+        new TNetworkAddress("remote*addr", 0);
 
-    public String getFileName() { return fileDescriptor_.getFile_name(); }
-    public long getFileLength() { return fileDescriptor_.getLength(); }
-    public THdfsCompression getFileCompression() {
-      return fileDescriptor_.getCompression();
-    }
-    public long getModificationTime() {
-      return fileDescriptor_.getLast_modification_time();
+    // Minimum block size in bytes allowed for synthetic file blocks (other than the last
+    // block, which may be shorter).
+    private final static long MIN_SYNTHETIC_BLOCK_SIZE = 1024 * 1024;
+
+    // Internal representation of a file descriptor using a FlatBuffer.
+    private final FbFileDesc fbFileDescriptor_;
+
+    private FileDescriptor(FbFileDesc fileDescData) { fbFileDescriptor_ = fileDescData; }
+
+    public static FileDescriptor fromThrift(THdfsFileDesc desc) {
+      ByteBuffer bb = ByteBuffer.wrap(desc.getFile_desc_data());
+      return new FileDescriptor(FbFileDesc.getRootAsFbFileDesc(bb));
     }
-    public List<THdfsFileBlock> getFileBlocks() {
-      return fileDescriptor_.getFile_blocks();
+
+    /**
+     * Creates the file descriptor of a file represented by 'fileStatus' with blocks
+     * stored in 'blockLocations'. 'fileSystem' is the filesystem where the
+     * file resides and 'hostIndex' stores the network addresses of the hosts that store
+     * blocks of the parent HdfsTable. Populates 'numUnknownDiskIds' with the number of
+     * unknown disk ids.
+     */
+    public static FileDescriptor create(FileStatus fileStatus,
+        BlockLocation[] blockLocations, FileSystem fileSystem,
+        ListMap<TNetworkAddress> hostIndex, Reference<Long> numUnknownDiskIds)
+        throws IOException {
+      Preconditions.checkState(FileSystemUtil.supportsStorageIds(fileSystem));
+      FlatBufferBuilder fbb = new FlatBufferBuilder(1);
+      int[] fbFileBlockOffsets = new int[blockLocations.length];
+      int blockIdx = 0;
+      for (BlockLocation loc: blockLocations) {
+        fbFileBlockOffsets[blockIdx++] = FileBlock.createFbFileBlock(fbb, loc, hostIndex,
+            numUnknownDiskIds);
+      }
+      return new FileDescriptor(createFbFileDesc(fbb, fileStatus, fbFileBlockOffsets));
     }
 
-    public THdfsFileDesc toThrift() { return fileDescriptor_; }
+    /**
+     * Creates the file descriptor of a file represented by 'fileStatus' that
+     * resides in a filesystem that doesn't support the BlockLocation API (e.g. S3).
+     * fileFormat' is the file format of the partition where this file resides and
+     * 'hostIndex' stores the network addresses of the hosts that store blocks of
+     * the parent HdfsTable.
+     */
+    public static FileDescriptor createWithSynthesizedBlockMd(FileStatus fileStatus,
+        HdfsFileFormat fileFormat, ListMap<TNetworkAddress> hostIndex) {
+      FlatBufferBuilder fbb = new FlatBufferBuilder(1);
+      int[] fbFileBlockOffets =
+          synthesizeFbBlockMd(fbb, fileStatus, fileFormat, hostIndex);
+      return new FileDescriptor(createFbFileDesc(fbb, fileStatus, fbFileBlockOffets));
+    }
 
-    public FileDescriptor(String fileName, long fileLength, long modificationTime) {
-      Preconditions.checkNotNull(fileName);
-      Preconditions.checkArgument(fileLength >= 0);
-      fileDescriptor_ = new THdfsFileDesc();
-      fileDescriptor_.setFile_name(fileName);
-      fileDescriptor_.setLength(fileLength);
-      fileDescriptor_.setLast_modification_time(modificationTime);
-      fileDescriptor_.setCompression(
-          HdfsCompression.fromFileName(fileName).toThrift());
-      List<THdfsFileBlock> emptyFileBlockList = Lists.newArrayList();
-      fileDescriptor_.setFile_blocks(emptyFileBlockList);
+    /**
+     * Serializes the metadata of a file descriptor represented by 'fileStatus' into a
+     * FlatBuffer using 'fbb' and returns the associated FbFileDesc object. 'blockOffsets'
+     * are the offsets of the serialized block metadata of this file in the underlying
+     * buffer.
+     */
+    private static FbFileDesc createFbFileDesc(FlatBufferBuilder fbb,
+        FileStatus fileStatus, int[] fbFileBlockOffets) {
+      int fileNameOffset = fbb.createString(fileStatus.getPath().getName());
+      int blockVectorOffset = FbFileDesc.createFileBlocksVector(fbb, fbFileBlockOffets);
+      FbFileDesc.startFbFileDesc(fbb);
+      FbFileDesc.addFileName(fbb, fileNameOffset);
+      FbFileDesc.addLength(fbb, fileStatus.getLen());
+      FbFileDesc.addLastModificationTime(fbb, fileStatus.getModificationTime());
+      HdfsCompression comp = HdfsCompression.fromFileName(fileStatus.getPath().getName());
+      FbFileDesc.addCompression(fbb, comp.toFb());
+      FbFileDesc.addFileBlocks(fbb, blockVectorOffset);
+      fbb.finish(FbFileDesc.endFbFileDesc(fbb));
+      // To eliminate memory fragmentation, copy the contents of the FlatBuffer to the
+      // smallest possible ByteBuffer.
+      ByteBuffer bb = fbb.dataBuffer().slice();
+      ByteBuffer compressedBb = ByteBuffer.allocate(bb.capacity());
+      compressedBb.put(bb);
+      return FbFileDesc.getRootAsFbFileDesc((ByteBuffer)compressedBb.flip());
     }
 
-    private FileDescriptor(THdfsFileDesc fileDesc) {
-      this(fileDesc.getFile_name(), fileDesc.length, fileDesc.last_modification_time);
-      for (THdfsFileBlock block: fileDesc.getFile_blocks()) {
-        fileDescriptor_.addToFile_blocks(block);
+    /**
+     * Synthesizes the block metadata of a file represented by 'fileStatus' that resides
+     * in a filesystem that doesn't support the BlockLocation API. The block metadata
+     * consist of the length and offset of each file block. It serializes the
+     * block metadata into a FlatBuffer using 'fbb' and returns their offsets in the
+     * underlying buffer. 'fileFormat' is the file format of the underlying partition and
+     * 'hostIndex' stores the network addresses of the hosts that store the blocks of the
+     * parent HdfsTable.
+     */
+    private static int[] synthesizeFbBlockMd(FlatBufferBuilder fbb, FileStatus fileStatus,
+        HdfsFileFormat fileFormat, ListMap<TNetworkAddress> hostIndex) {
+      long start = 0;
+      long remaining = fileStatus.getLen();
+      long blockSize = fileStatus.getBlockSize();
+      if (blockSize < MIN_SYNTHETIC_BLOCK_SIZE) blockSize = MIN_SYNTHETIC_BLOCK_SIZE;
+      if (!fileFormat.isSplittable(HdfsCompression.fromFileName(
+          fileStatus.getPath().getName()))) {
+        blockSize = remaining;
       }
+      List<Integer> fbFileBlockOffets = Lists.newArrayList();
+      while (remaining > 0) {
+        long len = Math.min(remaining, blockSize);
+        fbFileBlockOffets.add(FileBlock.createFbFileBlock(fbb, start, len,
+            (short) hostIndex.getIndex(REMOTE_NETWORK_ADDRESS)));
+        remaining -= len;
+        start += len;
+      }
+      return Ints.toArray(fbFileBlockOffets);
     }
 
-    public void addFileBlock(FileBlock blockMd) {
-      addThriftFileBlock(blockMd.toThrift());
+    public String getFileName() { return fbFileDescriptor_.fileName(); }
+    public long getFileLength() { return fbFileDescriptor_.length(); }
+
+    public HdfsCompression getFileCompression() {
+      return HdfsCompression.valueOf(FbCompression.name(fbFileDescriptor_.compression()));
     }
 
-    public void addThriftFileBlock(THdfsFileBlock block) {
-      fileDescriptor_.addToFile_blocks(block);
+    public long getModificationTime() { return fbFileDescriptor_.lastModificationTime(); }
+    public int getNumFileBlocks() { return fbFileDescriptor_.fileBlocksLength(); }
+
+    public FbFileBlock getFbFileBlock(int idx) {
+      return fbFileDescriptor_.fileBlocks(idx);
     }
 
-    public static FileDescriptor fromThrift(THdfsFileDesc desc) {
-      return new FileDescriptor(desc);
+    public THdfsFileDesc toThrift() {
+      THdfsFileDesc fd = new THdfsFileDesc();
+      ByteBuffer bb = fbFileDescriptor_.getByteBuffer();
+      fd.setFile_desc_data(bb);
+      return fd;
     }
 
     @Override
     public String toString() {
+      int numFileBlocks = getNumFileBlocks();
+      List<String> blocks = Lists.newArrayListWithCapacity(numFileBlocks);
+      for (int i = 0; i < numFileBlocks; ++i) {
+        blocks.add(FileBlock.debugString(getFbFileBlock(i)));
+      }
       return Objects.toStringHelper(this)
           .add("FileName", getFileName())
-          .add("Length", getFileLength()).toString();
+          .add("Length", getFileLength())
+          .add("Compression", getFileCompression())
+          .add("ModificationTime", getModificationTime())
+          .add("Blocks", Joiner.on(", ").join(blocks)).toString();
     }
 
-    /**
-     * Orders file descriptors lexicographically by file name.
-     */
     @Override
     public int compareTo(FileDescriptor otherFd) {
       return getFileName().compareTo(otherFd.getFileName());
@@ -140,14 +241,14 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
    */
   public static class BlockReplica {
     private final boolean isCached_;
-    private final int hostIdx_;
+    private final short hostIdx_;
 
     /**
      * Creates a BlockReplica given a host ID/index and a flag specifying whether this
     * replica is cached. Host IDs are assigned when loading the block metadata in
      * HdfsTable.
      */
-    public BlockReplica(int hostIdx, boolean isCached) {
+    public BlockReplica(short hostIdx, boolean isCached) {
       hostIdx_ = hostIdx;
       isCached_ = isCached;
     }
@@ -168,88 +269,167 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
     }
 
     public boolean isCached() { return isCached_; }
-    public int getHostIdx() { return hostIdx_; }
+    public short getHostIdx() { return hostIdx_; }
   }
 
   /**
-   * File Block metadata
+   * Static utility methods to serialize and access file block metadata from FlatBuffers.
    */
   public static class FileBlock {
-    private final THdfsFileBlock fileBlock_;
-    private boolean isCached_; // Set to true if there is at least one cached replica.
-
-    private FileBlock(THdfsFileBlock fileBlock) {
-      fileBlock_ = fileBlock;
-      isCached_ = false;
-      for (boolean isCached: fileBlock.getIs_replica_cached()) {
-        isCached_ |= isCached;
-      }
-    }
+    // Bit mask used to extract the replica host id and cache info of a file block.
+    // Use ~REPLICA_HOST_IDX_MASK to extract the cache info (stored in MSB).
+    private static short REPLICA_HOST_IDX_MASK = (1 << 15) - 1;
 
     /**
-     * Construct a FileBlock given the start offset (in bytes) of the file associated
-     * with this block, the length of the block (in bytes), and a list of BlockReplicas.
-     * Does not fill diskIds.
+     * Constructs an FbFileBlock object from the block location metadata
+     * 'loc'. Serializes the file block metadata into a FlatBuffer using 'fbb' and
+     * returns the offset in the underlying buffer where the encoded file block starts.
+     * 'hostIndex' stores the network addresses of the datanodes that store the files of
+     * the parent HdfsTable. Populates 'numUnknownDiskIds' with the number of unknown disk
+     * ids.
      */
-    public FileBlock(long offset, long blockLength,
-        List<BlockReplica> replicaHostIdxs) {
-      Preconditions.checkNotNull(replicaHostIdxs);
-      fileBlock_ = new THdfsFileBlock();
-      fileBlock_.setOffset(offset);
-      fileBlock_.setLength(blockLength);
-
-      fileBlock_.setReplica_host_idxs(new ArrayList<Integer>(replicaHostIdxs.size()));
-      fileBlock_.setIs_replica_cached(new ArrayList<Boolean>(replicaHostIdxs.size()));
-      isCached_ = false;
-      for (BlockReplica replica: replicaHostIdxs) {
-        fileBlock_.addToReplica_host_idxs(replica.getHostIdx());
-        fileBlock_.addToIs_replica_cached(replica.isCached());
-        isCached_ |= replica.isCached();
+    public static int createFbFileBlock(FlatBufferBuilder fbb, BlockLocation loc,
+        ListMap<TNetworkAddress> hostIndex, Reference<Long> numUnknownDiskIds)
+        throws IOException {
+      Preconditions.checkNotNull(fbb);
+      Preconditions.checkNotNull(loc);
+      Preconditions.checkNotNull(hostIndex);
+      // replica host ids
+      FbFileBlock.startReplicaHostIdxsVector(fbb, loc.getNames().length);
+      Set<String> cachedHosts = Sets.newHashSet(loc.getCachedHosts());
+      // Enumerate all replicas of the block, adding any unknown hosts
+      // to hostIndex. We pick the network address from getNames() and
+      // map it to the corresponding hostname from getHosts().
+      for (int i = 0; i < loc.getNames().length; ++i) {
+        TNetworkAddress networkAddress = BlockReplica.parseLocation(loc.getNames()[i]);
+        short replicaIdx = (short) hostIndex.getIndex(networkAddress);
+        boolean isReplicaCached = cachedHosts.contains(loc.getHosts()[i]);
+        replicaIdx = isReplicaCached ?
+            (short) (replicaIdx | ~REPLICA_HOST_IDX_MASK) : replicaIdx;
+        fbb.addShort(replicaIdx);
       }
-    }
+      int fbReplicaHostIdxOffset = fbb.endVector();
 
-    public long getOffset() { return fileBlock_.getOffset(); }
-    public long getLength() { return fileBlock_.getLength(); }
-    public List<Integer> getReplicaHostIdxs() {
-      return fileBlock_.getReplica_host_idxs();
+      // disk ids
+      short[] diskIds = createDiskIds(loc, numUnknownDiskIds);
+      Preconditions.checkState(diskIds.length != 0);
+      int fbDiskIdsOffset = FbFileBlock.createDiskIdsVector(fbb, diskIds);
+      FbFileBlock.startFbFileBlock(fbb);
+      FbFileBlock.addOffset(fbb, loc.getOffset());
+      FbFileBlock.addLength(fbb, loc.getLength());
+      FbFileBlock.addReplicaHostIdxs(fbb, fbReplicaHostIdxOffset);
+      FbFileBlock.addDiskIds(fbb, fbDiskIdsOffset);
+      return FbFileBlock.endFbFileBlock(fbb);
     }
 
     /**
-     * Populates the given THdfsFileBlock's list of disk ids with the given disk id
-     * values. The number of disk ids must match the number of network addresses
-     * set in the file block.
+     * Constructs an FbFileBlock object from the file block metadata that comprise the block's
+     * 'offset', 'length' and replica index 'replicaIdx'. Serializes the file block
+     * metadata into a FlatBuffer using 'fbb' and returns the offset in the underlying
+     * buffer where the encoded file block starts.
      */
-    public static void setDiskIds(int[] diskIds, THdfsFileBlock fileBlock) {
-      Preconditions.checkArgument(
-          diskIds.length == fileBlock.getReplica_host_idxs().size());
-      fileBlock.setDisk_ids(Arrays.asList(ArrayUtils.toObject(diskIds)));
+    public static int createFbFileBlock(FlatBufferBuilder fbb, long offset, long length,
+        short replicaIdx) {
+      Preconditions.checkNotNull(fbb);
+      FbFileBlock.startReplicaHostIdxsVector(fbb, 1);
+      fbb.addShort(replicaIdx);
+      int fbReplicaHostIdxOffset = fbb.endVector();
+      FbFileBlock.startFbFileBlock(fbb);
+      FbFileBlock.addOffset(fbb, offset);
+      FbFileBlock.addLength(fbb, length);
+      FbFileBlock.addReplicaHostIdxs(fbb, fbReplicaHostIdxOffset);
+      return FbFileBlock.endFbFileBlock(fbb);
     }
 
     /**
-     * Return the disk id of the block in BlockLocation.getNames()[hostIndex]; -1 if
-     * disk id is not supported.
+     * Creates the disk ids of a block from its BlockLocation 'location'. Returns the
+     * disk ids and populates 'numUnknownDiskIds' with the number of unknown disk ids.
      */
-    public int getDiskId(int hostIndex) {
-      if (fileBlock_.disk_ids == null) return -1;
-      return fileBlock_.getDisk_ids().get(hostIndex);
+    private static short[] createDiskIds(BlockLocation location,
+        Reference<Long> numUnknownDiskIds) {
+      long unknownDiskIdCount = 0;
+      String[] storageIds = location.getStorageIds();
+      String[] hosts;
+      try {
+        hosts = location.getHosts();
+      } catch (IOException e) {
+        LOG.error("Couldn't get hosts for block: " + location.toString(), e);
+        return new short[0];
+      }
+      if (storageIds.length != hosts.length) {
+        LOG.error("Number of storage IDs and number of hosts for block: " + location
+            .toString() + " mismatch. Skipping disk ID loading for this block.");
+        return Shorts.toArray(Collections.<Short>emptyList());
+      }
+      short[] diskIDs = new short[storageIds.length];
+      for (int i = 0; i < storageIds.length; ++i) {
+        if (Strings.isNullOrEmpty(storageIds[i])) {
+          diskIDs[i] = (short) -1;
+          ++unknownDiskIdCount;
+        } else {
+          diskIDs[i] = DiskIdMapper.INSTANCE.getDiskId(hosts[i], storageIds[i]);
+        }
+      }
+      long count = numUnknownDiskIds.getRef() + unknownDiskIdCount;
+      numUnknownDiskIds.setRef(Long.valueOf(count));
+      return diskIDs;
+    }
+
+    public static long getOffset(FbFileBlock fbFileBlock) { return fbFileBlock.offset(); }
+    public static long getLength(FbFileBlock fbFileBlock) { return fbFileBlock.length(); }
+    // Returns true if there is at least one cached replica.
+    public static boolean hasCachedReplica(FbFileBlock fbFileBlock) {
+      boolean hasCachedReplica = false;
+      for (int i = 0; i < fbFileBlock.replicaHostIdxsLength(); ++i) {
+        hasCachedReplica |= isReplicaCached(fbFileBlock, i);
+      }
+      return hasCachedReplica;
     }
 
-    public boolean isCached(int hostIndex) {
-      return fileBlock_.getIs_replica_cached().get(hostIndex);
+    public static int getNumReplicaHosts(FbFileBlock fbFileBlock) {
+      return fbFileBlock.replicaHostIdxsLength();
     }
 
-    public THdfsFileBlock toThrift() { return fileBlock_; }
+    public static int getReplicaHostIdx(FbFileBlock fbFileBlock, int pos) {
+      int idx = fbFileBlock.replicaHostIdxs(pos);
+      return idx & REPLICA_HOST_IDX_MASK;
+    }
 
-    public static FileBlock fromThrift(THdfsFileBlock thriftFileBlock) {
-      return new FileBlock(thriftFileBlock);
+    // Returns true if the block replica 'replicaIdx' is cached.
+    public static boolean isReplicaCached(FbFileBlock fbFileBlock, int replicaIdx) {
+      int idx = fbFileBlock.replicaHostIdxs(replicaIdx);
+      return (idx & ~REPLICA_HOST_IDX_MASK) != 0;
     }
 
-    @Override
-    public String toString() {
-      return Objects.toStringHelper(this)
-          .add("offset", fileBlock_.offset)
-          .add("length", fileBlock_.length)
-          .add("#disks", fileBlock_.getDisk_idsSize())
+    /**
+     * Return the disk id of the block in BlockLocation.getNames()[hostIndex]; -1 if
+     * disk id is not supported.
+     */
+    public static int getDiskId(FbFileBlock fbFileBlock, int hostIndex) {
+      if (fbFileBlock.diskIdsLength() == 0) return -1;
+      return fbFileBlock.diskIds(hostIndex);
+    }
+
+    /**
+     * Returns a string representation of a FbFileBlock.
+     */
+    public static String debugString(FbFileBlock fbFileBlock) {
+      int numReplicaHosts = getNumReplicaHosts(fbFileBlock);
+      List<Integer> diskIds = Lists.newArrayListWithCapacity(numReplicaHosts);
+      List<Integer> replicaHosts = Lists.newArrayListWithCapacity(numReplicaHosts);
+      List<Boolean> isBlockCached = Lists.newArrayListWithCapacity(numReplicaHosts);
+      for (int i = 0; i < numReplicaHosts; ++i) {
+        diskIds.add(getDiskId(fbFileBlock, i));
+        replicaHosts.add(getReplicaHostIdx(fbFileBlock, i));
+        isBlockCached.add(isReplicaCached(fbFileBlock, i));
+      }
+      StringBuilder builder = new StringBuilder();
+      return builder.append("Offset: " + getOffset(fbFileBlock))
+          .append("Length: " + getLength(fbFileBlock))
+          .append("IsCached: " + hasCachedReplica(fbFileBlock))
+          .append("ReplicaHosts: " + Joiner.on(", ").join(replicaHosts))
+          .append("DiskIds: " + Joiner.on(", ").join(diskIds))
+          .append("Caching: " + Joiner.on(", ").join(isBlockCached))
           .toString();
     }
   }
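The replica encoding above packs the cached flag into the most significant bit
of each ushort host index via REPLICA_HOST_IDX_MASK. A standalone sketch of
that bit manipulation, with hypothetical helper names:

  // Sketch of the bit packing used for replica host indexes: the low 15 bits
  // hold the index into the table's host list, and the most significant bit
  // marks a cached replica (mirrors HdfsPartition.FileBlock).
  public class ReplicaIdxPacking {
    private static final short REPLICA_HOST_IDX_MASK = (1 << 15) - 1;

    static short pack(short hostIdx, boolean isCached) {
      return isCached ? (short) (hostIdx | ~REPLICA_HOST_IDX_MASK) : hostIdx;
    }

    static int hostIdx(short packed) { return packed & REPLICA_HOST_IDX_MASK; }

    static boolean isCached(short packed) {
      return (packed & ~REPLICA_HOST_IDX_MASK) != 0;
    }

    public static void main(String[] args) {
      short packed = pack((short) 42, true);
      // Prints: hostIdx=42 cached=true
      System.out.println("hostIdx=" + hostIdx(packed) + " cached=" + isCached(packed));
    }
  }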

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index b6df167..e387d37 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -47,18 +47,18 @@ import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.analysis.NumericLiteral;
 import org.apache.impala.analysis.PartitionKeyValue;
-import org.apache.impala.catalog.HdfsPartition.BlockReplica;
 import org.apache.impala.catalog.HdfsPartition.FileBlock;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
+import org.apache.impala.common.Reference;
+import org.apache.impala.fb.FbFileBlock;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.ImpalaInternalServiceConstants;
 import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumn;
-import org.apache.impala.thrift.THdfsFileBlock;
 import org.apache.impala.thrift.THdfsPartition;
 import org.apache.impala.thrift.THdfsTable;
 import org.apache.impala.thrift.TNetworkAddress;
@@ -83,7 +83,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -119,14 +118,6 @@ public class HdfsTable extends Table {
   public static final String TBL_PROP_PARQUET_MR_WRITE_ZONE =
       "parquet.mr.int96.write.zone";
 
-  // An invalid network address, which will always be treated as remote.
-  private final static TNetworkAddress REMOTE_NETWORK_ADDRESS =
-      new TNetworkAddress("remote*addr", 0);
-
-  // Minimum block size in bytes allowed for synthetic file blocks (other than the last
-  // block, which may be shorter).
-  private final static long MIN_SYNTHETIC_BLOCK_SIZE = 1024 * 1024;
-
   // string to indicate NULL. set in load() from table properties
   private String nullColumnValue_;
 
@@ -144,14 +135,14 @@ public class HdfsTable extends Table {
   private boolean isMarkedCached_ = false;
 
   // Array of sorted maps storing the association between partition values and
-  // partition ids. There is one sorted map per partition key.
-  // TODO: We should not populate this for HdfsTable objects stored in the catalog
-  // server.
+  // partition ids. There is one sorted map per partition key. It is only populated if
+  // this table object is stored in ImpaladCatalog.
   private ArrayList<TreeMap<LiteralExpr, HashSet<Long>>> partitionValuesMap_ =
       Lists.newArrayList();
 
   // Array of partition id sets that correspond to partitions with null values
-  // in the partition keys; one set per partition key.
+  // in the partition keys; one set per partition key. It is not populated if the table is
+  // stored in the catalog server.
   private ArrayList<HashSet<Long>> nullPartitionIds_ = Lists.newArrayList();
 
   // Map of partition ids to HdfsPartitions.
@@ -293,7 +284,7 @@ public class HdfsTable extends Table {
         return;
       }
 
-      int unknownDiskIdCount = 0;
+      Reference<Long> numUnknownDiskIds = new Reference<Long>(Long.valueOf(0));
       RemoteIterator<LocatedFileStatus> fileStatusIter = fs.listFiles(dirPath, true);
       while (fileStatusIter.hasNext()) {
         LocatedFileStatus fileStatus = fileStatusIter.next();
@@ -311,15 +302,9 @@ public class HdfsTable extends Table {
           }
           continue;
         }
-        String fileName = fileStatus.getPath().getName();
-        FileDescriptor fd = new FileDescriptor(fileName, fileStatus.getLen(),
-            fileStatus.getModificationTime());
-        BlockLocation[] locations = fileStatus.getBlockLocations();
-        unknownDiskIdCount += setFdBlockMetadata(fd, locations);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Adding file md dir: " + partPathDir.toString() + " file: " +
-              fileName);
-        }
+
+        FileDescriptor fd = FileDescriptor.create(fileStatus,
+            fileStatus.getBlockLocations(), fs, hostIndex_, numUnknownDiskIds);
         // Update the partitions' metadata that this file belongs to.
         for (HdfsPartition partition: partitions) {
           partition.getFileDescriptors().add(fd);
@@ -327,6 +312,8 @@ public class HdfsTable extends Table {
           totalHdfsBytes_ += fd.getFileLength();
         }
       }
+
+      long unknownDiskIdCount = numUnknownDiskIds.getRef();
       if (unknownDiskIdCount > 0) {
         if (LOG.isWarnEnabled()) {
           LOG.warn("Unknown disk id count for filesystem " + fs + ":" +
@@ -340,69 +327,6 @@ public class HdfsTable extends Table {
   }
 
   /**
-   * Sets the block metadata for FileDescriptor 'fd' using block location metadata
-   * from 'locations'.
-   */
-  private int setFdBlockMetadata(FileDescriptor fd, BlockLocation[] locations)
-      throws IOException {
-    int unknownFdDiskIds = 0;
-    for (BlockLocation loc: locations) {
-      Set<String> cachedHosts = Sets.newHashSet(loc.getCachedHosts());
-      // Enumerate all replicas of the block, adding any unknown hosts
-      // to hostIndex_. We pick the network address from getNames() and
-      // map it to the corresponding hostname from getHosts().
-      List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize(
-          loc.getNames().length);
-      for (int i = 0; i < loc.getNames().length; ++i) {
-        TNetworkAddress networkAddress =
-            BlockReplica.parseLocation(loc.getNames()[i]);
-        replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress),
-            cachedHosts.contains(loc.getHosts()[i])));
-      }
-      FileBlock currentBlock =
-          new FileBlock(loc.getOffset(), loc.getLength(), replicas);
-      THdfsFileBlock tHdfsFileBlock = currentBlock.toThrift();
-      fd.addThriftFileBlock(tHdfsFileBlock);
-      unknownFdDiskIds += loadDiskIds(loc, tHdfsFileBlock);
-    }
-    return unknownFdDiskIds;
-  }
-
-  /**
-   * Loads the disk IDs for BlockLocation 'location' and its corresponding file block.
-   * HDFS API for BlockLocation returns a storageID UUID string for each disk
-   * hosting the block, which is then mapped to a 0-based integer id called disk ID.
-   * Returns the number of unknown disk IDs encountered in this process.
-   */
-  private int loadDiskIds(BlockLocation location, THdfsFileBlock fileBlock) {
-    int unknownDiskIdCount = 0;
-    String[] storageIds = location.getStorageIds();
-    String[] hosts;
-    try {
-      hosts = location.getHosts();
-    } catch (IOException e) {
-      LOG.error("Couldn't get hosts for block: " + location.toString(), e);
-      return unknownDiskIdCount;
-    }
-    if (storageIds.length != hosts.length) {
-      LOG.error("Number of storage IDs and number of hosts for block: " + location
-          .toString() + " mismatch. Skipping disk ID loading for this block.");
-      return unknownDiskIdCount;
-    }
-    int[] diskIDs = new int[storageIds.length];
-    for (int i = 0; i < storageIds.length; ++i) {
-      if (Strings.isNullOrEmpty(storageIds[i])) {
-        diskIDs[i] = -1;
-        ++unknownDiskIdCount;
-      } else {
-        diskIDs[i] = DiskIdMapper.INSTANCE.getDiskId(hosts[i], storageIds[i]);
-      }
-    }
-    FileBlock.setDiskIds(diskIDs, fileBlock);
-    return unknownDiskIdCount;
-  }
-
-  /**
    * Synthesize the block metadata for a given HdfsPartition object. Should only
    * be called for FileSystems that do not support storage IDs.
    */
@@ -439,14 +363,12 @@ public class HdfsTable extends Table {
         }
         continue;
       }
-      String fileName = fileStatus.getPath().getName();
-      FileDescriptor fd = new FileDescriptor(fileName, fileStatus.getLen(),
-          fileStatus.getModificationTime());
+
       Preconditions.checkState(partitions.size() > 0);
       // For the purpose of synthesizing block metadata, we assume that all partitions
       // with the same location have the same file format.
-      HdfsFileFormat fileFormat = partitions.get(0).getFileFormat();
-      synthesizeFdBlockMetadata(fs, fd, fileFormat);
+      FileDescriptor fd = FileDescriptor.createWithSynthesizedBlockMd(fileStatus,
+          partitions.get(0).getFileFormat(), hostIndex_);
       // Update the partitions' metadata that this file belongs to.
       for (HdfsPartition partition: partitions) {
         partition.getFileDescriptors().add(fd);
@@ -456,31 +378,6 @@ public class HdfsTable extends Table {
     }
   }
 
-  /**
-   * Helper method to synthesize block metadata for file descriptor fd.
-   */
-  private void synthesizeFdBlockMetadata(FileSystem fs, FileDescriptor fd,
-      HdfsFileFormat fileFormat) {
-    long start = 0;
-    long remaining = fd.getFileLength();
-    // Workaround HADOOP-11584 by using the filesystem default block size rather than
-    // the block size from the FileStatus.
-    // TODO: after HADOOP-11584 is resolved, get the block size from the FileStatus.
-    long blockSize = fs.getDefaultBlockSize();
-    if (blockSize < MIN_SYNTHETIC_BLOCK_SIZE) blockSize = MIN_SYNTHETIC_BLOCK_SIZE;
-    if (!fileFormat.isSplittable(HdfsCompression.fromFileName(fd.getFileName()))) {
-      blockSize = remaining;
-    }
-    while (remaining > 0) {
-      long len = Math.min(remaining, blockSize);
-      List<BlockReplica> replicas = Lists.newArrayList(
-          new BlockReplica(hostIndex_.getIndex(REMOTE_NETWORK_ADDRESS), false));
-      fd.addFileBlock(new FileBlock(start, len, replicas));
-      remaining -= len;
-      start += len;
-    }
-  }
-
   @Override
   public TCatalogObjectType getCatalogObjectType() {
     return TCatalogObjectType.TABLE;
@@ -662,12 +559,14 @@ public class HdfsTable extends Table {
     nameToPartitionMap_.clear();
     partitionValuesMap_.clear();
     nullPartitionIds_.clear();
-    // Initialize partitionValuesMap_ and nullPartitionIds_. Also reset column stats.
-    for (int i = 0; i < numClusteringCols_; ++i) {
-      getColumns().get(i).getStats().setNumNulls(0);
-      getColumns().get(i).getStats().setNumDistinctValues(0);
-      partitionValuesMap_.add(Maps.<LiteralExpr, HashSet<Long>>newTreeMap());
-      nullPartitionIds_.add(Sets.<Long>newHashSet());
+    if (isStoredInImpaladCatalogCache()) {
+      // Initialize partitionValuesMap_ and nullPartitionIds_. Also reset column stats.
+      for (int i = 0; i < numClusteringCols_; ++i) {
+        getColumns().get(i).getStats().setNumNulls(0);
+        getColumns().get(i).getStats().setNumDistinctValues(0);
+        partitionValuesMap_.add(Maps.<LiteralExpr, HashSet<Long>>newTreeMap());
+        nullPartitionIds_.add(Sets.<Long>newHashSet());
+      }
     }
     numHdfsFiles_ = 0;
     totalHdfsBytes_ = 0;
@@ -821,10 +720,10 @@ public class HdfsTable extends Table {
         if (fd == null || partition.isMarkedCached() ||
             fd.getFileLength() != fileStatus.getLen() ||
             fd.getModificationTime() != fileStatus.getModificationTime()) {
-          fd = new FileDescriptor(fileName, fileStatus.getLen(),
-              fileStatus.getModificationTime());
-          setFdBlockMetadata(fd,
-              fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()));
+          BlockLocation[] locations =
+              fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+          fd = FileDescriptor.create(fileStatus, locations, fs, hostIndex_,
+              new Reference<Long>(Long.valueOf(0)));
         }
         newFileDescs.add(fd);
         newPartSizeBytes += fileStatus.getLen();
@@ -996,6 +895,8 @@ public class HdfsTable extends Table {
   private void updatePartitionMdAndColStats(HdfsPartition partition) {
     if (partition.getPartitionValues().size() != numClusteringCols_) return;
     partitionIds_.add(partition.getId());
+    nameToPartitionMap_.put(partition.getPartitionName(), partition);
+    if (!isStoredInImpaladCatalogCache()) return;
     for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
       ColumnStats stats = getColumns().get(i).getStats();
       LiteralExpr literal = partition.getPartitionValues().get(i);
@@ -1016,7 +917,6 @@ public class HdfsTable extends Table {
       }
       partitionIds.add(partition.getId());
     }
-    nameToPartitionMap_.put(partition.getPartitionName(), partition);
   }
 
   /**
@@ -1046,6 +946,7 @@ public class HdfsTable extends Table {
     partitionIds_.remove(partitionId);
     partitionMap_.remove(partitionId);
     nameToPartitionMap_.remove(partition.getPartitionName());
+    if (!isStoredInImpaladCatalogCache()) return partition;
     for (int i = 0; i < partition.getPartitionValues().size(); ++i) {
       ColumnStats stats = getColumns().get(i).getStats();
       LiteralExpr literal = partition.getPartitionValues().get(i);
@@ -1341,7 +1242,6 @@ public class HdfsTable extends Table {
     String key = TBL_PROP_SKIP_HEADER_LINE_COUNT;
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     if (msTbl == null) return false;
-    String inputFormat = msTbl.getSd().getInputFormat();
     return msTbl.getParameters().containsKey(key);
   }
 
@@ -1555,8 +1455,6 @@ public class HdfsTable extends Table {
     }
     org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
     Preconditions.checkNotNull(msTbl);
-    HdfsStorageDescriptor fileFormatDescriptor =
-        HdfsStorageDescriptor.fromStorageDescriptor(this.name_, msTbl.getSd());
     for (HdfsPartition partition: partitions) {
       org.apache.hadoop.hive.metastore.api.Partition msPart =
           partition.toHmsPartition();
@@ -1585,8 +1483,6 @@ public class HdfsTable extends Table {
       HdfsPartition partition) throws Exception {
     Preconditions.checkNotNull(storageDescriptor);
     Preconditions.checkNotNull(partition);
-    org.apache.hadoop.hive.metastore.api.Partition msPart =
-        partition.toHmsPartition();
     Path partDirPath = new Path(storageDescriptor.getLocation());
     FileSystem fs = partDirPath.getFileSystem(CONF);
     if (!fs.exists(partDirPath)) return;
@@ -1622,7 +1518,6 @@ public class HdfsTable extends Table {
     Preconditions.checkState(hdfsTable.getNetwork_addresses() instanceof ArrayList<?>);
     hostIndex_.populate((ArrayList<TNetworkAddress>)hdfsTable.getNetwork_addresses());
     resetPartitions();
-
     try {
       for (Map.Entry<Long, THdfsPartition> part: hdfsTable.getPartitions().entrySet()) {
         HdfsPartition hdfsPart =
@@ -1923,9 +1818,11 @@ public class HdfsTable extends Table {
         // Calculate the number of bytes that are cached.
         long cachedBytes = 0L;
         for (FileDescriptor fd: p.getFileDescriptors()) {
-          for (THdfsFileBlock fb: fd.getFileBlocks()) {
-            if (fb.getIs_replica_cached().contains(true)) {
-              cachedBytes += fb.getLength();
+          int numBlocks = fd.getNumFileBlocks();
+          for (int i = 0; i < numBlocks; ++i) {
+            FbFileBlock block = fd.getFbFileBlock(i);
+            if (FileBlock.hasCachedReplica(block)) {
+              cachedBytes += FileBlock.getLength(block);
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index f70186e..881841c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -18,7 +18,6 @@
 package org.apache.impala.catalog;
 
 import java.util.ArrayList;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -32,6 +31,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
@@ -59,6 +59,7 @@ import com.google.common.collect.Maps;
 public abstract class Table implements CatalogObject {
   private static final Logger LOG = Logger.getLogger(Table.class);
 
+  // Catalog version assigned to this table
   private long catalogVersion_ = Catalog.INITIAL_CATALOG_VERSION;
   protected org.apache.hadoop.hive.metastore.api.Table msTable_;
 
@@ -88,6 +89,9 @@ public abstract class Table implements CatalogObject {
   // The lastDdlTime for this table; -1 if not set
   protected long lastDdlTime_;
 
+  // True if this object is stored in an Impalad catalog cache.
+  protected boolean storedInImpaladCatalogCache_ = false;
+
   protected Table(org.apache.hadoop.hive.metastore.api.Table msTable, Db db,
       String name, String owner) {
     msTable_ = msTable;
@@ -103,6 +107,13 @@ public abstract class Table implements CatalogObject {
       int tableId, Set<Long> referencedPartitions);
   public abstract TCatalogObjectType getCatalogObjectType();
 
+  // Returns true if this table reference comes from the impalad catalog cache or if it
+  // is loaded from the testing framework. Returns false if this table reference points
+  // to a table stored in the catalog server.
+  public boolean isStoredInImpaladCatalogCache() {
+    return storedInImpaladCatalogCache_ || RuntimeEnv.INSTANCE.isTestEnv();
+  }
+
   /**
    * Populate members of 'this' from metastore info. If 'reuseMetadata' is true, reuse
    * valid existing metadata.
@@ -276,6 +287,8 @@ public abstract class Table implements CatalogObject {
     // Default to READ_WRITE access if the field is not set.
     accessLevel_ = thriftTable.isSetAccess_level() ? thriftTable.getAccess_level() :
         TAccessLevel.READ_WRITE;
+
+    storedInImpaladCatalogCache_ = true;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index bd260e0..40d750b 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -54,10 +54,10 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.NotImplementedException;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.fb.FbFileBlock;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExplainLevel;
 import org.apache.impala.thrift.TExpr;
-import org.apache.impala.thrift.THdfsFileBlock;
 import org.apache.impala.thrift.THdfsFileSplit;
 import org.apache.impala.thrift.THdfsScanNode;
 import org.apache.impala.thrift.TNetworkAddress;
@@ -540,47 +540,47 @@ public class HdfsScanNode extends ScanNode {
       boolean partitionMissingDiskIds = false;
       for (HdfsPartition.FileDescriptor fileDesc: partition.getFileDescriptors()) {
         boolean fileDescMissingDiskIds = false;
-        for (THdfsFileBlock thriftBlock: fileDesc.getFileBlocks()) {
-          HdfsPartition.FileBlock block = FileBlock.fromThrift(thriftBlock);
-          List<Integer> replicaHostIdxs = block.getReplicaHostIdxs();
-          if (replicaHostIdxs.size() == 0) {
+        for (int j = 0; j < fileDesc.getNumFileBlocks(); ++j) {
+          FbFileBlock block = fileDesc.getFbFileBlock(j);
+          int replicaHostCount = FileBlock.getNumReplicaHosts(block);
+          if (replicaHostCount == 0) {
             // we didn't get locations for this block; for now, just ignore the block
             // TODO: do something meaningful with that
             continue;
           }
           // Collect the network address and volume ID of all replicas of this block.
           List<TScanRangeLocation> locations = Lists.newArrayList();
-          for (int i = 0; i < replicaHostIdxs.size(); ++i) {
+          for (int i = 0; i < replicaHostCount; ++i) {
             TScanRangeLocation location = new TScanRangeLocation();
             // Translate from the host index (local to the HdfsTable) to network address.
-            Integer tableHostIdx = replicaHostIdxs.get(i);
+            int replicaHostIdx = FileBlock.getReplicaHostIdx(block, i);
             TNetworkAddress networkAddress =
-                partition.getTable().getHostIndex().getEntry(tableHostIdx);
+                partition.getTable().getHostIndex().getEntry(replicaHostIdx);
             Preconditions.checkNotNull(networkAddress);
             // Translate from network address to the global (to this request) host index.
             Integer globalHostIdx = analyzer.getHostIndex().getIndex(networkAddress);
             location.setHost_idx(globalHostIdx);
-            if (checkMissingDiskIds && block.getDiskId(i) == -1) {
+            if (checkMissingDiskIds && FileBlock.getDiskId(block, i) == -1) {
               ++numScanRangesNoDiskIds_;
               partitionMissingDiskIds = true;
               fileDescMissingDiskIds = true;
             }
-            location.setVolume_id(block.getDiskId(i));
-            location.setIs_cached(block.isCached(i));
+            location.setVolume_id(FileBlock.getDiskId(block, i));
+            location.setIs_cached(FileBlock.isReplicaCached(block, i));
             locations.add(location);
           }
           // create scan ranges, taking into account maxScanRangeLength
-          long currentOffset = block.getOffset();
-          long remainingLength = block.getLength();
+          long currentOffset = FileBlock.getOffset(block);
+          long remainingLength = FileBlock.getLength(block);
           while (remainingLength > 0) {
             long currentLength = remainingLength;
             if (maxScanRangeLength > 0 && remainingLength > maxScanRangeLength) {
               currentLength = maxScanRangeLength;
             }
             TScanRange scanRange = new TScanRange();
-            scanRange.setHdfs_file_split(new THdfsFileSplit(
-                fileDesc.getFileName(), currentOffset, currentLength, partition.getId(),
-                fileDesc.getFileLength(), fileDesc.getFileCompression(),
+            scanRange.setHdfs_file_split(new THdfsFileSplit(fileDesc.getFileName(),
+                currentOffset, currentLength, partition.getId(), fileDesc.getFileLength(),
+                fileDesc.getFileCompression().toThrift(),
                 fileDesc.getModificationTime()));
             TScanRangeLocationList scanRangeLocations = new TScanRangeLocationList();
             scanRangeLocations.scan_range = scanRange;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
index 7d80ce0..67005be 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
@@ -99,7 +99,7 @@ public class CatalogObjectToFromThriftTest {
         } else {
           Assert.assertEquals(hdfsPart.getFileDescriptors().size(), 1);
           Assert.assertTrue(
-              hdfsPart.getFileDescriptors().get(0).getFileBlocks().size() > 0);
+              hdfsPart.getFileDescriptors().get(0).getNumFileBlocks() > 0);
 
           // Verify the partition access level is getting set properly. The alltypes_seq
           // table has two partitions that are read_only.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6b5f82e/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
index 297666f..6202991 100644
--- a/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
+++ b/fe/src/test/java/org/apache/impala/common/FrontendTestBase.java
@@ -56,6 +56,8 @@ import org.apache.impala.thrift.TQueryCtx;
 import org.apache.impala.thrift.TQueryOptions;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -75,6 +77,16 @@ public class FrontendTestBase {
   protected final List<Db> testDbs_ = Lists.newArrayList();
   protected final List<Table> testTables_ = Lists.newArrayList();
 
+  @BeforeClass
+  public static void setUp() throws Exception {
+    RuntimeEnv.INSTANCE.setTestEnv(true);
+  }
+
+  @AfterClass
+  public static void cleanUp() throws Exception {
+    RuntimeEnv.INSTANCE.setTestEnv(false);
+  }
+
   protected Analyzer createAnalyzer(String defaultDb) {
     TQueryCtx queryCtx =
         TestUtils.createQueryContext(defaultDb, System.getProperty("user.name"));


[3/4] incubator-impala git commit: IMPALA-5158: Part 1: include untracked memory in MemTracker dumps

Posted by ta...@apache.org.
IMPALA-5158: Part 1: include untracked memory in MemTracker dumps

The MemTracker dumps are easier to understand if the children of the
process tracker actually add up to the process consumption. There is
always a discrepancy, which corresponds (mostly) to untracked memory.
This commit adds a new line item to the MemTracker dump: the process
consumption value minus the summed consumption of its children.

Here is an example of the new output:

  Process: Limit=8.35 GB Total=123.74 MB Peak=219.93 MB
    Free Disk IO Buffers: Total=1.37 MB Peak=1.37 MB
    RequestPool=default-pool: Total=609.09 KB Peak=10.14 MB
      Query(fd40f6b7542f7e7c:4aef272300000000): Total=609.09 KB Peak=1.19 MB
	Fragment fd40f6b7542f7e7c:4aef272300000000: Total=609.09 KB Peak=1.19 MB
	  AGGREGATION_NODE (id=3): Total=24.00 KB Peak=36.00 KB
	    Exprs: Total=4.00 KB Peak=4.00 KB
	  NESTED_LOOP_JOIN_NODE (id=2): Total=412.90 KB Peak=412.90 KB
	    Nested Loop Join Builder: Total=392.00 KB Peak=392.00 KB
	  HDFS_SCAN_NODE (id=1): Total=160.00 KB Peak=768.00 KB
	    Exprs: Total=4.00 KB Peak=4.00 KB
	  HDFS_SCAN_NODE (id=0): Total=0 Peak=344.00 KB
	  PLAN_ROOT_SINK: Total=0 Peak=0
	  CodeGen: Total=4.19 KB Peak=267.50 KB
	Block Manager: Limit=6.68 GB Total=0 Peak=0
    RequestPool=fe-eval-exprs: Total=0 Peak=4.00 KB
    Untracked Memory: Total=121.78 MB
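
For clarity, the "Untracked Memory" figure is simply the parent tracker's
measured consumption minus the sum of what its children reported. The
standalone C++ sketch below shows only that arithmetic; SimpleTracker and
its fields are invented for illustration and are not the real MemTracker
API, and unlike the actual patch (which prints the residual only for
trackers backed by a consumption metric) it prints it for any tracker
that has children:

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Toy stand-in for a tracker node; not the real MemTracker class.
struct SimpleTracker {
  std::string label;
  int64_t consumption;                   // bytes attributed to this tracker
  std::vector<SimpleTracker> children;   // requires C++17 (incomplete type)
};

// Prints this tracker, then its children, then the residual bytes that no
// child accounts for, labelled "Untracked Memory".
void LogUsage(const SimpleTracker& t, const std::string& prefix = "") {
  std::cout << prefix << t.label << ": Total=" << t.consumption << " bytes\n";
  int64_t child_consumption = 0;
  for (const SimpleTracker& child : t.children) {
    LogUsage(child, prefix + "  ");
    child_consumption += child.consumption;
  }
  if (!t.children.empty()) {
    std::cout << prefix << "  Untracked Memory: Total="
              << (t.consumption - child_consumption) << " bytes\n";
  }
}

int main() {
  // Invented numbers: 100 bytes measured at the process level, 30 + 20 bytes
  // tracked by children, so 50 bytes show up as untracked.
  SimpleTracker process{"Process", 100,
      {{"Free Disk IO Buffers", 30, {}}, {"RequestPool=default-pool", 20, {}}}};
  LogUsage(process);
  return 0;
}

With these made-up inputs the sketch prints totals of 100, 30, and 20 bytes
plus "Untracked Memory: Total=50 bytes", mirroring the shape of the dump
above.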

Change-Id: I77e5b2ef5f85f82ee0226e4dad638233fec9428f
Reviewed-on: http://gerrit.cloudera.org:8080/6820
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/eb7ee677
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/eb7ee677
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/eb7ee677

Branch: refs/heads/master
Commit: eb7ee67737288e654b4c291b2d726e2f8bc9f107
Parents: ff5b046
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon May 8 15:16:49 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed May 10 00:05:48 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/mem-tracker.cc | 64 ++++++++++++++++++++++++--------------
 be/src/runtime/mem-tracker.h  |  8 +++--
 2 files changed, 47 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/eb7ee677/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index 4ad9b26..d2dceb3 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -131,15 +131,14 @@ int64_t MemTracker::GetPoolMemReserved() const {
 
   int64_t mem_reserved = 0L;
   lock_guard<SpinLock> l(child_trackers_lock_);
-  for (list<MemTracker*>::const_iterator it = child_trackers_.begin();
-       it != child_trackers_.end(); ++it) {
-    int64_t child_limit = (*it)->limit();
+  for (MemTracker* child : child_trackers_) {
+    int64_t child_limit = child->limit();
     if (child_limit > 0) {
       // Make sure we don't overflow if the query limits are set to ridiculous values.
       mem_reserved += std::min(child_limit, MemInfo::physical_mem());
     } else {
-      DCHECK_EQ(child_limit, -1) << (*it)->LogUsage("");
-      mem_reserved += (*it)->consumption();
+      DCHECK_EQ(child_limit, -1) << child->LogUsage("");
+      mem_reserved += child->consumption();
     }
   }
   return mem_reserved;
@@ -237,17 +236,18 @@ void MemTracker::RefreshConsumptionFromMetric() {
 //   TrackerName: Limit=5.00 MB BufferPoolUsed/Reservation=0/5.00 MB OtherMemory=1.04 MB
 //                Total=6.04 MB Peak=6.45 MB
 //
-string MemTracker::LogUsage(const string& prefix) const {
-  if (!log_usage_if_zero_ && consumption() == 0) return "";
+string MemTracker::LogUsage(const string& prefix, int64_t* logged_consumption) const {
+  int64_t curr_consumption = consumption();
+  int64_t peak_consumption = consumption_->value();
+  if (logged_consumption != nullptr) *logged_consumption = curr_consumption;
+
+  if (!log_usage_if_zero_ && curr_consumption == 0) return "";
 
   stringstream ss;
   ss << prefix << label_ << ":";
   if (CheckLimitExceeded()) ss << " memory limit exceeded.";
   if (limit_ > 0) ss << " Limit=" << PrettyPrinter::Print(limit_, TUnit::BYTES);
 
-  int64_t total = consumption();
-  int64_t peak = consumption_->value();
-
   ReservationTrackerCounters* reservation_counters = reservation_counters_.Load();
   if (reservation_counters != nullptr) {
     int64_t reservation = reservation_counters->peak_reservation->current_value();
@@ -260,26 +260,44 @@ string MemTracker::LogUsage(const string& prefix) const {
     if (reservation_limit != numeric_limits<int64_t>::max()) {
       ss << " BufferPoolLimit=" << PrettyPrinter::Print(reservation_limit, TUnit::BYTES);
     }
-    ss << " OtherMemory=" << PrettyPrinter::Print(total - reservation, TUnit::BYTES);
+    ss << " OtherMemory="
+       << PrettyPrinter::Print(curr_consumption - reservation, TUnit::BYTES);
+  }
+  ss << " Total=" << PrettyPrinter::Print(curr_consumption, TUnit::BYTES)
+     << " Peak=" << PrettyPrinter::Print(peak_consumption, TUnit::BYTES);
+
+  string new_prefix = Substitute("  $0", prefix);
+  int64_t child_consumption;
+  string child_trackers_usage;
+  {
+    lock_guard<SpinLock> l(child_trackers_lock_);
+    child_trackers_usage = LogUsage(new_prefix, child_trackers_, &child_consumption);
   }
-  ss << " Total=" << PrettyPrinter::Print(total, TUnit::BYTES)
-     << " Peak=" << PrettyPrinter::Print(peak, TUnit::BYTES);
-
-  stringstream prefix_ss;
-  prefix_ss << prefix << "  ";
-  string new_prefix = prefix_ss.str();
-  lock_guard<SpinLock> l(child_trackers_lock_);
-  string child_trackers_usage = LogUsage(new_prefix, child_trackers_);
   if (!child_trackers_usage.empty()) ss << "\n" << child_trackers_usage;
+
+  if (consumption_metric_ != nullptr) {
+    // Log the difference between the metric value and children as "untracked" memory so
+    // that the values always add up. This value is not always completely accurate because
+    // we did not necessarily get a consistent snapshot of the consumption values for all
+    // children at a single moment in time, but is good enough for our purposes.
+    int64_t untracked_bytes = curr_consumption - child_consumption;
+    ss << "\n"
+       << new_prefix << "Untracked Memory: Total="
+       << PrettyPrinter::Print(untracked_bytes, TUnit::BYTES);
+  }
+
   return ss.str();
 }
 
-string MemTracker::LogUsage(const string& prefix, const list<MemTracker*>& trackers) {
+string MemTracker::LogUsage(const string& prefix, const list<MemTracker*>& trackers,
+    int64_t* logged_consumption) {
+  *logged_consumption = 0;
   vector<string> usage_strings;
-  for (list<MemTracker*>::const_iterator it = trackers.begin();
-      it != trackers.end(); ++it) {
-    string usage_string = (*it)->LogUsage(prefix);
+  for (MemTracker* tracker : trackers) {
+    int64_t tracker_consumption;
+    string usage_string = tracker->LogUsage(prefix, &tracker_consumption);
     if (!usage_string.empty()) usage_strings.push_back(usage_string);
+    *logged_consumption += tracker_consumption;
   }
   return join(usage_strings, "\n");
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/eb7ee677/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index e3282c7..f962fa0 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -311,9 +311,11 @@ class MemTracker {
   void RegisterMetrics(MetricGroup* metrics, const std::string& prefix);
 
   /// Logs the usage of this tracker and all of its children (recursively).
+  /// If 'logged_consumption' is non-NULL, sets the consumption value logged.
   /// TODO: once all memory is accounted in ReservationTracker hierarchy, move
   /// reporting there.
-  std::string LogUsage(const std::string& prefix = "") const;
+  std::string LogUsage(
+      const std::string& prefix = "", int64_t* logged_consumption = nullptr) const;
 
   /// Log the memory usage when memory limit is exceeded and return a status object with
   /// details of the allocation which caused the limit to be exceeded.
@@ -347,8 +349,10 @@ class MemTracker {
   /// Adds tracker to child_trackers_
   void AddChildTracker(MemTracker* tracker);
 
+  /// Log consumption of all the trackers provided. Returns the sum of consumption in
+  /// 'logged_consumption'.
   static std::string LogUsage(const std::string& prefix,
-      const std::list<MemTracker*>& trackers);
+      const std::list<MemTracker*>& trackers, int64_t* logged_consumption);
 
   /// Size, in bytes, that is considered a large value for Release() (or Consume() with
   /// a negative value). If tcmalloc is used, this can trigger it to GC.