Posted to commits@impala.apache.org by jr...@apache.org on 2018/01/19 21:41:44 UTC

[1/5] impala git commit: IMPALA-6268: KerberosOnAndOff/RpcMgrKerberizedTest.MultipleServices failing

Repository: impala
Updated Branches:
  refs/heads/master e714f2b33 -> 4afabd4e3


IMPALA-6268: KerberosOnAndOff/RpcMgrKerberizedTest.MultipleServices failing

On systems that have Kerberos 1.11 or earlier, service principals with
IP addresses are not supported due to a bug:

http://krbdev.mit.edu/rt/Ticket/Display.html?id=7603

Since our BE tests use such principals, they fail on older platforms with the
above-mentioned Kerberos versions.

Kudu fixed this by adding a workaround which overrides krb5_realm_override.

https://github.com/cloudera/kudu/commit/ba2ae3de4a7c43ff2f5873e822410e066ea99667

However, when we moved Kudu's security library into Impala, we did not
add the appropriate build flags that allow it to be used. This patch fixes
that.

Testing: Verified that the failing test runs successfully on CentOS 6.4
with Kerberos 1.10.3

Change-Id: I60e291e8aa1b59b645b856d33c658471f314c221
Reviewed-on: http://gerrit.cloudera.org:8080/9006
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d8ae8801
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d8ae8801
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d8ae8801

Branch: refs/heads/master
Commit: d8ae8801ae668f6ba4771c5794b80f7c9262cd65
Parents: e714f2b
Author: Sailesh Mukil <sa...@apache.org>
Authored: Tue Jan 9 14:58:38 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jan 19 01:21:45 2018 +0000

----------------------------------------------------------------------
 CMakeLists.txt             | 5 +++++
 be/src/rpc/CMakeLists.txt  | 1 +
 be/src/rpc/rpc-mgr-test.cc | 4 +++-
 3 files changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d8ae8801/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 37a6324..612e00c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -307,6 +307,11 @@ IMPALA_ADD_THIRDPARTY_LIB(krb5 ${KERBEROS_INCLUDE_DIR} "" ${KERBEROS_LIBRARY})
 # testing.
 find_package(KerberosPrograms REQUIRED)
 
+# Tests that run any security related tests need to link this in to override the
+# krb5_realm_override() implementation in krb5.
+# See be/src/kudu/security/krb5_realm_override.cc for more information.
+set(KRB5_REALM_OVERRIDE -Wl,--undefined=krb5_realm_override_loaded krb5_realm_override)
+
 ###################################################################
 
 # System dependencies

http://git-wip-us.apache.org/repos/asf/impala/blob/d8ae8801/be/src/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt
index 4234e2b..7beb80d 100644
--- a/be/src/rpc/CMakeLists.txt
+++ b/be/src/rpc/CMakeLists.txt
@@ -50,6 +50,7 @@ ADD_BE_TEST(rpc-mgr-test)
 add_dependencies(rpc-mgr-test rpc_test_proto)
 target_link_libraries(rpc-mgr-test rpc_test_proto)
 target_link_libraries(rpc-mgr-test security-test-for-impala)
+target_link_libraries(rpc-mgr-test ${KRB5_REALM_OVERRIDE})
 
 add_library(rpc_test_proto ${RPC_TEST_PROTO_SRCS})
 add_dependencies(rpc_test_proto rpc_test_proto_tgt krpc)

http://git-wip-us.apache.org/repos/asf/impala/blob/d8ae8801/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index 7e3cb25..441619b 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -195,7 +195,9 @@ class ScanMemServiceImpl : public ScanMemServiceIf {
 // Reenable after fixing.
 INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
                         RpcMgrKerberizedTest,
-                        ::testing::Values(KERBEROS_OFF));
+                        ::testing::Values(KERBEROS_OFF,
+                                          USE_KUDU_KERBEROS,
+                                          USE_IMPALA_KERBEROS));
 
 TEST_P(RpcMgrKerberizedTest, MultipleServices) {
   // Test that a service can be started, and will respond to requests.


[4/5] impala git commit: IMPALA-4993: extend dictionary filtering to collections

Posted by jr...@apache.org.
IMPALA-4993: extend dictionary filtering to collections

Currently, top-level scalar columns in parquet files can
be used at runtime to prune row-groups by evaluating certain
conjuncts over the column's dictionary (if available).
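
As a rough illustration of the pruning idea described above (not the code
in this patch), the sketch below checks whether any dictionary entry
satisfies all conjuncts. It assumes the column chunk is entirely
dictionary encoded, so the dictionary lists every non-NULL value in the
row group. The names Dictionary, Conjunct and CanPruneRowGroup are
hypothetical, and values are simplified to int64_t.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical stand-ins: the distinct values of one column in one row
// group, and a predicate over a single column value.
using Dictionary = std::vector<int64_t>;
using Conjunct = std::function<bool(int64_t)>;

// Returns true if no dictionary entry passes every conjunct, i.e. no row in
// the row group can match and the row group may be skipped.
// Assumption: the column is 100% dictionary encoded in this row group.
bool CanPruneRowGroup(const Dictionary& dict,
                      const std::vector<Conjunct>& conjuncts) {
  // Conservative choice for this sketch: never prune on an empty dictionary.
  if (dict.empty()) return false;
  for (int64_t value : dict) {
    bool passes_all = true;
    for (const Conjunct& conjunct : conjuncts) {
      if (!conjunct(value)) {
        passes_all = false;
        break;
      }
    }
    // One surviving entry is enough to keep the row group.
    if (passes_all) return false;
  }
  return true;
}
```

For a dictionary holding {10, 20, 30}, a conjunct such as "value > 100"
rejects every entry, so the row group can be skipped without reading its
data pages; "value > 15" keeps it.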

This change extends such pruning to scalar values that are
stored in collection type columns. Currently, dictionary
pruning works by finding eligible conjuncts for top-level
slots. Since only top-level slots are supported, the slots
are implicitly part of the scan node's tuple descriptor.
With this change, we track eligible conjuncts by slot as well
as the tuple that contains the slot (either top-level or
nested collection). Since collection conjuncts are already
managed by a map that associates tuple descriptors to a list
of their conjuncts, this extension follows the existing
representation.

The frontend builds the mapping of SlotId to conjuncts that
are dictionary filterable. This mapping now includes SlotIds
that reference nested tuples. The backend is adjusted to
use the same representation. In addition, collection
readers are decomposed into scalar filterable columns and
other, non-dictionary filterable readers. When filtering
a row group using a conjunct associated with a (possibly)
nested collection type, an additional tuple buffer is
allocated per tuple descriptor.
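
The decomposition of collection readers described above can be sketched
as a recursive walk over the reader tree. This is a simplified model
under stated assumptions, not Impala's reader classes: Reader,
Partitioned and the column names used in the usage note are hypothetical,
and the dict_filterable flag stands in for the real eligibility checks.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified reader node. A collection reader owns child
// readers; a scalar reader is a leaf.
struct Reader {
  std::string name;
  bool is_collection;
  bool dict_filterable;  // only meaningful for scalar readers
  std::vector<Reader> children;
};

// The three lists the readers are split into.
struct Partitioned {
  std::vector<const Reader*> collections;
  std::vector<const Reader*> dict_filterable;
  std::vector<const Reader*> non_dict_filterable;
};

// Walks the reader tree. Collection readers are recorded and then descended
// into, so scalar readers nested at any depth end up in one of the two
// scalar lists. If dictionary filters cannot be evaluated at all, every
// scalar reader goes into the non-filterable list.
void PartitionReaders(const std::vector<Reader>& readers,
                      bool can_eval_dict_filters, Partitioned* out) {
  for (const Reader& reader : readers) {
    if (reader.is_collection) {
      out->collections.push_back(&reader);
      PartitionReaders(reader.children, can_eval_dict_filters, out);
    } else if (can_eval_dict_filters && reader.dict_filterable) {
      out->dict_filterable.push_back(&reader);
    } else {
      out->non_dict_filterable.push_back(&reader);
    }
  }
}
```

With a top-level scalar column c_custkey and a collection c_orders that
contains the scalars o_orderkey (filterable) and o_comment (not
filterable), the walk yields one collection reader, two filterable scalar
readers and one non-filterable scalar reader. Flattening the tree this
way lets the filtering loop treat nested and top-level scalars uniformly.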

Testing:
- e2e test extended to illustrate row-groups that are pruned
  by nested collection dictionary filters.

Change-Id: If3a2abcfc3d0f7d18756816659fed77ce12668dd
Reviewed-on: http://gerrit.cloudera.org:8080/8775
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/db98dc65
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/db98dc65
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/db98dc65

Branch: refs/heads/master
Commit: db98dc6504368f24eb20e12959f6d779be31c9b6
Parents: 579e332
Author: Vuk Ercegovac <ve...@cloudera.com>
Authored: Mon Nov 27 18:27:47 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jan 19 20:37:25 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc             | 293 ++++++++++---------
 be/src/exec/hdfs-parquet-scanner.h              |  63 ++--
 be/src/exec/hdfs-scan-node-base.h               |   3 +-
 be/src/exec/hdfs-scanner.cc                     |  14 +-
 be/src/exec/hdfs-scanner.h                      |   5 +-
 be/src/exec/parquet-column-readers.cc           |   4 +-
 be/src/runtime/collection-value-builder.h       |   2 +-
 be/src/runtime/scoped-buffer.h                  |   1 +
 common/thrift/PlanNodes.thrift                  |   3 +-
 .../apache/impala/analysis/SlotDescriptor.java  |   4 +
 .../apache/impala/analysis/TupleDescriptor.java |   2 +
 .../org/apache/impala/planner/HdfsScanNode.java | 215 +++++++++-----
 testdata/CustomerMultiBlock/README              |  12 +
 .../customer_multiblock.parquet                 | Bin 0 -> 494519 bytes
 .../functional/functional_schema_template.sql   |  19 ++
 .../datasets/functional/schema_constraints.csv  |   1 +
 .../queries/PlannerTest/constant-folding.test   |   2 +
 .../queries/PlannerTest/mt-dop-validation.test  |   6 +
 .../queries/PlannerTest/parquet-filtering.test  |  66 +++++
 .../queries/QueryTest/parquet-filtering.test    | 237 ++++++++++++++-
 20 files changed, 706 insertions(+), 246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index f0f280d..3a17a3b 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -68,7 +68,7 @@ const string PARQUET_MEM_LIMIT_EXCEEDED =
     "HdfsParquetScanner::$0() failed to allocate $1 bytes for $2.";
 
 Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
-    const std::vector<HdfsFileDesc*>& files) {
+    const vector<HdfsFileDesc*>& files) {
   vector<ScanRange*> footer_ranges;
   for (int i = 0; i < files.size(); ++i) {
     // If the file size is less than 12 bytes, it is an invalid Parquet file.
@@ -101,7 +101,7 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
         // the actual split (in InitColumns()). The original split is stored in the
         // metadata associated with the footer range.
         ScanRange* footer_range;
-        if (footer_split != NULL) {
+        if (footer_split != nullptr) {
           footer_range = scan_node->AllocateScanRange(files[i]->fs,
               files[i]->filename.c_str(), footer_size, footer_start,
               split_metadata->partition_id, footer_split->disk_id(),
@@ -128,12 +128,12 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
 }
 
 ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) {
-  DCHECK(file != NULL);
+  DCHECK(file != nullptr);
   for (int i = 0; i < file->splits.size(); ++i) {
     ScanRange* split = file->splits[i];
     if (split->offset() + split->len() == file->file_length) return split;
   }
-  return NULL;
+  return nullptr;
 }
 
 namespace impala {
@@ -143,22 +143,21 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
     row_group_idx_(-1),
     row_group_rows_read_(0),
     advance_row_group_(true),
-    min_max_tuple_buffer_(scan_node->mem_tracker()),
+    min_max_tuple_(nullptr),
     row_batches_produced_(0),
     scratch_batch_(new ScratchTupleBatch(
         *scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())),
-    metadata_range_(NULL),
+    metadata_range_(nullptr),
     dictionary_pool_(new MemPool(scan_node->mem_tracker())),
-    dict_filter_tuple_backing_(scan_node->mem_tracker()),
     assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
-    process_footer_timer_stats_(NULL),
-    num_cols_counter_(NULL),
-    num_stats_filtered_row_groups_counter_(NULL),
-    num_row_groups_counter_(NULL),
-    num_scanners_with_no_reads_counter_(NULL),
-    num_dict_filtered_row_groups_counter_(NULL),
+    process_footer_timer_stats_(nullptr),
+    num_cols_counter_(nullptr),
+    num_stats_filtered_row_groups_counter_(nullptr),
+    num_row_groups_counter_(nullptr),
+    num_scanners_with_no_reads_counter_(nullptr),
+    num_dict_filtered_row_groups_counter_(nullptr),
     coll_items_read_counter_(0),
-    codegend_process_scratch_batch_fn_(NULL) {
+    codegend_process_scratch_batch_fn_(nullptr) {
   assemble_rows_timer_.Stop();
 }
 
@@ -181,23 +180,25 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   codegend_process_scratch_batch_fn_ = reinterpret_cast<ProcessScratchBatchFn>(
       scan_node_->GetCodegenFn(THdfsFileFormat::PARQUET));
-  if (codegend_process_scratch_batch_fn_ == NULL) {
+  if (codegend_process_scratch_batch_fn_ == nullptr) {
     scan_node_->IncNumScannersCodegenDisabled();
   } else {
     scan_node_->IncNumScannersCodegenEnabled();
   }
 
-  level_cache_pool_.reset(new MemPool(scan_node_->mem_tracker()));
+  perm_pool_.reset(new MemPool(scan_node_->mem_tracker()));
 
   // Allocate tuple buffer to evaluate conjuncts on parquet::Statistics.
   const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc();
   if (min_max_tuple_desc != nullptr) {
     int64_t tuple_size = min_max_tuple_desc->byte_size();
-    if (!min_max_tuple_buffer_.TryAllocate(tuple_size)) {
+    uint8_t* buffer = perm_pool_->TryAllocate(tuple_size);
+    if (buffer == nullptr) {
       string details = Substitute("Could not allocate buffer of $0 bytes for Parquet "
           "statistics tuple for file '$1'.", tuple_size, filename());
       return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, tuple_size);
     }
+    min_max_tuple_ = reinterpret_cast<Tuple*>(buffer);
   }
 
   // Clone the min/max statistics conjuncts.
@@ -207,7 +208,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   for (int i = 0; i < context->filter_ctxs().size(); ++i) {
     const FilterContext* ctx = &context->filter_ctxs()[i];
-    DCHECK(ctx->filter != NULL);
+    DCHECK(ctx->filter != nullptr);
     filter_ctxs_.push_back(ctx);
   }
   filter_stats_.resize(filter_ctxs_.size());
@@ -248,7 +249,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   // The scanner-wide stream was used only to read the file footer.  Each column has added
   // its own stream.
-  stream_ = NULL;
+  stream_ = nullptr;
   return Status::OK();
 }
 
@@ -270,9 +271,9 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
     // Open() fails or if the query is cancelled.
     scratch_batch_->ReleaseResources(nullptr);
   }
-  if (level_cache_pool_ != nullptr) {
-    level_cache_pool_->FreeAll();
-    level_cache_pool_.reset();
+  if (perm_pool_ != nullptr) {
+    perm_pool_->FreeAll();
+    perm_pool_.reset();
   }
 
   // Verify all resources (if any) have been transferred.
@@ -398,7 +399,7 @@ Status HdfsParquetScanner::ProcessSplit() {
       eos_ = true;
       break;
     }
-    unique_ptr<RowBatch> batch = std::make_unique<RowBatch>(scan_node_->row_desc(),
+    unique_ptr<RowBatch> batch = make_unique<RowBatch>(scan_node_->row_desc(),
         state_->batch_size(), scan_node_->mem_tracker());
     Status status = GetNextInternal(batch.get());
     // Always add batch to the queue because it may contain data referenced by previously
@@ -526,8 +527,8 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
 
   int64_t tuple_size = min_max_tuple_desc->byte_size();
 
-  Tuple* min_max_tuple = reinterpret_cast<Tuple*>(min_max_tuple_buffer_.buffer());
-  min_max_tuple->Init(tuple_size);
+  DCHECK(min_max_tuple_ != nullptr);
+  min_max_tuple_->Init(tuple_size);
 
   DCHECK_EQ(min_max_tuple_desc->slots().size(), min_max_conjunct_evals_.size());
   for (int i = 0; i < min_max_conjunct_evals_.size(); ++i) {
@@ -569,7 +570,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
     const parquet::ColumnChunk& col_chunk = row_group.columns[col_idx];
     const ColumnType& col_type = slot_desc->type();
     bool stats_read = false;
-    void* slot = min_max_tuple->GetSlot(slot_desc->tuple_offset());
+    void* slot = min_max_tuple_->GetSlot(slot_desc->tuple_offset());
     const string& fn_name = eval->root().function_name();
     if (fn_name == "lt" || fn_name == "le") {
       // We need to get min stats.
@@ -585,7 +586,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
 
     if (stats_read) {
       TupleRow row;
-      row.SetTuple(0, min_max_tuple);
+      row.SetTuple(0, min_max_tuple_);
       if (!ExecNode::EvalPredicate(eval, &row)) {
         *skip_row_group = true;
         break;
@@ -662,12 +663,14 @@ Status HdfsParquetScanner::NextRowGroup() {
       continue;
     }
 
+    InitCollectionColumns();
+
     // Prepare dictionary filtering columns for first read
     // This must be done before dictionary filtering, because this code initializes
     // the column offsets and streams needed to read the dictionaries.
     // TODO: Restructure the code so that the dictionary can be read without the rest
     // of the column.
-    RETURN_IF_ERROR(InitColumns(row_group_idx_, dict_filterable_readers_));
+    RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, dict_filterable_readers_));
 
     // If there is a dictionary-encoded column where every value is eliminated
     // by a conjunct, the row group can be eliminated. This initializes dictionaries
@@ -687,7 +690,7 @@ Status HdfsParquetScanner::NextRowGroup() {
     // At this point, the row group has passed any filtering criteria
     // Prepare non-dictionary filtering column readers for first read and
     // initialize their dictionaries.
-    RETURN_IF_ERROR(InitColumns(row_group_idx_, non_dict_filterable_readers_));
+    RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, non_dict_filterable_readers_));
     status = InitDictionaries(non_dict_filterable_readers_);
     if (!status.ok()) {
       // Either return an error or skip this row group if it is ok to ignore errors
@@ -735,13 +738,8 @@ void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
   }
 }
 
-bool HdfsParquetScanner::IsDictFilterable(ParquetColumnReader* col_reader) {
-  // Nested types are not supported yet
-  if (col_reader->IsCollectionReader()) return false;
-
-  BaseScalarColumnReader* scalar_reader =
-    static_cast<BaseScalarColumnReader*>(col_reader);
-  const SlotDescriptor* slot_desc = scalar_reader->slot_desc();
+bool HdfsParquetScanner::IsDictFilterable(BaseScalarColumnReader* col_reader) {
+  const SlotDescriptor* slot_desc = col_reader->slot_desc();
   // Some queries do not need the column to be materialized, so slot_desc is NULL.
   // For example, a count(*) with no predicates only needs to count records
   // rather than materializing the values.
@@ -755,44 +753,60 @@ bool HdfsParquetScanner::IsDictFilterable(ParquetColumnReader* col_reader) {
   // dictionary values, so skip these datatypes for now.
   // TODO: The values should be converted during dictionary construction and stored
   // in converted form in the dictionary.
-  if (scalar_reader->NeedsConversion()) return false;
+  if (col_reader->NeedsConversion()) return false;
 
   // Certain datatypes (timestamps) need to validate the value, as certain bit
   // combinations are not valid. The dictionary values are not validated, so
   // skip these datatypes for now.
   // TODO: This should be pushed into dictionary construction.
-  if (scalar_reader->NeedsValidation()) return false;
+  if (col_reader->NeedsValidation()) return false;
 
   return true;
 }
 
-Status HdfsParquetScanner::InitDictFilterStructures() {
-  // Check dictionary filtering query option
-  bool dictionary_filtering_enabled =
-      state_->query_options().parquet_dictionary_filtering;
-
-  // Allocate space for dictionary filtering tuple
-  // Certain queries do not need any columns to be materialized (such as count(*))
-  // and have a tuple size of 0. Explicitly disable dictionary filtering in this case.
-  int tuple_size = scan_node_->tuple_desc()->byte_size();
-  if (tuple_size > 0) {
-    if (!dict_filter_tuple_backing_.TryAllocate(tuple_size)) {
-      string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED,
-          "InitDictFilterStructures", tuple_size, "Dictionary Filtering Tuple");
-      return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, tuple_size);
+void HdfsParquetScanner::PartitionReaders(
+    const vector<ParquetColumnReader*>& readers, bool can_eval_dict_filters) {
+  for (auto* reader : readers) {
+    if (reader->IsCollectionReader()) {
+      CollectionColumnReader* col_reader = static_cast<CollectionColumnReader*>(reader);
+      collection_readers_.push_back(col_reader);
+      PartitionReaders(*col_reader->children(), can_eval_dict_filters);
+    } else {
+      BaseScalarColumnReader* scalar_reader =
+          static_cast<BaseScalarColumnReader*>(reader);
+      if (can_eval_dict_filters && IsDictFilterable(scalar_reader)) {
+        dict_filterable_readers_.push_back(scalar_reader);
+      } else {
+        non_dict_filterable_readers_.push_back(scalar_reader);
+      }
     }
-  } else {
-    dictionary_filtering_enabled = false;
   }
+}
 
-  // Divide the column readers into a list of column readers that are eligible for
-  // dictionary filtering and a list of column readers that are not. If dictionary
-  // filtering is disabled, all column readers go into the ineligible list.
-  for (ParquetColumnReader* col_reader : column_readers_) {
-    if (dictionary_filtering_enabled && IsDictFilterable(col_reader)) {
-      dict_filterable_readers_.push_back(col_reader);
-    } else {
-      non_dict_filterable_readers_.push_back(col_reader);
+Status HdfsParquetScanner::InitDictFilterStructures() {
+  bool can_eval_dict_filters =
+      state_->query_options().parquet_dictionary_filtering && !dict_filter_map_.empty();
+
+  // Separate column readers into scalar and collection readers.
+  PartitionReaders(column_readers_, can_eval_dict_filters);
+
+  // Allocate tuple buffers for all tuple descriptors that are associated with conjuncts
+  // that can be dictionary filtered.
+  for (auto* col_reader : dict_filterable_readers_) {
+    const SlotDescriptor* slot_desc = col_reader->slot_desc();
+    const TupleDescriptor* tuple_desc = slot_desc->parent();
+    auto tuple_it = dict_filter_tuple_map_.find(tuple_desc);
+    if (tuple_it != dict_filter_tuple_map_.end()) continue;
+    int tuple_size = tuple_desc->byte_size();
+    if (tuple_size > 0) {
+      uint8_t* buffer = perm_pool_->TryAllocate(tuple_size);
+      if (buffer == nullptr) {
+        string details = Substitute(
+            PARQUET_MEM_LIMIT_EXCEEDED, "InitDictFilterStructures", tuple_size,
+            "Dictionary Filtering Tuple");
+        return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, tuple_size);
+      }
+      dict_filter_tuple_map_[tuple_desc] = reinterpret_cast<Tuple*>(buffer);
     }
   }
   return Status::OK();
@@ -857,6 +871,8 @@ bool HdfsParquetScanner::IsDictionaryEncoded(
 Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_group,
     bool* row_group_eliminated) {
   *row_group_eliminated = false;
+  // Check if there's anything to do here.
+  if (dict_filterable_readers_.empty()) return Status::OK();
 
   // Legacy impala files (< 2.9) require special handling, because they do not encode
   // information about whether the column is 100% dictionary encoded.
@@ -865,14 +881,13 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
     is_legacy_impala = true;
   }
 
-  Tuple* dict_filter_tuple =
-      reinterpret_cast<Tuple*>(dict_filter_tuple_backing_.buffer());
-  dict_filter_tuple->Init(scan_node_->tuple_desc()->byte_size());
-  vector<ParquetColumnReader*> deferred_dict_init_list;
-  for (ParquetColumnReader* col_reader : dict_filterable_readers_) {
-    DCHECK(!col_reader->IsCollectionReader());
-    BaseScalarColumnReader* scalar_reader =
-        static_cast<BaseScalarColumnReader*>(col_reader);
+  // Keeps track of column readers that need to be initialized. For example, if a
+  // column cannot be filtered, then defer its dictionary initialization once we know
+  // the row group cannot be filtered.
+  vector<BaseScalarColumnReader*> deferred_dict_init_list;
+  // Keeps track of the initialized tuple associated with a TupleDescriptor.
+  unordered_map<const TupleDescriptor*, Tuple*> tuple_map;
+  for (BaseScalarColumnReader* scalar_reader : dict_filterable_readers_) {
     const parquet::ColumnMetaData& col_metadata =
         row_group.columns[scalar_reader->col_idx()].meta_data;
 
@@ -898,10 +913,25 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
         dictionary->num_entries() >= LEGACY_IMPALA_MAX_DICT_ENTRIES) continue;
 
     const SlotDescriptor* slot_desc = scalar_reader->slot_desc();
+    DCHECK(slot_desc != nullptr);
+    const TupleDescriptor* tuple_desc = slot_desc->parent();
     auto dict_filter_it = dict_filter_map_.find(slot_desc->id());
     DCHECK(dict_filter_it != dict_filter_map_.end());
     const vector<ScalarExprEvaluator*>& dict_filter_conjunct_evals =
         dict_filter_it->second;
+    Tuple* dict_filter_tuple = nullptr;
+    auto dict_filter_tuple_it = tuple_map.find(tuple_desc);
+    if (dict_filter_tuple_it == tuple_map.end()) {
+      auto tuple_it = dict_filter_tuple_map_.find(tuple_desc);
+      DCHECK(tuple_it != dict_filter_tuple_map_.end());
+      dict_filter_tuple = tuple_it->second;
+      dict_filter_tuple->Init(tuple_desc->byte_size());
+      tuple_map[tuple_desc] = dict_filter_tuple;
+    } else {
+      dict_filter_tuple = dict_filter_tuple_it->second;
+    }
+
+    DCHECK(dict_filter_tuple != nullptr);
     void* slot = dict_filter_tuple->GetSlot(slot_desc->tuple_offset());
     bool column_has_match = false;
     for (int dict_idx = 0; dict_idx < dictionary->num_entries(); ++dict_idx) {
@@ -956,9 +986,9 @@ Status HdfsParquetScanner::AssembleRows(
     const vector<ParquetColumnReader*>& column_readers, RowBatch* row_batch,
     bool* skip_row_group) {
   DCHECK(!column_readers.empty());
-  DCHECK(row_batch != NULL);
+  DCHECK(row_batch != nullptr);
   DCHECK_EQ(*skip_row_group, false);
-  DCHECK(scratch_batch_ != NULL);
+  DCHECK(scratch_batch_ != nullptr);
 
   int64_t num_rows_read = 0;
   while (!column_readers[0]->RowGroupAtEnd()) {
@@ -1013,7 +1043,7 @@ Status HdfsParquetScanner::AssembleRows(
 }
 
 Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) {
-  DCHECK(dst_batch != NULL);
+  DCHECK(dst_batch != nullptr);
   dst_batch->CommitRows(num_rows);
 
   if (context_->cancelled()) return Status::CANCELLED;
@@ -1053,7 +1083,7 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
   }
 
   int num_rows_to_commit;
-  if (codegend_process_scratch_batch_fn_ != NULL) {
+  if (codegend_process_scratch_batch_fn_ != nullptr) {
     num_rows_to_commit = codegend_process_scratch_batch_fn_(this, dst_batch);
   } else {
     num_rows_to_commit = ProcessScratchBatch(dst_batch);
@@ -1065,17 +1095,17 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
 Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
     const vector<ScalarExpr*>& conjuncts, llvm::Function** process_scratch_batch_fn) {
   DCHECK(node->runtime_state()->ShouldCodegen());
-  *process_scratch_batch_fn = NULL;
+  *process_scratch_batch_fn = nullptr;
   LlvmCodeGen* codegen = node->runtime_state()->codegen();
-  DCHECK(codegen != NULL);
+  DCHECK(codegen != nullptr);
   SCOPED_TIMER(codegen->codegen_timer());
 
   llvm::Function* fn = codegen->GetFunction(IRFunction::PROCESS_SCRATCH_BATCH, true);
-  DCHECK(fn != NULL);
+  DCHECK(fn != nullptr);
 
   llvm::Function* eval_conjuncts_fn;
   RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjuncts, &eval_conjuncts_fn));
-  DCHECK(eval_conjuncts_fn != NULL);
+  DCHECK(eval_conjuncts_fn != nullptr);
 
   int replaced = codegen->ReplaceCallSites(fn, eval_conjuncts_fn, "EvalConjuncts");
   DCHECK_EQ(replaced, 1);
@@ -1083,14 +1113,14 @@ Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
   llvm::Function* eval_runtime_filters_fn;
   RETURN_IF_ERROR(CodegenEvalRuntimeFilters(
       codegen, node->filter_exprs(), &eval_runtime_filters_fn));
-  DCHECK(eval_runtime_filters_fn != NULL);
+  DCHECK(eval_runtime_filters_fn != nullptr);
 
   replaced = codegen->ReplaceCallSites(fn, eval_runtime_filters_fn, "EvalRuntimeFilters");
   DCHECK_EQ(replaced, 1);
 
   fn->setName("ProcessScratchBatch");
   *process_scratch_batch_fn = codegen->FinalizeFunction(fn);
-  if (*process_scratch_batch_fn == NULL) {
+  if (*process_scratch_batch_fn == nullptr) {
     return Status("Failed to finalize process_scratch_batch_fn.");
   }
   return Status::OK();
@@ -1199,7 +1229,7 @@ bool HdfsParquetScanner::AssembleCollection(
     CollectionValueBuilder* coll_value_builder) {
   DCHECK(!column_readers.empty());
   DCHECK_GE(new_collection_rep_level, 0);
-  DCHECK(coll_value_builder != NULL);
+  DCHECK(coll_value_builder != nullptr);
 
   const TupleDescriptor* tuple_desc = &coll_value_builder->tuple_desc();
   Tuple* template_tuple = template_tuple_map_[tuple_desc];
@@ -1215,12 +1245,12 @@ bool HdfsParquetScanner::AssembleCollection(
   // group (otherwise it would always be true because we're on the "edge" of two
   // collections), and only ProcessSplit() should call AssembleRows() at the end of the
   // row group.
-  if (coll_value_builder != NULL) DCHECK(!end_of_collection);
+  if (coll_value_builder != nullptr) DCHECK(!end_of_collection);
 
   while (!end_of_collection && continue_execution) {
     MemPool* pool;
     Tuple* tuple;
-    TupleRow* row = NULL;
+    TupleRow* row = nullptr;
 
     int64_t num_rows;
     // We're assembling item tuples into an CollectionValue
@@ -1233,7 +1263,7 @@ bool HdfsParquetScanner::AssembleCollection(
     // 'num_rows' can be very high if we're writing to a large CollectionValue. Limit
     // the number of rows we read at one time so we don't spend too long in the
     // 'num_rows' loop below before checking for cancellation or limit reached.
-    num_rows = std::min(
+    num_rows = min(
         num_rows, static_cast<int64_t>(scan_node_->runtime_state()->batch_size()));
 
     int num_to_commit = 0;
@@ -1286,7 +1316,7 @@ inline bool HdfsParquetScanner::ReadCollectionItem(
       FILE_CHECK_GE(col_reader->def_level(),
                     col_reader->def_level_of_immediate_repeated_ancestor());
       // Fill in position slot if applicable
-      if (col_reader->pos_slot_desc() != NULL) col_reader->ReadPosition(tuple);
+      if (col_reader->pos_slot_desc() != nullptr) col_reader->ReadPosition(tuple);
       continue_execution = col_reader->ReadValue(pool, tuple);
     } else {
       // A containing repeated field is empty or NULL
@@ -1351,14 +1381,14 @@ Status HdfsParquetScanner::ProcessFooter() {
   // deserializing it.
   ScopedBuffer metadata_buffer(scan_node_->mem_tracker());
 
-  DCHECK(metadata_range_ != NULL);
+  DCHECK(metadata_range_ != nullptr);
   if (UNLIKELY(metadata_size > remaining_bytes_buffered)) {
     // In this case, the metadata is bigger than our guess meaning there are
     // not enough bytes in the footer range from IssueInitialRanges().
     // We'll just issue more ranges to the IoMgr that is the actual footer.
     int64_t partition_id = context_->partition_descriptor()->id();
     const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
-    DCHECK(file_desc != NULL);
+    DCHECK(file_desc != nullptr);
     // The start of the metadata is:
     // file_length - 4-byte metadata size - footer-size - metadata size
     int64_t metadata_start = file_desc->file_length - sizeof(int32_t)
@@ -1443,7 +1473,7 @@ Status HdfsParquetScanner::ProcessFooter() {
 Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc,
     const ParquetSchemaResolver& schema_resolver,
     vector<ParquetColumnReader*>* column_readers) {
-  DCHECK(column_readers != NULL);
+  DCHECK(column_readers != nullptr);
   DCHECK(column_readers->empty());
 
   if (scan_node_->optimize_parquet_count_star()) {
@@ -1453,14 +1483,14 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
   }
 
   // Each tuple can have at most one position slot. We'll process this slot desc last.
-  SlotDescriptor* pos_slot_desc = NULL;
+  SlotDescriptor* pos_slot_desc = nullptr;
 
   for (SlotDescriptor* slot_desc: tuple_desc.slots()) {
     // Skip partition columns
     if (&tuple_desc == scan_node_->tuple_desc() &&
         slot_desc->col_pos() < scan_node_->num_partition_keys()) continue;
 
-    SchemaNode* node = NULL;
+    SchemaNode* node = nullptr;
     bool pos_field;
     bool missing_field;
     RETURN_IF_ERROR(schema_resolver.ResolvePath(
@@ -1470,7 +1500,7 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
       // In this case, we are selecting a column that is not in the file.
       // Update the template tuple to put a NULL in this slot.
       Tuple** template_tuple = &template_tuple_map_[&tuple_desc];
-      if (*template_tuple == NULL) {
+      if (*template_tuple == nullptr) {
         *template_tuple =
             Tuple::Create(tuple_desc.byte_size(), template_tuple_pool_.get());
       }
@@ -1479,7 +1509,8 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
     }
 
     if (pos_field) {
-      DCHECK(pos_slot_desc == NULL) << "There should only be one position slot per tuple";
+      DCHECK(pos_slot_desc == nullptr)
+          << "There should only be one position slot per tuple";
       pos_slot_desc = slot_desc;
       continue;
     }
@@ -1493,7 +1524,7 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
 
     if (col_reader->IsCollectionReader()) {
       // Recursively populate col_reader's children
-      DCHECK(slot_desc->collection_item_descriptor() != NULL);
+      DCHECK(slot_desc->collection_item_descriptor() != nullptr);
       const TupleDescriptor* item_tuple_desc = slot_desc->collection_item_descriptor();
       CollectionColumnReader* collection_reader =
           static_cast<CollectionColumnReader*>(col_reader);
@@ -1513,7 +1544,7 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
     column_readers->push_back(reader);
   }
 
-  if (pos_slot_desc != NULL) {
+  if (pos_slot_desc != nullptr) {
     // 'tuple_desc' has a position slot. Use an existing column reader to populate it.
     DCHECK(!column_readers->empty());
     (*column_readers)[0]->set_pos_slot_desc(pos_slot_desc);
@@ -1547,14 +1578,14 @@ Status HdfsParquetScanner::CreateCountingReader(const SchemaPath& parent_path,
     }
 
     *reader = ParquetColumnReader::Create(
-        *target_node, target_node->is_repeated(), NULL, this);
+        *target_node, target_node->is_repeated(), nullptr, this);
     if (target_node->is_repeated()) {
-      // Find the closest scalar descendent of 'target_node' via breadth-first search, and
+      // Find the closest scalar descendant of 'target_node' via breadth-first search, and
       // create scalar reader to drive 'reader'. We find the closest (i.e. least-nested)
-      // descendent as a heuristic for picking a descendent with fewer values, so it's
+      // descendant as a heuristic for picking a descendant with fewer values, so it's
       // faster to scan.
       // TODO: use different heuristic than least-nested? Fewest values?
-      const SchemaNode* node = NULL;
+      const SchemaNode* node = nullptr;
       queue<const SchemaNode*> nodes;
       nodes.push(target_node);
       while (!nodes.empty()) {
@@ -1563,7 +1594,7 @@ Status HdfsParquetScanner::CreateCountingReader(const SchemaPath& parent_path,
         if (node->children.size() > 0) {
           for (const SchemaNode& child: node->children) nodes.push(&child);
         } else {
-          // node is the least-nested scalar descendent of 'target_node'
+          // node is the least-nested scalar descendant of 'target_node'
           break;
         }
       }
@@ -1571,52 +1602,52 @@ Status HdfsParquetScanner::CreateCountingReader(const SchemaPath& parent_path,
       CollectionColumnReader* parent_reader =
           static_cast<CollectionColumnReader*>(*reader);
       parent_reader->children()->push_back(
-          ParquetColumnReader::Create(*node, false, NULL, this));
+          ParquetColumnReader::Create(*node, false, nullptr, this));
     }
   } else {
     // Special case for a repeated scalar node. The repeated node represents both the
     // parent collection and the child item.
-    *reader = ParquetColumnReader::Create(*parent_node, false, NULL, this);
+    *reader = ParquetColumnReader::Create(*parent_node, false, nullptr, this);
   }
 
   return Status::OK();
 }
 
-Status HdfsParquetScanner::InitColumns(
-    int row_group_idx, const vector<ParquetColumnReader*>& column_readers) {
+void HdfsParquetScanner::InitCollectionColumns() {
+  for (CollectionColumnReader* col_reader: collection_readers_) {
+    col_reader->Reset();
+  }
+}
+
+Status HdfsParquetScanner::InitScalarColumns(
+    int row_group_idx, const vector<BaseScalarColumnReader*>& column_readers) {
   int64_t partition_id = context_->partition_descriptor()->id();
   const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
-  DCHECK(file_desc != NULL);
+  DCHECK(file_desc != nullptr);
   parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx];
 
   // All the scan ranges (one for each column).
   vector<ScanRange*> col_ranges;
-  // Used to validate that the number of values in each reader in column_readers_ is the
-  // same.
-  int num_values = -1;
+  // Used to validate that the number of values in each reader in column_readers_ at the
+  // same SchemaElement is the same.
+  unordered_map<const parquet::SchemaElement*, int> num_values_map;
   // Used to validate we issued the right number of scan ranges
   int num_scalar_readers = 0;
 
-  for (ParquetColumnReader* col_reader: column_readers) {
-    if (col_reader->IsCollectionReader()) {
-      CollectionColumnReader* collection_reader =
-          static_cast<CollectionColumnReader*>(col_reader);
-      collection_reader->Reset();
-      // Recursively init child readers
-      RETURN_IF_ERROR(InitColumns(row_group_idx, *collection_reader->children()));
-      continue;
-    }
+  for (BaseScalarColumnReader* scalar_reader: column_readers) {
     ++num_scalar_readers;
-
-    BaseScalarColumnReader* scalar_reader =
-        static_cast<BaseScalarColumnReader*>(col_reader);
     const parquet::ColumnChunk& col_chunk = row_group.columns[scalar_reader->col_idx()];
+    auto num_values_it = num_values_map.find(&scalar_reader->schema_element());
+    int num_values = -1;
+    if (num_values_it != num_values_map.end()) {
+      num_values = num_values_it->second;
+    } else {
+      num_values_map[&scalar_reader->schema_element()] = col_chunk.meta_data.num_values;
+    }
     int64_t col_start = col_chunk.meta_data.data_page_offset;
 
-    if (num_values == -1) {
-      num_values = col_chunk.meta_data.num_values;
-    } else if (col_chunk.meta_data.num_values != num_values) {
-      // TODO for 2.3: improve this error message by saying which columns are different,
+    if (num_values != -1 && col_chunk.meta_data.num_values != num_values) {
+      // TODO: improve this error message by saying which columns are different,
       // and also specify column in other error messages as appropriate
       return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(),
           col_chunk.meta_data.num_values, num_values, filename());
@@ -1672,7 +1703,7 @@ Status HdfsParquetScanner::InitColumns(
 
     // Get the stream that will be used for this column
     ScannerContext::Stream* stream = context_->AddStream(col_range);
-    DCHECK(stream != NULL);
+    DCHECK(stream != nullptr);
 
     RETURN_IF_ERROR(scalar_reader->Reset(&col_chunk.meta_data, stream));
   }
@@ -1688,18 +1719,8 @@ Status HdfsParquetScanner::InitColumns(
 }
 
 Status HdfsParquetScanner::InitDictionaries(
-    const vector<ParquetColumnReader*>& column_readers) {
-  for (ParquetColumnReader* col_reader : column_readers) {
-    if (col_reader->IsCollectionReader()) {
-      CollectionColumnReader* collection_reader =
-          static_cast<CollectionColumnReader*>(col_reader);
-      // Recursively init child reader dictionaries
-      RETURN_IF_ERROR(InitDictionaries(*collection_reader->children()));
-      continue;
-    }
-
-    BaseScalarColumnReader* scalar_reader =
-        static_cast<BaseScalarColumnReader*>(col_reader);
+    const vector<BaseScalarColumnReader*>& column_readers) {
+  for (BaseScalarColumnReader* scalar_reader : column_readers) {
     RETURN_IF_ERROR(scalar_reader->InitDictionary());
   }
   return Status::OK();
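
The InitScalarColumns() change above replaces the single running num_values with a map keyed by schema element, so value counts are only compared between readers that share a key. A minimal standalone sketch of that check follows. The SchemaElement and ScalarReader structs and the FindNumValuesMismatch() helper are hypothetical stand-ins for illustration, not Impala's classes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for parquet::SchemaElement.
struct SchemaElement {
  std::string name;
};

// Hypothetical stand-in for a scalar column reader.
struct ScalarReader {
  const SchemaElement* schema_element;  // key used to group readers (assumed)
  int64_t num_values;                   // value count from the column chunk metadata
};

// Returns the index of the first reader whose value count disagrees with an
// earlier reader that has the same schema element, or -1 if all counts agree.
int FindNumValuesMismatch(const std::vector<ScalarReader>& readers) {
  std::unordered_map<const SchemaElement*, int64_t> num_values_map;
  for (std::size_t i = 0; i < readers.size(); ++i) {
    const ScalarReader& reader = readers[i];
    assert(reader.schema_element != nullptr);
    auto it = num_values_map.find(reader.schema_element);
    if (it == num_values_map.end()) {
      // First reader seen for this schema element: remember its count.
      num_values_map[reader.schema_element] = reader.num_values;
    } else if (it->second != reader.num_values) {
      return static_cast<int>(i);
    }
  }
  return -1;
}
```

Readers with different keys may legitimately report different counts, which is why a flattened list of top-level and nested scalar readers cannot be validated against one shared counter.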

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 99b5a60..2ddf0fc 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -384,8 +384,8 @@ class HdfsParquetScanner : public HdfsScanner {
 
   boost::scoped_ptr<ParquetSchemaResolver> schema_resolver_;
 
-  /// Buffer to back tuples when reading parquet::Statistics.
-  ScopedBuffer min_max_tuple_buffer_;
+  /// Tuple to hold values when reading parquet::Statistics. Owned by perm_pool_.
+  Tuple* min_max_tuple_;
 
   /// Clone of Min/max statistics conjunct evaluators. Has the same life time as
   /// the scanner. Stored in 'obj_pool_'.
@@ -416,9 +416,10 @@ class HdfsParquetScanner : public HdfsScanner {
     LocalFilterStats() : considered(0), rejected(0), total_possible(0), enabled(1) { }
   };
 
-  /// Pool used for allocating caches of definition/repetition levels that are
-  /// populated by the level readers. The pool is freed in Close().
-  boost::scoped_ptr<MemPool> level_cache_pool_;
+  /// Pool used for allocating caches of definition/repetition levels and tuples for
+  /// dictionary filtering. The definition/repetition levels are populated by the
+  /// level readers. The pool is freed in Close().
+  boost::scoped_ptr<MemPool> perm_pool_;
 
   /// Track statistics of each filter (one for each filter in filter_ctxs_) per scanner so
   /// that expensive aggregation up to the scan node can be performed once, during
@@ -448,16 +449,24 @@ class HdfsParquetScanner : public HdfsScanner {
   /// pages in a column chunk.
   boost::scoped_ptr<MemPool> dictionary_pool_;
 
-  /// Column readers that are eligible for dictionary filtering
-  /// These are pointers to elements of column_readers_
-  std::vector<ParquetColumnReader*> dict_filterable_readers_;
+  /// Column readers that are eligible for dictionary filtering.
+  /// These are pointers to elements of column_readers_. Materialized columns that are
+  /// dictionary encoded correspond to scalar columns that are either top-level columns
+  /// or nested within a collection. CollectionColumnReaders are not eligible for
+  /// dictionary filtering so are not included.
+  std::vector<BaseScalarColumnReader*> dict_filterable_readers_;
 
-  /// Column readers that are not eligible for dictionary filtering
-  /// These are pointers to elements of column_readers_
-  std::vector<ParquetColumnReader*> non_dict_filterable_readers_;
+  /// Column readers that are not eligible for dictionary filtering.
+  /// These are pointers to elements of column_readers_. The readers are either top-level
+  /// or nested within a collection.
+  std::vector<BaseScalarColumnReader*> non_dict_filterable_readers_;
 
-  /// Memory used to store the tuple used for dictionary filtering
-  ScopedBuffer dict_filter_tuple_backing_;
+  /// Flattened collection column readers that point to readers in column_readers_.
+  std::vector<CollectionColumnReader*> collection_readers_;
+
+  /// Memory used to store the tuples used for dictionary filtering. Tuples owned by
+  /// perm_pool_.
+  std::unordered_map<const TupleDescriptor*, Tuple*> dict_filter_tuple_map_;
 
   /// Timer for materializing rows.  This ignores time getting the next buffer.
   ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
@@ -619,14 +628,17 @@ class HdfsParquetScanner : public HdfsScanner {
 
   /// Walks file_metadata_ and initiates reading the materialized columns.  This
   /// initializes 'column_readers' and issues the reads for the columns. 'column_readers'
-  /// should be the readers used to materialize a single tuple (i.e., column_readers_ or
-  /// the children of a collection node).
-  Status InitColumns(
-      int row_group_idx, const std::vector<ParquetColumnReader*>& column_readers)
+  /// includes a mix of scalar readers from multiple schema nodes (i.e., readers of
+  /// top-level scalar columns and readers of scalar columns within a collection node).
+  Status InitScalarColumns(
+      int row_group_idx, const std::vector<BaseScalarColumnReader*>& column_readers)
       WARN_UNUSED_RESULT;
 
+  /// Initializes the column readers in collection_readers_.
+  void InitCollectionColumns();
+
   /// Initialize dictionaries for all column readers
-  Status InitDictionaries(const std::vector<ParquetColumnReader*>& column_readers)
+  Status InitDictionaries(const std::vector<BaseScalarColumnReader*>& column_readers)
       WARN_UNUSED_RESULT;
 
   /// Performs some validation once we've reached the end of a row group to help detect
@@ -645,8 +657,19 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Evaluates whether the column reader is eligible for dictionary predicates
   bool IsDictFilterable(ParquetColumnReader* col_reader);
 
-  /// Divides the column readers into dict_filterable_readers_ and
-  /// non_dict_filterable_readers_. Allocates memory for dict_filter_tuple_backing_.
+  /// Evaluates whether the column reader is eligible for dictionary predicates.
+  bool IsDictFilterable(BaseScalarColumnReader* col_reader);
+
+  /// Partitions the readers into scalar and collection readers. The collection readers
+  /// are flattened into collection_readers_. The scalar readers are partitioned into
+  /// dict_filterable_readers_ and non_dict_filterable_readers_ depending on whether
+  /// dictionary filtering is enabled and the reader can be dictionary filtered.
+  void PartitionReaders(const vector<ParquetColumnReader*>& readers,
+                        bool can_eval_dict_filters);
+
+  /// Divides the column readers into dict_filterable_readers_,
+  /// non_dict_filterable_readers_ and collection_readers_. Allocates memory for
+  /// dict_filter_tuple_map_.
   Status InitDictFilterStructures() WARN_UNUSED_RESULT;
 
   /// Returns true if all of the data pages in the column chunk are dictionary encoded
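
The PartitionReaders() comment above describes flattening collection readers and splitting scalar readers by dictionary-filter eligibility. The sketch below shows one way that partitioning could work. It assumes a recursive walk over each collection's children, and every type in it (Reader, ScalarReader, CollectionReader, PartitionedReaders) is a simplified, hypothetical stand-in rather than the class from the patch.

```cpp
#include <cassert>
#include <vector>

// Hypothetical, simplified reader hierarchy.
struct Reader {
  virtual ~Reader() = default;
  virtual bool IsCollectionReader() const = 0;
};

struct ScalarReader : Reader {
  explicit ScalarReader(bool filterable) : dict_filterable(filterable) {}
  bool IsCollectionReader() const override { return false; }
  bool dict_filterable;  // stands in for the result of IsDictFilterable()
};

struct CollectionReader : Reader {
  bool IsCollectionReader() const override { return true; }
  std::vector<Reader*> children;  // not owned
};

// Flat output lists, mirroring the three member vectors in the header.
struct PartitionedReaders {
  std::vector<ScalarReader*> dict_filterable;
  std::vector<ScalarReader*> non_dict_filterable;
  std::vector<CollectionReader*> collections;
};

// Walks 'readers' depth-first. Collection readers are appended to
// 'out->collections' and their children are visited recursively. Scalar
// readers go to one of the two scalar lists.
void PartitionReaders(const std::vector<Reader*>& readers,
    bool can_eval_dict_filters, PartitionedReaders* out) {
  assert(out != nullptr);
  for (Reader* reader : readers) {
    if (reader->IsCollectionReader()) {
      CollectionReader* collection = static_cast<CollectionReader*>(reader);
      out->collections.push_back(collection);
      PartitionReaders(collection->children, can_eval_dict_filters, out);
    } else {
      ScalarReader* scalar = static_cast<ScalarReader*>(reader);
      if (can_eval_dict_filters && scalar->dict_filterable) {
        out->dict_filterable.push_back(scalar);
      } else {
        out->non_dict_filterable.push_back(scalar);
      }
    }
  }
}
```

With flat lists, later steps such as dictionary initialization can iterate over scalar readers directly instead of recursing through collection readers each time.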

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 2b310af..70fbac2 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -393,7 +393,8 @@ class HdfsScanNodeBase : public ScanNode {
   ConjunctsMap conjuncts_map_;
   ConjunctEvaluatorsMap conjunct_evals_map_;
 
-  /// Dictionary filtering eligible conjuncts for each slot.
+  /// Dictionary filtering eligible conjuncts for each slot. Set to nullptr when there
+  /// are no dictionary filters.
   const TDictFilterConjunctsMap* thrift_dict_filter_conjuncts_map_;
 
   /// Set to true when the initial scan ranges are issued to the IoMgr. This happens on

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 4cafa5d..f934f79 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -83,12 +83,20 @@ Status HdfsScanner::Open(ScannerContext* context) {
   // Set up the scan node's dictionary filtering conjuncts map.
   if (scan_node_->thrift_dict_filter_conjuncts_map() != nullptr) {
     for (auto& entry : *(scan_node_->thrift_dict_filter_conjuncts_map())) {
+      SlotDescriptor* slot_desc = state_->desc_tbl().GetSlotDescriptor(entry.first);
+      TupleId tuple_id = (slot_desc->type().IsCollectionType() ?
+          slot_desc->collection_item_descriptor()->id() :
+          slot_desc->parent()->id());
+      auto conjunct_evals_it = conjunct_evals_map_.find(tuple_id);
+      DCHECK(conjunct_evals_it != conjunct_evals_map_.end());
+      const vector<ScalarExprEvaluator*>& conjunct_evals = conjunct_evals_it->second;
+
       // Convert this slot's list of conjunct indices into a list of pointers
       // into conjunct_evals_.
       for (int conjunct_idx : entry.second) {
-        DCHECK_LT(conjunct_idx, conjunct_evals_->size());
-        DCHECK((*conjunct_evals_)[conjunct_idx] != nullptr);
-        dict_filter_map_[entry.first].push_back((*conjunct_evals_)[conjunct_idx]);
+        DCHECK_LT(conjunct_idx, conjunct_evals.size());
+        DCHECK((conjunct_evals)[conjunct_idx] != nullptr);
+        dict_filter_map_[entry.first].push_back((conjunct_evals)[conjunct_idx]);
       }
     }
   }
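
The Open() hunk above resolves each slot's conjunct indices against the evaluator list of the tuple that owns the conjuncts, rather than against one scan-node-wide list. A rough standalone sketch of that lookup is shown here. The SlotInfo struct, the Evaluator alias and BuildDictFilterMap() are hypothetical names introduced for illustration only.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

using TupleId = int;
using SlotId = int;
using Evaluator = std::string;  // hypothetical stand-in for ScalarExprEvaluator

// Hypothetical summary of the slot descriptor fields used by the lookup.
struct SlotInfo {
  bool is_collection;
  TupleId parent_tuple_id;
  TupleId item_tuple_id;  // only meaningful when is_collection is true
};

// For every slot, converts its list of conjunct indices into pointers into the
// evaluator list of the tuple selected for that slot.
std::map<SlotId, std::vector<const Evaluator*>> BuildDictFilterMap(
    const std::map<SlotId, std::vector<int>>& conjunct_indices,
    const std::map<SlotId, SlotInfo>& slots,
    const std::map<TupleId, std::vector<Evaluator>>& evals_by_tuple) {
  std::map<SlotId, std::vector<const Evaluator*>> result;
  for (const auto& entry : conjunct_indices) {
    const SlotInfo& slot = slots.at(entry.first);
    // Collection slots use their item tuple; other slots use their parent tuple.
    TupleId tuple_id = slot.is_collection ? slot.item_tuple_id : slot.parent_tuple_id;
    const std::vector<Evaluator>& evals = evals_by_tuple.at(tuple_id);
    for (int idx : entry.second) {
      assert(idx >= 0 && static_cast<std::size_t>(idx) < evals.size());
      result[entry.first].push_back(&evals[idx]);
    }
  }
  return result;
}
```

The same index value can therefore refer to different conjuncts depending on which tuple the slot belongs to, so the tuple must be resolved before the index is applied.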

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 6228744..e3c186f 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -217,8 +217,9 @@ class HdfsScanner {
   // scanners that do not support nested types.
   const std::vector<ScalarExprEvaluator*>* conjunct_evals_ = nullptr;
 
-  // Clones of the conjuncts' evaluators in scan_node_->dict_filter_conjuncts_map().
-  typedef std::map<SlotId, std::vector<ScalarExprEvaluator*>> DictFilterConjunctsMap;
+  // Clones of the conjuncts' evaluators in scan_node_->thrift_dict_filter_conjuncts_map().
+  typedef std::map<SlotId, std::vector<ScalarExprEvaluator*>>
+      DictFilterConjunctsMap;
   DictFilterConjunctsMap dict_filter_map_;
 
   /// Holds memory for template tuples. The memory in this pool must remain valid as long

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 317a4a5..099fdce 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -1159,14 +1159,14 @@ Status BaseScalarColumnReader::ReadDataPage() {
     // Initialize the repetition level data
     RETURN_IF_ERROR(rep_levels_.Init(filename(),
         current_page_header_.data_page_header.repetition_level_encoding,
-        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
+        parent_->perm_pool_.get(), parent_->state_->batch_size(),
         max_rep_level(), num_buffered_values_,
         &data_, &data_size));
 
     // Initialize the definition level data
     RETURN_IF_ERROR(def_levels_.Init(filename(),
         current_page_header_.data_page_header.definition_level_encoding,
-        parent_->level_cache_pool_.get(), parent_->state_->batch_size(),
+        parent_->perm_pool_.get(), parent_->state_->batch_size(),
         max_def_level(), num_buffered_values_, &data_, &data_size));
 
     // Data can be empty if the column contains all NULLs

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/be/src/runtime/collection-value-builder.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/collection-value-builder.h b/be/src/runtime/collection-value-builder.h
index 1d20ef4..d75e94b 100644
--- a/be/src/runtime/collection-value-builder.h
+++ b/be/src/runtime/collection-value-builder.h
@@ -25,7 +25,7 @@
 
 namespace impala {
 
-/// Class for constructing an CollectionValue when the total size isn't known
+/// Class for constructing a CollectionValue when the total size isn't known
 /// up-front. This class handles allocating the buffer backing the collection from a
 /// MemPool, and uses a doubling strategy for growing the collection.
 class CollectionValueBuilder {

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/be/src/runtime/scoped-buffer.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/scoped-buffer.h b/be/src/runtime/scoped-buffer.h
index cf77dd6..17fb8e2 100644
--- a/be/src/runtime/scoped-buffer.h
+++ b/be/src/runtime/scoped-buffer.h
@@ -24,6 +24,7 @@ namespace impala {
 
 /// A scoped memory allocation that is tracked against a MemTracker.
 /// The allocation is automatically freed when the ScopedBuffer object goes out of scope.
+/// NOTE: if multiple allocations share the same lifetime, prefer to use MemPool.
 class ScopedBuffer {
  public:
   ScopedBuffer(MemTracker* mem_tracker) : mem_tracker_(mem_tracker),
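
The note added above recommends a pool when several allocations share one lifetime, which is the pattern this commit adopts by moving the dictionary-filter and min-max tuples into perm_pool_. The sketch below illustrates the general idea with a deliberately tiny arena. SimplePool is a hypothetical class for illustration and does not reproduce Impala's MemPool interface or its memory tracking.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

// Hypothetical minimal arena: every allocation lives until FreeAll().
class SimplePool {
 public:
  // Returns 'size' zero-initialized bytes owned by the pool.
  uint8_t* Allocate(std::size_t size) {
    assert(size > 0);
    std::unique_ptr<uint8_t[]> chunk(new uint8_t[size]());
    uint8_t* result = chunk.get();
    chunks_.push_back(std::move(chunk));
    total_bytes_ += size;
    return result;
  }

  // Releases every allocation at once.
  void FreeAll() {
    chunks_.clear();
    total_bytes_ = 0;
  }

  std::size_t total_bytes() const { return total_bytes_; }

 private:
  std::vector<std::unique_ptr<uint8_t[]>> chunks_;
  std::size_t total_bytes_ = 0;
};
```

One owner object with a single release point replaces a separate scoped buffer per allocation, at the cost of not being able to free an individual allocation early.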

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 97ef1b3..fedca3c 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -229,8 +229,7 @@ struct THdfsScanNode {
   // Tuple to evaluate 'min_max_conjuncts' against.
   8: optional Types.TTupleId min_max_tuple_id
 
-  // Map from SlotIds to the indices in TPlanNode.conjuncts that are eligible
-  // for dictionary filtering.
+  // The conjuncts that are eligible for dictionary filtering.
   9: optional map<Types.TSlotId, list<i32>> dictionary_filter_conjuncts
 
   // The byte offset of the slot for Parquet metadata if Parquet count star optimization

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
index e303a11..4f0a0e1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotDescriptor.java
@@ -69,12 +69,16 @@ public class SlotDescriptor {
   private ColumnStats stats_;  // only set if 'column' isn't set
 
   SlotDescriptor(SlotId id, TupleDescriptor parent) {
+    Preconditions.checkNotNull(id);
+    Preconditions.checkNotNull(parent);
     id_ = id;
     parent_ = parent;
     byteOffset_ = -1;  // invalid
   }
 
   SlotDescriptor(SlotId id, TupleDescriptor parent, SlotDescriptor src) {
+    Preconditions.checkNotNull(id);
+    Preconditions.checkNotNull(parent);
     id_ = id;
     parent_ = parent;
     type_ = src.type_;

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
index a87cd3a..6c33861 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java
@@ -48,6 +48,8 @@ import com.google.common.collect.Lists;
  * The list of slots tracks the named slots that are actually referenced in a query, as
  * well as all anonymous slots. Although not required, a tuple descriptor typically
  * only has named or anonymous slots and not a mix of both.
+ * Each tuple and slot descriptor has an associated unique id (within the scope of a
+ * query). A given slot descriptor is owned by exactly one tuple descriptor.
  *
  * For example, every table reference has a corresponding tuple descriptor. The columns
  * of the table are represented by the tuple descriptor's type (struct type with one

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index b13f435..596129b 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -100,10 +100,17 @@ import com.google.common.collect.Sets;
  * TABLESAMPLE clause. Scan predicates and the sampling are independent, so we first
  * prune partitions and then randomly select files from those partitions.
  *
- * For scans of tables with Parquet files the class creates an additional list of
- * conjuncts that are passed to the backend and will be evaluated against the
- * parquet::Statistics of row groups. If the conjuncts don't match, then whole row groups
- * will be skipped.
+ * For scans of tables with Parquet files the class sends over additional information
+ * to the backend to enable more aggressive runtime pruning. Two types of pruning are
+ * supported:
+ *
+ * 1. Min-max pruning: the class creates an additional list of conjuncts from applicable
+ * scan-node conjuncts and collection conjuncts. The additional conjuncts are
+ * used to prune a row group if any fail the row group's min-max parquet::Statistics.
+ *
+ * 2. Dictionary pruning: the class identifies which scan-node conjuncts and collection
+ * conjuncts can be used to prune a row group by evaluating conjuncts on the
+ * column dictionaries.
  *
  * Count(*) aggregation optimization flow:
  * The caller passes in an AggregateInfo to the constructor that this scan node uses to
@@ -181,11 +188,20 @@ public class HdfsScanNode extends ScanNode {
 
   // TupleDescriptors of collection slots that have an IsNotEmptyPredicate. See
   // SelectStmt#registerIsNotEmptyPredicates.
+  // Correctness for applying min-max and dictionary filters requires that the nested
+  // collection is tested to be not empty (via the IsNotEmptyPredicate).
+  // These filters are added by analysis (see: SelectStmt#registerIsNotEmptyPredicates).
+  // While correct, they may be conservative. See the tests for parquet collection
+  // filtering for examples that could benefit from being more aggressive
+  // (yet still correct).
   private final Set<TupleDescriptor> notEmptyCollections_ = Sets.newHashSet();
 
-  // Map from SlotIds to indices in PlanNodes.conjuncts_ that are eligible for
-  // dictionary filtering
-  private final Map<Integer, List<Integer>> dictionaryFilterConjuncts_ =
+  // Map from SlotDescriptor to indices in PlanNodes.conjuncts_ and
+  // collectionConjuncts_ that are eligible for dictionary filtering. Slots in the
+  // TupleDescriptor of this scan node map to indices into PlanNodes.conjuncts_ and
+  // slots in the TupleDescriptors of nested types map to indices into
+  // collectionConjuncts_.
+  private Map<SlotDescriptor, List<Integer>> dictionaryFilterConjuncts_ =
       Maps.newLinkedHashMap();
 
   // Number of partitions that have the row count statistic.
@@ -525,10 +541,10 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
-   * Analyzes 'conjuncts_', populates 'minMaxTuple_' with slots for statistics values, and
-   * populates 'minMaxConjuncts_' with conjuncts pointing into the 'minMaxTuple_'. Only
-   * conjuncts of the form <slot> <op> <constant> are supported, and <op> must be one of
-   * LT, LE, GE, GT, or EQ.
+   * Analyzes 'conjuncts_' and 'collectionConjuncts_', populates 'minMaxTuple_' with slots
+   * for statistics values, and populates 'minMaxConjuncts_' with conjuncts pointing into
+   * the 'minMaxTuple_'. Only conjuncts of the form <slot> <op> <constant> are supported,
+   * and <op> must be one of LT, LE, GE, GT, or EQ.
    */
   private void computeMinMaxTupleAndConjuncts(Analyzer analyzer) throws ImpalaException{
     Preconditions.checkNotNull(desc_.getPath());
@@ -542,10 +558,6 @@ public class HdfsScanNode extends ScanNode {
 
     // Adds predicates for collections.
     for (Map.Entry<TupleDescriptor, List<Expr>> entry: collectionConjuncts_.entrySet()) {
-      // Adds only predicates for collections that are filtered by an IsNotEmptyPredicate.
-      // It is assumed that analysis adds these filters such that they are correct, but
-      // potentially conservative. See the tests for examples that could benefit from
-      // being more aggressive (yet still correct).
       if (notEmptyCollections_.contains(entry.getKey())) {
         for (Expr pred: entry.getValue()) tryComputeMinMaxPredicate(analyzer, pred);
       }
@@ -606,49 +618,68 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
-   * Walks through conjuncts and populates dictionaryFilterConjuncts_.
+   * Adds an entry to dictionaryFilterConjuncts_ if dictionary filtering is applicable
+   * for conjunct. The dictionaryFilterConjuncts_ entry maps the conjunct's tupleId and
+   * slotId to conjunctIdx. The conjunctIdx is the offset into a list of conjuncts;
+   * either conjuncts_ (for scan node's tupleId) or collectionConjuncts_ (for nested
+   * collections).
    */
-  private void computeDictionaryFilterConjuncts(Analyzer analyzer) {
-    for (int conjunct_idx = 0; conjunct_idx < conjuncts_.size(); ++conjunct_idx) {
-      Expr conjunct = conjuncts_.get(conjunct_idx);
-      List<TupleId> tupleIds = Lists.newArrayList();
-      List<SlotId> slotIds = Lists.newArrayList();
-
-      conjunct.getIds(tupleIds, slotIds);
-      if (slotIds.size() == 0) continue;
-      Preconditions.checkState(tupleIds.size() == 1);
-      if (slotIds.size() != 1) continue;
-
-      // Check to see if this slot is a collection type. Nested types are
-      // currently not supported. For example, an IsNotEmptyPredicate cannot
-      // be evaluated at the dictionary level.
-      if (analyzer.getSlotDesc(slotIds.get(0)).getType().isCollectionType()) continue;
-
-      // Check to see if this conjunct contains any known randomized function
-      if (conjunct.contains(Expr.IS_NONDETERMINISTIC_BUILTIN_FN_PREDICATE)) continue;
-
-      // Check to see if the conjunct evaluates to true when the slot is NULL
-      // This is important for dictionary filtering. Dictionaries do not
-      // contain an entry for NULL and do not provide an indication about
-      // whether NULLs are present. A conjunct that evaluates to true on NULL
-      // cannot be evaluated purely on the dictionary.
-      try {
-        if (analyzer.isTrueWithNullSlots(conjunct)) continue;
-      } catch (InternalException e) {
-        // Expr evaluation failed in the backend. Skip this conjunct since we cannot
-        // determine whether it is safe to apply it against a dictionary.
-        LOG.warn("Skipping dictionary filter because backend evaluation failed: "
-            + conjunct.toSql(), e);
-        continue;
-      }
+  private void addDictionaryFilter(Analyzer analyzer, Expr conjunct, int conjunctIdx) {
+    List<TupleId> tupleIds = Lists.newArrayList();
+    List<SlotId> slotIds = Lists.newArrayList();
+    conjunct.getIds(tupleIds, slotIds);
+    // Only single-slot conjuncts are eligible for dictionary filtering. When pruning
+    // a row group, the conjunct must be evaluated against a single row group at a
+    // time. Expect a single-slot conjunct to be associated with a single tuple id.
+    if (slotIds.size() != 1) return;
+
+    // Check to see if this slot is a collection type. Dictionary pruning is applicable
+    // to scalar values nested in collection types, not enclosing collection types.
+    if (analyzer.getSlotDesc(slotIds.get(0)).getType().isCollectionType()) return;
+
+    // Check to see if this conjunct contains any known randomized function
+    if (conjunct.contains(Expr.IS_NONDETERMINISTIC_BUILTIN_FN_PREDICATE)) return;
+
+    // Check to see if the conjunct evaluates to true when the slot is NULL
+    // This is important for dictionary filtering. Dictionaries do not
+    // contain an entry for NULL and do not provide an indication about
+    // whether NULLs are present. A conjunct that evaluates to true on NULL
+    // cannot be evaluated purely on the dictionary.
+    try {
+      if (analyzer.isTrueWithNullSlots(conjunct)) return;
+    } catch (InternalException e) {
+      // Expr evaluation failed in the backend. Skip this conjunct since we cannot
+      // determine whether it is safe to apply it against a dictionary.
+      LOG.warn("Skipping dictionary filter because backend evaluation failed: "
+          + conjunct.toSql(), e);
+      return;
+    }
 
-      // TODO: Should there be a limit on the cost/structure of the conjunct?
-      Integer slotIdInt = slotIds.get(0).asInt();
-      if (dictionaryFilterConjuncts_.containsKey(slotIdInt)) {
-        dictionaryFilterConjuncts_.get(slotIdInt).add(conjunct_idx);
-      } else {
-        List<Integer> slotList = Lists.newArrayList(conjunct_idx);
-        dictionaryFilterConjuncts_.put(slotIdInt, slotList);
+    // TODO: Should there be a limit on the cost/structure of the conjunct?
+    SlotId slotId = slotIds.get(0);
+    SlotDescriptor slotKey = analyzer.getSlotDesc(slotId);
+    List<Integer> slotList = dictionaryFilterConjuncts_.get(slotKey);
+    if (slotList == null) {
+      slotList = Lists.newArrayList();
+      dictionaryFilterConjuncts_.put(slotKey, slotList);
+    }
+    slotList.add(conjunctIdx);
+  }
+
+  /**
+   * Walks through conjuncts_ and collectionConjuncts_ and populates
+   * dictionaryFilterConjuncts_.
+   */
+  private void computeDictionaryFilterConjuncts(Analyzer analyzer) {
+    for (int conjunctIdx = 0; conjunctIdx < conjuncts_.size(); ++conjunctIdx) {
+      addDictionaryFilter(analyzer, conjuncts_.get(conjunctIdx), conjunctIdx);
+    }
+    for (Map.Entry<TupleDescriptor, List<Expr>> entry: collectionConjuncts_.entrySet()) {
+      if (notEmptyCollections_.contains(entry.getKey())) {
+        List<Expr> conjuncts = entry.getValue();
+        for (int conjunctIdx = 0; conjunctIdx < conjuncts.size(); ++conjunctIdx) {
+          addDictionaryFilter(analyzer, conjuncts.get(conjunctIdx), conjunctIdx);
+        }
       }
     }
   }
@@ -986,7 +1017,12 @@ public class HdfsScanNode extends ScanNode {
       }
       msg.hdfs_scan_node.setMin_max_tuple_id(minMaxTuple_.getId().asInt());
     }
-    msg.hdfs_scan_node.setDictionary_filter_conjuncts(dictionaryFilterConjuncts_);
+    Map<Integer, List<Integer>> dictMap = Maps.newLinkedHashMap();
+    for (Map.Entry<SlotDescriptor, List<Integer>> entry :
+      dictionaryFilterConjuncts_.entrySet()) {
+      dictMap.put(entry.getKey().getId().asInt(), entry.getValue());
+    }
+    msg.hdfs_scan_node.setDictionary_filter_conjuncts(dictMap);
   }
 
   @Override
@@ -1008,8 +1044,8 @@ public class HdfsScanNode extends ScanNode {
           PrintUtils.printBytes(totalBytes_)));
       output.append("\n");
       if (!conjuncts_.isEmpty()) {
-        output.append(
-            detailPrefix + "predicates: " + getExplainString(conjuncts_) + "\n");
+        output.append(String.format("%spredicates: %s\n", detailPrefix,
+            getExplainString(conjuncts_)));
       }
       if (!collectionConjuncts_.isEmpty()) {
         for (Map.Entry<TupleDescriptor, List<Expr>> entry:
@@ -1042,23 +1078,54 @@ public class HdfsScanNode extends ScanNode {
             totalFiles_, numScanRangesNoDiskIds_, scanRanges_.size()));
       }
       if (!minMaxOriginalConjuncts_.isEmpty()) {
-        output.append(detailPrefix + "parquet statistics predicates: " +
-            getExplainString(minMaxOriginalConjuncts_) + "\n");
+        output.append(String.format("%sparquet statistics predicates: %s\n",
+            detailPrefix, getExplainString(minMaxOriginalConjuncts_)));
       }
-      if (!dictionaryFilterConjuncts_.isEmpty()) {
-        List<Integer> totalIdxList = Lists.newArrayList();
-        for (List<Integer> idxList : dictionaryFilterConjuncts_.values()) {
-          totalIdxList.addAll(idxList);
-        }
-        // Since the conjuncts are stored by the slot id, they are not necessarily
-        // in the same order as the normal conjuncts. Sort the indices so that the
-        // order matches the normal conjuncts.
-        Collections.sort(totalIdxList);
-        List<Expr> exprList = Lists.newArrayList();
-        for (Integer idx : totalIdxList) exprList.add(conjuncts_.get(idx));
-        output.append(String.format("%sparquet dictionary predicates: %s\n",
-            detailPrefix, getExplainString(exprList)));
+      // Groups the dictionary filterable conjuncts by tuple descriptor.
+      output.append(getDictionaryConjunctsExplainString(detailPrefix));
+    }
+    return output.toString();
+  }
+
+  // Returns the explain string of dictionary-filterable conjuncts, grouped by tuple.
+  private String getDictionaryConjunctsExplainString(String prefix) {
+    StringBuilder output = new StringBuilder();
+    Map<TupleDescriptor, List<Integer>> perTupleConjuncts = Maps.newLinkedHashMap();
+    for (Map.Entry<SlotDescriptor, List<Integer>> entry :
+      dictionaryFilterConjuncts_.entrySet()) {
+      SlotDescriptor slotDescriptor = entry.getKey();
+      TupleDescriptor tupleDescriptor = slotDescriptor.getParent();
+      List<Integer> indexes = perTupleConjuncts.get(tupleDescriptor);
+      if (indexes == null) {
+        indexes = Lists.newArrayList();
+        perTupleConjuncts.put(tupleDescriptor, indexes);
+      }
+      indexes.addAll(entry.getValue());
+    }
+    for (Map.Entry<TupleDescriptor, List<Integer>> entry :
+      perTupleConjuncts.entrySet()) {
+      List<Integer> totalIdxList = entry.getValue();
+      // Since the conjuncts are stored by the slot id, they are not necessarily
+      // in the same order as the normal conjuncts. Sort the indices so that the
+      // order matches the normal conjuncts.
+      Collections.sort(totalIdxList);
+      List<Expr> conjuncts;
+      TupleDescriptor tupleDescriptor = entry.getKey();
+      String tupleName = "";
+      if (tupleDescriptor == getTupleDesc()) {
+        conjuncts = conjuncts_;
+      } else {
+        conjuncts = collectionConjuncts_.get(tupleDescriptor);
+        tupleName = " on " + tupleDescriptor.getAlias();
+      }
+      Preconditions.checkNotNull(conjuncts);
+      List<Expr> exprList = Lists.newArrayList();
+      for (Integer idx : totalIdxList) {
+        Preconditions.checkState(idx.intValue() < conjuncts.size());
+        exprList.add(conjuncts.get(idx));
       }
+      output.append(String.format("%sparquet dictionary predicates%s: %s\n",
+          prefix, tupleName, getExplainString(exprList)));
     }
     return output.toString();
   }
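
A minimal sketch of the regrouping done in getDictionaryConjunctsExplainString above, under simplifying assumptions: slots and tuples are plain strings rather than SlotDescriptor and TupleDescriptor, and the names DictionaryConjunctGrouping and groupByTuple are hypothetical, not part of Impala. It only illustrates how per-slot conjunct indexes can be merged per tuple and then sorted so that they follow the order of that tuple's conjunct list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical illustration; not Impala code. */
final class DictionaryConjunctGrouping {
  /**
   * slotToTuple maps a slot name to the alias of the tuple that owns it.
   * slotToConjunctIdxs maps a slot name to indexes into that tuple's conjunct list.
   * Returns tuple alias -> sorted conjunct indexes, in first-seen tuple order.
   */
  static Map<String, List<Integer>> groupByTuple(
      Map<String, String> slotToTuple, Map<String, List<Integer>> slotToConjunctIdxs) {
    Map<String, List<Integer>> perTuple = new LinkedHashMap<>();
    for (Map.Entry<String, List<Integer>> e : slotToConjunctIdxs.entrySet()) {
      String tuple = Objects.requireNonNull(
          slotToTuple.get(e.getKey()), "slot without owning tuple");
      perTuple.computeIfAbsent(tuple, k -> new ArrayList<>()).addAll(e.getValue());
    }
    // Indexes were collected per slot, so restore conjunct-list order per tuple.
    for (List<Integer> idxs : perTuple.values()) Collections.sort(idxs);
    return perTuple;
  }

  public static void main(String[] args) {
    Map<String, String> slotToTuple = new LinkedHashMap<>();
    slotToTuple.put("l_shipmode", "l");
    slotToTuple.put("c_custkey", "c");
    slotToTuple.put("l_returnflag", "l");
    Map<String, List<Integer>> idxs = new LinkedHashMap<>();
    idxs.put("l_shipmode", Arrays.asList(2));
    idxs.put("c_custkey", Arrays.asList(0));
    idxs.put("l_returnflag", Arrays.asList(1));
    System.out.println(groupByTuple(slotToTuple, idxs));
  }
}
```

With the sample input in main, the two l_* slots are merged under tuple l as [1, 2] and c_custkey stays under c as [0]. A LinkedHashMap is used here, as in the patch, so the output order is deterministic.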

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/testdata/CustomerMultiBlock/README
----------------------------------------------------------------------
diff --git a/testdata/CustomerMultiBlock/README b/testdata/CustomerMultiBlock/README
new file mode 100644
index 0000000..edc02e7
--- /dev/null
+++ b/testdata/CustomerMultiBlock/README
@@ -0,0 +1,12 @@
+This file is created to test IMPALA-4993. The file contains a subset
+of tpch_nested_parquet.customer, but written out using multiple row
+groups. The file was created by following the instructions in
+testdata/bin/load_nested.py to create the table, tmp_customer, which
+is then written out in parquet format using hive:
+
+SET parquet.block.size=8192;
+
+CREATE TABLE customer
+STORED AS PARQUET
+TBLPROPERTIES('parquet.compression'='SNAPPY')
+AS SELECT * FROM tmp_customer where c_custkey < 200;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/testdata/CustomerMultiBlock/customer_multiblock.parquet
----------------------------------------------------------------------
diff --git a/testdata/CustomerMultiBlock/customer_multiblock.parquet b/testdata/CustomerMultiBlock/customer_multiblock.parquet
new file mode 100644
index 0000000..5a14cc8
Binary files /dev/null and b/testdata/CustomerMultiBlock/customer_multiblock.parquet differ

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/testdata/datasets/functional/functional_schema_template.sql
----------------------------------------------------------------------
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 3fcb5f7..f4b6b34 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -2051,6 +2051,25 @@ L_SHIPMODE STRING
 L_COMMENT STRING
 ====
 ---- DATASET
+-- IMPALA-4993: tests nested collections stored in multiple row-groups.
+---- BASE_TABLE_NAME
+customer_multiblock
+---- COLUMNS
+C_CUSTKEY BIGINT
+C_NAME STRING
+C_ADDRESS STRING
+C_NATIONKEY SMALLINT
+C_PHONE STRING
+C_ACCTBAL DECIMAL(12, 2)
+C_MKTSEGMENT STRING
+C_COMMENT STRING
+C_ORDERS ARRAY<STRUCT<O_ORDERKEY: BIGINT, O_ORDERSTATUS: STRING, O_TOTALPRICE: DECIMAL(12, 2), O_ORDERDATE: STRING, O_ORDERPRIORITY: STRING, O_CLERK: STRING, O_SHIPPRIORITY: INT, O_COMMENT: STRING, O_LINEITEMS: ARRAY<STRUCT<L_PARTKEY: BIGINT, L_SUPPKEY: BIGINT, L_LINENUMBER: INT, L_QUANTITY: DECIMAL(12, 2), L_EXTENDEDPRICE: DECIMAL(12, 2), L_DISCOUNT: DECIMAL(12, 2), L_TAX: DECIMAL(12, 2), L_RETURNFLAG: STRING, L_LINESTATUS: STRING, L_SHIPDATE: STRING, L_COMMITDATE: STRING, L_RECEIPTDATE: STRING, L_SHIPINSTRUCT: STRING, L_SHIPMODE: STRING, L_COMMENT: STRING>>>>
+---- DEPENDENT_LOAD
+`hadoop fs -mkdir -p /test-warehouse/customer_multiblock_parquet && \
+hadoop fs -put -f ${IMPALA_HOME}/testdata/CustomerMultiBlock/customer_multiblock.parquet \
+/test-warehouse/customer_multiblock_parquet/
+====
+---- DATASET
 functional
 ---- BASE_TABLE_NAME
 bzip2_tbl

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/testdata/datasets/functional/schema_constraints.csv
----------------------------------------------------------------------
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index a3566c4..355e9b1 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -52,6 +52,7 @@ table_name:bad_column_metadata, constraint:restrict_to, table_format:parquet/non
 table_name:lineitem_multiblock, constraint:restrict_to, table_format:parquet/none/none
 table_name:lineitem_sixblocks, constraint:restrict_to, table_format:parquet/none/none
 table_name:lineitem_multiblock_one_row_group, constraint:restrict_to, table_format:parquet/none/none
+table_name:customer_multiblock, constraint:restrict_to, table_format:parquet/none/none
 
 # TODO: Support Avro. Data loading currently fails for Avro because complex types
 # cannot be converted to the corresponding Avro types yet.

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index 031ac1c..2b2d5ef 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -56,6 +56,8 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled
    parquet statistics predicates: c_custkey > 10, o_orderkey = 4
    parquet dictionary predicates: c_custkey > 10
+   parquet dictionary predicates on o: o_orderkey = 4
+   parquet dictionary predicates on o_lineitems: 20 + l_linenumber < 0
    mem-estimate=176.00MB mem-reservation=0B
    tuple-ids=0 row-size=24B cardinality=15000
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index 4258772..f3a46de 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -253,6 +253,8 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled
    parquet statistics predicates: c_custkey < 10, o_orderkey < 5, l_linenumber < 3
    parquet dictionary predicates: c_custkey < 10
+   parquet dictionary predicates on o: o_orderkey < 5
+   parquet dictionary predicates on o_lineitems: l_linenumber < 3
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=254B cardinality=15000
 ---- PARALLELPLANS
@@ -314,6 +316,8 @@ Per-Host Resources: mem-estimate=264.00MB mem-reservation=0B
    extrapolated-rows=disabled
    parquet statistics predicates: c_custkey < 10, o_orderkey < 5, l_linenumber < 3
    parquet dictionary predicates: c_custkey < 10
+   parquet dictionary predicates on o: o_orderkey < 5
+   parquet dictionary predicates on o_lineitems: l_linenumber < 3
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=254B cardinality=15000
 ====
@@ -365,6 +369,7 @@ PLAN-ROOT SINK
      columns missing stats: c_orders, c_orders
    extrapolated-rows=disabled
    parquet statistics predicates: o1.o_orderkey < 5
+   parquet dictionary predicates on o1: o1.o_orderkey < 5
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=270B cardinality=150000
 ---- PARALLELPLANS
@@ -417,6 +422,7 @@ Per-Host Resources: mem-estimate=269.81MB mem-reservation=5.81MB
      columns missing stats: c_orders, c_orders
    extrapolated-rows=disabled
    parquet statistics predicates: o1.o_orderkey < 5
+   parquet dictionary predicates on o1: o1.o_orderkey < 5
    mem-estimate=88.00MB mem-reservation=0B
    tuple-ids=0 row-size=270B cardinality=150000
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
index 66770cd..e7dee4e 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
@@ -151,6 +151,7 @@ PLAN-ROOT SINK
      columns missing stats: id
    extrapolated-rows=disabled
    parquet statistics predicates: a.item.e < -10
+   parquet dictionary predicates on a: a.item.e < -10
    mem-estimate=32.00MB mem-reservation=0B
    tuple-ids=0 row-size=24B cardinality=unavailable
 ====
@@ -328,6 +329,8 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled
    parquet statistics predicates: c_custkey > 0, o.o_orderkey > 0, l.l_partkey > 0
    parquet dictionary predicates: c_custkey > 0
+   parquet dictionary predicates on o: o.o_orderkey > 0
+   parquet dictionary predicates on l: l.l_partkey > 0
    mem-estimate=176.00MB mem-reservation=0B
    tuple-ids=0 row-size=24B cardinality=15000
 ====
@@ -374,3 +377,66 @@ PLAN-ROOT SINK
    mem-estimate=16.00MB mem-reservation=0B
    tuple-ids=0 row-size=16B cardinality=unavailable
 ====
+# Multiple nested collection values (at the same nesting level) where dictionary
+# pruning is applicable.
+select c_name, o.o_clerk from tpch_nested_parquet.customer c,
+c.c_orders o, o.o_lineitems l
+where l.l_shipdate = '1994-08-19' and
+l.l_receiptdate = '1994-08-24' and l.l_shipmode = 'RAIL' and l.l_returnflag = 'R' and
+l.l_comment is null;
+---- PLAN
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=176.00MB mem-reservation=0B
+PLAN-ROOT SINK
+|  mem-estimate=0B mem-reservation=0B
+|
+01:SUBPLAN
+|  mem-estimate=0B mem-reservation=0B
+|  tuple-ids=2,1,0 row-size=162B cardinality=15000000
+|
+|--08:NESTED LOOP JOIN [CROSS JOIN]
+|  |  mem-estimate=50B mem-reservation=0B
+|  |  tuple-ids=2,1,0 row-size=162B cardinality=100
+|  |
+|  |--02:SINGULAR ROW SRC
+|  |     parent-subplan=01
+|  |     mem-estimate=0B mem-reservation=0B
+|  |     tuple-ids=0 row-size=50B cardinality=1
+|  |
+|  04:SUBPLAN
+|  |  mem-estimate=0B mem-reservation=0B
+|  |  tuple-ids=2,1 row-size=112B cardinality=100
+|  |
+|  |--07:NESTED LOOP JOIN [CROSS JOIN]
+|  |  |  mem-estimate=32B mem-reservation=0B
+|  |  |  tuple-ids=2,1 row-size=112B cardinality=10
+|  |  |
+|  |  |--05:SINGULAR ROW SRC
+|  |  |     parent-subplan=04
+|  |  |     mem-estimate=0B mem-reservation=0B
+|  |  |     tuple-ids=1 row-size=32B cardinality=1
+|  |  |
+|  |  06:UNNEST [o.o_lineitems l]
+|  |     parent-subplan=04
+|  |     mem-estimate=0B mem-reservation=0B
+|  |     tuple-ids=2 row-size=0B cardinality=10
+|  |
+|  03:UNNEST [c.c_orders o]
+|     parent-subplan=01
+|     mem-estimate=0B mem-reservation=0B
+|     tuple-ids=1 row-size=0B cardinality=10
+|
+00:SCAN HDFS [tpch_nested_parquet.customer c]
+   partitions=1/1 files=4 size=292.36MB
+   predicates: !empty(c.c_orders)
+   predicates on o: !empty(o.o_lineitems)
+   predicates on l: l.l_shipdate = '1994-08-19', l.l_receiptdate = '1994-08-24', l.l_shipmode = 'RAIL', l.l_returnflag = 'R', l.l_comment IS NULL
+   stored statistics:
+     table: rows=150000 size=292.36MB
+     columns missing stats: c_orders
+   extrapolated-rows=disabled
+   parquet statistics predicates: l.l_shipdate = '1994-08-19', l.l_receiptdate = '1994-08-24', l.l_shipmode = 'RAIL', l.l_returnflag = 'R'
+   parquet dictionary predicates on l: l.l_shipdate = '1994-08-19', l.l_receiptdate = '1994-08-24', l.l_shipmode = 'RAIL', l.l_returnflag = 'R'
+   mem-estimate=176.00MB mem-reservation=0B
+   tuple-ids=0 row-size=50B cardinality=150000
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/db98dc65/testdata/workloads/functional-query/queries/QueryTest/parquet-filtering.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-filtering.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-filtering.test
index f90e660..7a4876f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet-filtering.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-filtering.test
@@ -1,7 +1,4 @@
-# This tests parquet dictionary filtering. It is mirrored with mt_dop
-# in mt-dop-parquet-filtering.test. Since the two rely on counting
-# the number of row groups filtered, differing parallelism changes
-# the counts seen in the output.
+# This tests parquet dictionary filtering.
 ====
 ---- QUERY
 # id: All values pass
@@ -258,4 +255,234 @@ select count(*) from functional_parquet.alltypes where mod(id,10000) = 7301;
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 24
 aggregation(SUM, NumDictFilteredRowGroups): 0
-====
\ No newline at end of file
+====
+---- QUERY
+# Nested dictionary filtering.
+#
+# Test coverage includes the following dimensions:
+# - nested collection type: map, array, struct
+# - number of filters and their nesting depth
+# - number of projections and their nesting depth
+# - required vs. non-required collections (outer vs. inner joins)
+# - filter matches: some, none
+# - count(*) optimization (exercises special code path)
+# - multiple row-groups per file
+#
+# Map key at depth 1. All required. No matches. Only one row-group is dictionary filtered
+# since only one (of two) row-groups is dictionary encoded for that column.
+select id from functional_parquet.complextypestbl f, f.int_map m where m.key = 'k5'
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 1
+====
+---- QUERY
+select count(*) from functional_parquet.complextypestbl.int_map m where m.key = 'k5'
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 1
+====
+---- QUERY
+# Map key at depth 1. All required. Matches.
+select id from functional_parquet.complextypestbl f, f.int_map m where m.key = 'k1'
+---- RESULTS
+8
+1
+2
+7
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Map key at depth 1. None required. No matches.
+select id from functional_parquet.complextypestbl f left outer join f.int_map m
+where m.key = 'k5'
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Map key at depth 1. None required. Matches.
+select id from functional_parquet.complextypestbl f left outer join f.int_map m
+where m.key = 'k1'
+---- RESULTS
+8
+1
+2
+7
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Map key at depth 1. All required. No matches. count(*).
+select count(*) from functional_parquet.complextypestbl f, f.int_map m where m.key = 'k5'
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 1
+====
+---- QUERY
+# Map key at depth 1. All required. Matches. count(*)
+select count(*) from functional_parquet.complextypestbl f, f.int_map m where m.key = 'k1'
+---- RESULTS
+4
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Map key at depth 1. None required. No matches. count(*)
+select count(*) from functional_parquet.complextypestbl f left outer join f.int_map m
+where m.key = 'k5'
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Map key at depth 1. None required. Matches. count(*)
+select count(*) from functional_parquet.complextypestbl f left outer join f.int_map m
+where m.key = 'k1'
+---- RESULTS
+4
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Array struct value at depth 2. All required. Matches.
+select r_name from tpch_nested_parquet.region r, r.r_nations n where n.n_name = 'FRANCE'
+---- RESULTS
+'EUROPE'
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 1
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Array struct value at depth 3. All required. No matches.
+select c_name from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems l
+where l.l_returnflag = 'foo'
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumDictFilteredRowGroups): 4
+====
+---- QUERY
+# Array struct value at depth 3. All required. No matches.
+# Multiple nested values projected.
+select c_name, o.o_clerk from tpch_nested_parquet.customer c,
+c.c_orders o, o.o_lineitems l
+where l.l_returnflag = 'foo'
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumDictFilteredRowGroups): 4
+====
+---- QUERY
+# Array struct value at depth 3. Bottom not required. No matches.
+# Multiple nested values projected.
+select c_name, o.o_clerk from tpch_nested_parquet.customer c,
+c.c_orders o left outer join o.o_lineitems l
+where l.l_returnflag = 'foo'
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Array struct value at depth 3. Top not required. No matches.
+# Multiple nested values projected.
+select c_name, o.o_clerk from tpch_nested_parquet.customer c left outer join c.c_orders o,
+o.o_lineitems l
+where l.l_returnflag = 'foo'
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Array struct value at depth 3. All required. Multiple filters, multiple projections.
+# Matches.
+select c_name, o.o_clerk from tpch_nested_parquet.customer c,
+c.c_orders o, o.o_lineitems l
+where l.l_shipdate = '1994-08-19' and
+l.l_receiptdate = '1994-08-24' and l.l_shipmode = 'RAIL' and l.l_returnflag = 'R'
+---- RESULTS
+'Customer#000013873','Clerk#000000554'
+'Customer#000049757','Clerk#000000156'
+'Customer#000037490','Clerk#000000026'
+'Customer#000002836','Clerk#000000577'
+'Customer#000004897','Clerk#000000112'
+'Customer#000107891','Clerk#000000576'
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Array struct value at depth 3. All required. No matches, count(*).
+select count(*) from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems l
+where l.l_returnflag = 'foo'
+---- RESULTS
+0
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumDictFilteredRowGroups): 4
+====
+---- QUERY
+# Array struct values at depths 2 and 3. All required.
+# No matches, multiple nested values projected.
+select c_name, o.o_comment from tpch_nested_parquet.customer c,
+c.c_orders o, o.o_lineitems l
+where l.l_returnflag = 'foo' and o.o_clerk = 'foo'
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumDictFilteredRowGroups): 4
+====
+---- QUERY
+# Array struct value at depth 2. Not required. No match.
+# Illustrates a case that should not be pruned.
+select count(*) from tpch_nested_parquet.customer c left outer join
+(select * from c.c_orders o where o.o_orderstatus = 'foo') v
+---- RESULTS
+150000
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 4
+aggregation(SUM, NumDictFilteredRowGroups): 0
+====
+---- QUERY
+# Tests dictionary filtering with files with multiple row-groups.
+# Expect all results to be filtered.
+select l.l_linenumber from functional_parquet.customer_multiblock c,
+c.c_orders o, o.o_lineitems l
+where l.l_linenumber + 1 < 0;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 2
+====
+---- QUERY
+# Tests dictionary filtering with files with multiple row-groups and count(*).
+# Expect all results to be filtered.
+select count(*) from functional_parquet.customer_multiblock c,
+c.c_orders o, o.o_lineitems l
+where l.l_linenumber + 1 < 0;
+---- RESULTS
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 2
+====
+---- QUERY
+# Tests dictionary filtering with files with multiple row-groups and count(*).
+# Expect same result as obtained with dictionary filtering disabled.
+select count(*) from functional_parquet.customer_multiblock c,
+c.c_orders o, o.o_lineitems l
+where l.l_linenumber > 0;
+---- RESULTS
+7786
+---- RUNTIME_PROFILE
+aggregation(SUM, NumRowGroups): 2
+aggregation(SUM, NumDictFilteredRowGroups): 0
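
The NumDictFilteredRowGroups expectations above follow a rule that can be sketched roughly as below. This is a simplified illustration, not the scanner's implementation: the names DictionaryPruningSketch and canSkipRowGroup are hypothetical, the dictionary is modeled as a plain list, and the NULL check (done by the planner via isTrueWithNullSlots in the patch) is folded into the same method. It does not model the required vs. non-required collection distinction (inner vs. outer joins) that the tests also cover.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical illustration; not Impala code. */
final class DictionaryPruningSketch {
  /**
   * Returns true if a row-group can be skipped based only on the column's dictionary.
   * fullyDictEncoded must be false if any page of the column is not dictionary encoded,
   * because values outside the dictionary could then still match.
   */
  static <V> boolean canSkipRowGroup(
      List<V> dictionary, boolean fullyDictEncoded, Predicate<? super V> conjunct) {
    if (!fullyDictEncoded) return false;
    // Dictionaries have no entry for NULL, so a conjunct that is true on NULL
    // cannot be decided from the dictionary alone.
    if (conjunct.test(null)) return false;
    for (V value : dictionary) {
      if (conjunct.test(value)) return false;  // at least one value may match
    }
    return true;  // no dictionary entry satisfies the conjunct
  }

  public static void main(String[] args) {
    List<String> dict = Arrays.asList("k1", "k2", "k3");
    System.out.println(canSkipRowGroup(dict, true, v -> "k5".equals(v)));
    System.out.println(canSkipRowGroup(dict, true, v -> "k1".equals(v)));
    System.out.println(canSkipRowGroup(dict, false, v -> "k5".equals(v)));
  }
}
```

The first call corresponds to the m.key = 'k5' tests (no dictionary entry matches, so the row-group is skipped). The third corresponds to the row-group that is not dictionary encoded for that column and therefore cannot be filtered.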


[5/5] impala git commit: IMPALA-5310: [DOCS] Reserve 'repeatable' keyword from TABLESAMPLE clause

Posted by jr...@apache.org.
IMPALA-5310: [DOCS] Reserve 'repeatable' keyword from TABLESAMPLE clause

Overlooked the new keyword when the clause was
originally introduced.

Change-Id: Ie8e6713fb97ced279f0aedfe8f42c09a7e6edae9
Reviewed-on: http://gerrit.cloudera.org:8080/9066
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/4afabd4e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/4afabd4e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/4afabd4e

Branch: refs/heads/master
Commit: 4afabd4e317ae368f5cb2bd2dd1a36ddafc90e0a
Parents: db98dc6
Author: John Russell <jr...@cloudera.com>
Authored: Thu Jan 18 14:25:16 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jan 19 21:26:12 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_reserved_words.xml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/4afabd4e/docs/topics/impala_reserved_words.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_reserved_words.xml b/docs/topics/impala_reserved_words.xml
index 5c83d6d..0a44dba 100644
--- a/docs/topics/impala_reserved_words.xml
+++ b/docs/topics/impala_reserved_words.xml
@@ -196,6 +196,7 @@ real
 refresh
 regexp
 rename
+<ph rev="IMPALA-5309">repeatable</ph>
 replace
 <ph rev="2.3.0">restrict</ph>
 returns


[2/5] impala git commit: IMPALA-4886: Expose table metrics in the catalog web UI.

Posted by jr...@apache.org.
IMPALA-4886: Expose table metrics in the catalog web UI.

The following changes are included in this commit:
* Adds a lightweight framework for registering metrics in the JVM.
* Adds table-level metrics and enables these metrics to be exposed
through the catalog web UI.
* Adds a CatalogUsageMonitor class that monitors and reports the catalog
usage in terms of the tables with the highest memory requirements and
the tables with the highest number of metadata operations. The catalog
usage information is exposed in the /catalog page of the catalog web UI.
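
As a rough illustration of the "tables with the highest ..." bookkeeping described above, a bounded top-N structure could look like the sketch below. This is an assumption-laden example, not the TopNCache added by this commit: the class name TopNSketch, its methods, and the use of a PriorityQueue are all hypothetical choices made for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.ToLongFunction;

/** Hypothetical illustration; not Impala's TopNCache. */
final class TopNSketch<T> {
  private final int capacity;
  private final ToLongFunction<T> rank;
  // Min-heap on rank: the head is the lowest-ranked item currently kept.
  private final PriorityQueue<T> heap;

  TopNSketch(int capacity, ToLongFunction<T> rank) {
    if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
    this.capacity = capacity;
    this.rank = rank;
    this.heap = new PriorityQueue<>(capacity, Comparator.comparingLong(rank));
  }

  /** Adds an item, or re-ranks it if it is already tracked. */
  synchronized void putOrUpdate(T item) {
    heap.remove(item);  // linear scan; acceptable for the small N assumed here
    if (heap.size() < capacity) {
      heap.add(item);
      return;
    }
    T lowest = heap.peek();
    if (rank.applyAsLong(item) > rank.applyAsLong(lowest)) {
      heap.poll();
      heap.add(item);
    }
  }

  /** Returns the tracked items, highest rank first. */
  synchronized List<T> snapshotDescending() {
    List<T> out = new ArrayList<>(heap);
    out.sort(Comparator.comparingLong(rank).reversed());
    return out;
  }

  public static void main(String[] args) {
    TopNSketch<String> top = new TopNSketch<>(2, String::length);
    top.putOrUpdate("a");
    top.putOrUpdate("ccc");
    top.putOrUpdate("bb");
    System.out.println(top.snapshotDescending());
  }
}
```

In main, strings ranked by length stand in for tables ranked by a metric such as estimated memory or operation count. With capacity 2, "a" is evicted once "bb" arrives. The sketch assumes an item's rank does not change while it sits in the heap unless putOrUpdate is called again for it.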

Change-Id: I37d407979e6d3b1a444b6b6265900b148facde9e
Reviewed-on: http://gerrit.cloudera.org:8080/8529
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3f00d10e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3f00d10e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3f00d10e

Branch: refs/heads/master
Commit: 3f00d10e1b6c5785990c1a73835f82e3821f839e
Parents: d8ae880
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Thu Oct 12 16:27:20 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jan 19 09:25:01 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc                | 115 +++++++-
 be/src/catalog/catalog-server.h                 |  29 ++
 be/src/catalog/catalog.cc                       |  16 ++
 be/src/catalog/catalog.h                        |  11 +
 common/thrift/CatalogObjects.thrift             |  11 +
 common/thrift/Frontend.thrift                   |  11 +
 common/thrift/JniCatalog.thrift                 |  22 ++
 fe/pom.xml                                      |   6 +
 .../java/org/apache/impala/catalog/Catalog.java |   7 +-
 .../impala/catalog/CatalogServiceCatalog.java   |  62 ++++-
 .../impala/catalog/CatalogUsageMonitor.java     |  72 +++++
 .../org/apache/impala/catalog/HBaseTable.java   |   6 +
 .../apache/impala/catalog/HdfsPartition.java    |   8 +-
 .../org/apache/impala/catalog/HdfsTable.java    | 262 ++++++++++++++-----
 .../org/apache/impala/catalog/KuduTable.java    |  51 ++--
 .../java/org/apache/impala/catalog/Table.java   |  59 +++++
 .../java/org/apache/impala/common/Metrics.java  | 149 +++++++++++
 .../impala/service/CatalogOpExecutor.java       |  14 +-
 .../org/apache/impala/service/JniCatalog.java   |  19 ++
 .../java/org/apache/impala/util/TopNCache.java  | 108 ++++++++
 .../org/apache/impala/util/TestTopNCache.java   | 130 +++++++++
 tests/webserver/test_web_pages.py               |   9 +
 www/catalog.tmpl                                | 117 ++++++++-
 www/table_metrics.tmpl                          |  23 ++
 24 files changed, 1205 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index b004b22..4bf26c0 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -55,6 +55,8 @@ const string CATALOG_WEB_PAGE = "/catalog";
 const string CATALOG_TEMPLATE = "catalog.tmpl";
 const string CATALOG_OBJECT_WEB_PAGE = "/catalog_object";
 const string CATALOG_OBJECT_TEMPLATE = "catalog_object.tmpl";
+const string TABLE_METRICS_WEB_PAGE = "/table_metrics";
+const string TABLE_METRICS_TEMPLATE = "table_metrics.tmpl";
 
 // Implementation for the CatalogService thrift interface.
 class CatalogServiceThriftIf : public CatalogServiceIf {
@@ -200,16 +202,14 @@ Status CatalogServer::Start() {
 }
 
 void CatalogServer::RegisterWebpages(Webserver* webserver) {
-  Webserver::UrlCallback catalog_callback =
-      bind<void>(mem_fn(&CatalogServer::CatalogUrlCallback), this, _1, _2);
   webserver->RegisterUrlCallback(CATALOG_WEB_PAGE, CATALOG_TEMPLATE,
-      catalog_callback);
-
-  Webserver::UrlCallback catalog_objects_callback =
-      bind<void>(mem_fn(&CatalogServer::CatalogObjectsUrlCallback), this, _1, _2);
+      [this](const auto& args, auto* doc) { this->CatalogUrlCallback(args, doc); });
   webserver->RegisterUrlCallback(CATALOG_OBJECT_WEB_PAGE, CATALOG_OBJECT_TEMPLATE,
-      catalog_objects_callback, false);
-
+      [this](const auto& args, auto* doc) { this->CatalogObjectsUrlCallback(args, doc); },
+      false);
+  webserver->RegisterUrlCallback(TABLE_METRICS_WEB_PAGE, TABLE_METRICS_TEMPLATE,
+      [this](const auto& args, auto* doc) { this->TableMetricsUrlCallback(args, doc); },
+      false);
   RegisterLogLevelCallbacks(webserver, true);
 }
 
@@ -335,11 +335,12 @@ void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_obje
 
 void CatalogServer::CatalogUrlCallback(const Webserver::ArgumentMap& args,
     Document* document) {
+  GetCatalogUsage(document);
   TGetDbsResult get_dbs_result;
   Status status = catalog_->GetDbs(NULL, &get_dbs_result);
   if (!status.ok()) {
     Value error(status.GetDetail().c_str(), document->GetAllocator());
-      document->AddMember("error", error, document->GetAllocator());
+    document->AddMember("error", error, document->GetAllocator());
     return;
   }
   Value databases(kArrayType);
@@ -364,15 +365,76 @@ void CatalogServer::CatalogUrlCallback(const Webserver::ArgumentMap& args,
       table_obj.AddMember("fqtn", fq_name, document->GetAllocator());
       Value table_name(table.c_str(), document->GetAllocator());
       table_obj.AddMember("name", table_name, document->GetAllocator());
+      Value has_metrics;
+      has_metrics.SetBool(true);
+      table_obj.AddMember("has_metrics", has_metrics, document->GetAllocator());
       table_array.PushBack(table_obj, document->GetAllocator());
     }
     database.AddMember("num_tables", table_array.Size(), document->GetAllocator());
     database.AddMember("tables", table_array, document->GetAllocator());
+    Value has_metrics;
+    has_metrics.SetBool(true);
+    database.AddMember("has_metrics", has_metrics, document->GetAllocator());
     databases.PushBack(database, document->GetAllocator());
   }
   document->AddMember("databases", databases, document->GetAllocator());
 }
 
+void CatalogServer::GetCatalogUsage(Document* document) {
+  TGetCatalogUsageResponse catalog_usage_result;
+  Status status = catalog_->GetCatalogUsage(&catalog_usage_result);
+  if (!status.ok()) {
+    Value error(status.GetDetail().c_str(), document->GetAllocator());
+    document->AddMember("error", error, document->GetAllocator());
+    return;
+  }
+  // Collect information about the largest tables in terms of memory requirements.
+  Value large_tables(kArrayType);
+  for (int i = 0; i < catalog_usage_result.large_tables.size(); ++i) {
+    Value tbl_obj(kObjectType);
+    const auto& large_table = catalog_usage_result.large_tables[i];
+    Value tbl_name(Substitute("$0.$1", large_table.table_name.db_name,
+        large_table.table_name.table_name).c_str(), document->GetAllocator());
+    tbl_obj.AddMember("name", tbl_name, document->GetAllocator());
+    DCHECK(large_table.__isset.memory_estimate_bytes);
+    Value memory_estimate(PrettyPrinter::Print(large_table.memory_estimate_bytes,
+        TUnit::BYTES).c_str(), document->GetAllocator());
+    tbl_obj.AddMember("mem_estimate", memory_estimate, document->GetAllocator());
+    large_tables.PushBack(tbl_obj, document->GetAllocator());
+  }
+  Value has_large_tables;
+  has_large_tables.SetBool(true);
+  document->AddMember("has_large_tables", has_large_tables, document->GetAllocator());
+  document->AddMember("large_tables", large_tables, document->GetAllocator());
+  Value num_large_tables;
+  num_large_tables.SetInt(catalog_usage_result.large_tables.size());
+  document->AddMember("num_large_tables", num_large_tables, document->GetAllocator());
+
+  // Collect information about the most frequently accessed tables.
+  Value frequent_tables(kArrayType);
+  for (int i = 0; i < catalog_usage_result.frequently_accessed_tables.size(); ++i) {
+    Value tbl_obj(kObjectType);
+    const auto& frequent_table = catalog_usage_result.frequently_accessed_tables[i];
+    Value tbl_name(Substitute("$0.$1", frequent_table.table_name.db_name,
+        frequent_table.table_name.table_name).c_str(), document->GetAllocator());
+    tbl_obj.AddMember("name", tbl_name, document->GetAllocator());
+    Value num_metadata_operations;
+    DCHECK(frequent_table.__isset.num_metadata_operations);
+    num_metadata_operations.SetInt64(frequent_table.num_metadata_operations);
+    tbl_obj.AddMember("num_metadata_ops", num_metadata_operations,
+        document->GetAllocator());
+    frequent_tables.PushBack(tbl_obj, document->GetAllocator());
+  }
+  Value has_frequent_tables;
+  has_frequent_tables.SetBool(true);
+  document->AddMember("has_frequent_tables", has_frequent_tables,
+      document->GetAllocator());
+  document->AddMember("frequent_tables", frequent_tables, document->GetAllocator());
+  Value num_frequent_tables;
+  num_frequent_tables.SetInt(catalog_usage_result.frequently_accessed_tables.size());
+  document->AddMember("num_frequent_tables", num_frequent_tables,
+      document->GetAllocator());
+}
 
 void CatalogServer::CatalogObjectsUrlCallback(const Webserver::ArgumentMap& args,
     Document* document) {
@@ -384,7 +446,8 @@ void CatalogServer::CatalogObjectsUrlCallback(const Webserver::ArgumentMap& args
 
     // Get the object type and name from the topic entry key
     TCatalogObject request;
-    Status status = TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request);
+    Status status =
+        TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request);
 
     // Get the object and dump its contents.
     TCatalogObject result;
@@ -402,3 +465,35 @@ void CatalogServer::CatalogObjectsUrlCallback(const Webserver::ArgumentMap& args
     document->AddMember("error", error, document->GetAllocator());
   }
 }
+
+void CatalogServer::TableMetricsUrlCallback(const Webserver::ArgumentMap& args,
+    Document* document) {
+  // TODO: Enable json view of table metrics
+  Webserver::ArgumentMap::const_iterator object_name_arg = args.find("name");
+  if (object_name_arg != args.end()) {
+    // Parse the object name to extract database and table names
+    const string& full_tbl_name = object_name_arg->second;
+    size_t pos = full_tbl_name.find(".");
+    if (pos == string::npos || pos >= full_tbl_name.size() - 1) {
+      stringstream error_msg;
+      error_msg << "Invalid table name: " << full_tbl_name;
+      Value error(error_msg.str().c_str(), document->GetAllocator());
+      document->AddMember("error", error, document->GetAllocator());
+      return;
+    }
+    string metrics;
+    Status status = catalog_->GetTableMetrics(
+        full_tbl_name.substr(0, pos), full_tbl_name.substr(pos + 1), &metrics);
+    if (status.ok()) {
+      Value metrics_str(metrics.c_str(), document->GetAllocator());
+      document->AddMember("table_metrics", metrics_str, document->GetAllocator());
+    } else {
+      Value error(status.GetDetail().c_str(), document->GetAllocator());
+      document->AddMember("error", error, document->GetAllocator());
+    }
+  } else {
+    Value error("Please specify the value of the name parameter.",
+        document->GetAllocator());
+    document->AddMember("error", error, document->GetAllocator());
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 78a3f20..0b6b220 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -185,6 +185,35 @@ class CatalogServer {
   /// <host>:25020/catalog_objects?object_type=TABLE&object_name=foo.bar
   void CatalogObjectsUrlCallback(const Webserver::ArgumentMap& args,
       rapidjson::Document* document);
+
+  /// Retrieves from the FE information about the current catalog usage and populates
+  /// the /catalog debug webpage. The catalog usage includes information about the TOP-N
+  /// frequently used (in terms of number of metadata operations) tables as well as the
+  /// TOP-N tables with the highest memory requirements.
+  ///
+  /// Example output:
+  /// "large_tables": [
+  ///     {
+  ///       "name": "functional.alltypesagg",
+  ///       "mem_estimate": "202.59 MB"
+  ///     }
+  /// ],
+  /// "frequent_tables": [
+  ///     {
+  ///       "name": "functional.alltypestiny",
+  ///       "num_metadata_ops": 10
+  ///     }
+  /// ]
+  void GetCatalogUsage(rapidjson::Document* document);
+
+  /// Debug webpage handler that is used to dump all the registered metrics of a
+  /// table. The caller specifies the "name" parameter which is the fully
+  /// qualified table name and this function retrieves all the metrics of that
+  /// table. For example, to get the table metrics of table "bar" in database
+  /// "foo":
+  /// <host>:25020/table_metrics?name=foo.bar
+  void TableMetricsUrlCallback(const Webserver::ArgumentMap& args,
+      rapidjson::Document* document);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index b6dd86a..d96d23e 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -58,11 +58,13 @@ Catalog::Catalog() {
     {"execDdl", "([B)[B", &exec_ddl_id_},
     {"resetMetadata", "([B)[B", &reset_metadata_id_},
     {"getTableNames", "([B)[B", &get_table_names_id_},
+    {"getTableMetrics", "([B)Ljava/lang/String;", &get_table_metrics_id_},
     {"getDbs", "([B)[B", &get_dbs_id_},
     {"getFunctions", "([B)[B", &get_functions_id_},
     {"checkUserSentryAdmin", "([B)V", &sentry_admin_check_id_},
     {"getCatalogObject", "([B)[B", &get_catalog_object_id_},
     {"getCatalogDelta", "([B)[B", &get_catalog_delta_id_},
+    {"getCatalogUsage", "()[B", &get_catalog_usage_id_},
     {"getCatalogVersion", "()J", &get_catalog_version_id_},
     {"prioritizeLoad", "([B)V", &prioritize_load_id_}};
 
@@ -131,6 +133,20 @@ Status Catalog::GetTableNames(const string& db, const string* pattern,
   return JniUtil::CallJniMethod(catalog_, get_table_names_id_, params, table_names);
 }
 
+Status Catalog::GetTableMetrics(const string& db, const string& tbl,
+    string* table_metrics) {
+  TGetTableMetricsParams params;
+  TTableName tblName;
+  tblName.__set_db_name(db);
+  tblName.__set_table_name(tbl);
+  params.__set_table_name(tblName);
+  return JniUtil::CallJniMethod(catalog_, get_table_metrics_id_, params, table_metrics);
+}
+
+Status Catalog::GetCatalogUsage(TGetCatalogUsageResponse* response) {
+  return JniUtil::CallJniMethod(catalog_, get_catalog_usage_id_, response);
+}
+
 Status Catalog::GetFunctions(const TGetFunctionsRequest& request,
     TGetFunctionsResponse *response) {
   return JniUtil::CallJniMethod(catalog_, get_functions_id_, request, response);

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/be/src/catalog/catalog.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index 3119d60..13e4529 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -84,6 +84,15 @@ class Catalog {
   Status GetTableNames(const std::string& db, const std::string* pattern,
       TGetTablesResult* table_names);
 
+  /// Returns the collected metrics of a table. The response contains a
+  /// pretty-printed string representation of table metrics.
+  Status GetTableMetrics(const std::string& db, const std::string& tbl,
+      std::string* metrics);
+
+  /// Returns the current catalog usage that includes the most frequently accessed
+  /// tables as well as the tables with the highest memory requirements.
+  Status GetCatalogUsage(TGetCatalogUsageResponse* response);
+
   /// Gets all functions in the catalog matching the parameters in the given
   /// TFunctionsRequest.
   Status GetFunctions(const TGetFunctionsRequest& request,
@@ -109,8 +118,10 @@ class Catalog {
   jmethodID get_catalog_object_id_;  // JniCatalog.getCatalogObject()
   jmethodID get_catalog_delta_id_;  // JniCatalog.getCatalogDelta()
   jmethodID get_catalog_version_id_;  // JniCatalog.getCatalogVersion()
+  jmethodID get_catalog_usage_id_; // JniCatalog.getCatalogUsage()
   jmethodID get_dbs_id_; // JniCatalog.getDbs()
   jmethodID get_table_names_id_; // JniCatalog.getTableNames()
+  jmethodID get_table_metrics_id_; // JniCatalog.getTableMetrics()
   jmethodID get_functions_id_; // JniCatalog.getFunctions()
   jmethodID prioritize_load_id_; // JniCatalog.prioritizeLoad()
   jmethodID sentry_admin_check_id_; // JniCatalog.checkUserSentryAdmin()

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/common/thrift/CatalogObjects.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 7894f75..0f1ea5d 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -266,6 +266,17 @@ struct THdfsPartition {
 
   // (key,value) pairs stored in the Hive Metastore.
   15: optional map<string, string> hms_parameters
+
+  // The following fields store stats about this partition
+  // which are collected when toThrift() is called.
+  // Total number of blocks in this partition.
+  16: optional i64 num_blocks
+
+  // Total file size in bytes of this partition.
+  17: optional i64 total_file_size_bytes
+
+  // True if this partition has incremental stats.
+  18: optional bool has_incremental_stats
 }
 
 struct THdfsTable {

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index f856871..ba21605 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -87,6 +87,17 @@ struct TGetTablesResult {
   1: list<string> tables
 }
 
+// Arguments to getTableMetrics, which returns the metrics of a specific table.
+struct TGetTableMetricsParams {
+  1: required CatalogObjects.TTableName table_name
+}
+
+// Response to a getTableMetrics request. The response contains all the collected metrics
+// pretty-printed into a string.
+struct TGetTableMetricsResponse {
+  1: required string metrics
+}
+
 // Arguments to getDbs, which returns a list of dbs that match an optional pattern
 struct TGetDbsParams {
   // If not set, match every database

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/common/thrift/JniCatalog.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 939e276..4edf4d2 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -605,3 +605,25 @@ struct TDropFunctionParams {
   // the signature generated by the FE should just be plumbed through).
   4: optional string signature
 }
+
+// Stores metrics of a catalog table.
+struct TTableUsageMetrics {
+  1: required CatalogObjects.TTableName table_name
+
+  // Estimated memory usage of that table.
+  2: optional i64 memory_estimate_bytes
+
+  // Number of metadata operations performed on the table since it was loaded.
+  3: optional i64 num_metadata_operations
+}
+
+// Response to a GetCatalogUsage request.
+struct TGetCatalogUsageResponse {
+  // List of the largest (in terms of memory requirements) tables.
+  1: required list<TTableUsageMetrics> large_tables
+
+  // List of the most frequently accessed (in terms of number of metadata operations)
+  // tables.
+  2: required list<TTableUsageMetrics> frequently_accessed_tables
+}
+

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/pom.xml
----------------------------------------------------------------------
diff --git a/fe/pom.xml b/fe/pom.xml
index 646820f..135ec77 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -312,6 +312,12 @@ under the License.
       <version>1.6.0.1</version>
     </dependency>
 
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>3.2.2</version>
+    </dependency>
+
   </dependencies>
 
   <reporting>

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/catalog/Catalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 9ed8133..4548c2b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -35,6 +35,7 @@ import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.PatternMatcher;
+
 import org.apache.log4j.Logger;
 
 /**
@@ -167,7 +168,11 @@ public abstract class Catalog {
     // Remove the old table name from the cache and add the new table.
     Db db = getDb(tableName.getDb_name());
     if (db == null) return null;
-    return db.removeTable(tableName.getTable_name());
+    Table tbl = db.removeTable(tableName.getTable_name());
+    if (tbl != null && !tbl.isStoredInImpaladCatalogCache()) {
+      CatalogUsageMonitor.INSTANCE.removeTable(tbl);
+    }
+    return tbl;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index f75b0a8..8f75a16 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -63,10 +63,12 @@ import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TCatalogUpdateResult;
 import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TGetCatalogDeltaResponse;
+import org.apache.impala.thrift.TGetCatalogUsageResponse;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPrivilege;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableName;
+import org.apache.impala.thrift.TTableUsageMetrics;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.SentryProxy;
@@ -74,6 +76,7 @@ import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 
+import com.codahale.metrics.Timer;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -1434,11 +1437,12 @@ public class CatalogServiceCatalog extends Catalog {
     Preconditions.checkState(!(tbl instanceof IncompleteTable));
     String dbName = tbl.getDb().getName();
     String tblName = tbl.getName();
-
     if (!tryLockTable(tbl)) {
       throw new CatalogException(String.format("Error refreshing metadata for table " +
           "%s due to lock contention", tbl.getFullName()));
     }
+    final Timer.Context context =
+        tbl.getMetrics().getTimer(Table.REFRESH_DURATION_METRIC).time();
     try {
       long newCatalogVersion = incrementAndGetCatalogVersion();
       versionLock_.writeLock().unlock();
@@ -1456,6 +1460,7 @@ public class CatalogServiceCatalog extends Catalog {
       LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));
       return tbl.toTCatalogObject();
     } finally {
+      context.stop();
       Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
       tbl.getLock().unlock();
     }
@@ -1904,4 +1909,59 @@ public class CatalogServiceCatalog extends Catalog {
     }
     return versionToWaitFor;
   }
+
+  /**
+   * Retrieves information about the current catalog usage including the most frequently
+   * accessed tables as well as the tables with the highest memory requirements.
+   */
+  public TGetCatalogUsageResponse getCatalogUsage() {
+    TGetCatalogUsageResponse usage = new TGetCatalogUsageResponse();
+    usage.setLarge_tables(Lists.<TTableUsageMetrics>newArrayList());
+    usage.setFrequently_accessed_tables(Lists.<TTableUsageMetrics>newArrayList());
+    for (Table largeTable: CatalogUsageMonitor.INSTANCE.getLargestTables()) {
+      TTableUsageMetrics tableUsageMetrics =
+          new TTableUsageMetrics(largeTable.getTableName().toThrift());
+      tableUsageMetrics.setMemory_estimate_bytes(largeTable.getEstimatedMetadataSize());
+      usage.addToLarge_tables(tableUsageMetrics);
+    }
+    for (Table frequentTable:
+        CatalogUsageMonitor.INSTANCE.getFrequentlyAccessedTables()) {
+      TTableUsageMetrics tableUsageMetrics =
+          new TTableUsageMetrics(frequentTable.getTableName().toThrift());
+      tableUsageMetrics.setNum_metadata_operations(frequentTable.getMetadataOpsCount());
+      usage.addToFrequently_accessed_tables(tableUsageMetrics);
+    }
+    return usage;
+  }
+
+  /**
+   * Retrieves the stored metrics of the specified table and returns a pretty-printed
+   * string representation. Throws an exception if the table is not found. Returns an
+   * explanatory message instead of metrics if the table is not yet loaded or if another
+   * concurrent operation is holding the table lock.
+   */
+  public String getTableMetrics(TTableName tTableName) throws CatalogException {
+    String dbName = tTableName.db_name;
+    String tblName = tTableName.table_name;
+    Table tbl = getTable(dbName, tblName);
+    if (tbl == null) {
+      throw new CatalogException("Table " + dbName + "." + tblName + " was not found.");
+    }
+    String result;
+    if (tbl instanceof IncompleteTable) {
+      result = "No metrics available for table " + dbName + "." + tblName +
+          ". Table not yet loaded.";
+      return result;
+    }
+    if (!tbl.getLock().tryLock()) {
+      result = "Metrics for table " + dbName + "." + tblName + " are not available " +
+          "because the table is currently being modified by another operation.";
+      return result;
+    }
+    try {
+      return tbl.getMetrics().toString();
+    } finally {
+      tbl.getLock().unlock();
+    }
+  }
 }
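
The non-blocking lock acquisition used by getTableMetrics() above can be
sketched in isolation. This is a minimal illustration under stated
assumptions, not Impala code: LockedMetricsSketch, describe() and
holdLockOnOtherThread() are hypothetical names, and the metrics string is a
placeholder for whatever the real table would report.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a catalog table; not the Impala Table class.
final class LockedMetricsSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private final String metrics;

  LockedMetricsSketch(String metrics) { this.metrics = metrics; }

  // Returns the metrics, or a fallback message when another thread holds the lock.
  String describe(String fullName) {
    if (!lock.tryLock()) {
      return "Metrics for table " + fullName + " are not available.";
    }
    try {
      return metrics;
    } finally {
      // Release only on the path where tryLock() succeeded.
      lock.unlock();
    }
  }

  // Demo helper: acquires the lock on a separate thread and never releases it.
  void holdLockOnOtherThread() {
    Thread holder = new Thread(lock::lock);
    holder.start();
    try {
      holder.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    LockedMetricsSketch tbl = new LockedMetricsSketch("num-files: 3");
    System.out.println(tbl.describe("foo.bar"));
    tbl.holdLockOnOtherThread();
    System.out.println(tbl.describe("foo.bar"));
  }
}
```

Returning a message instead of blocking keeps a debug page responsive while
a long-running operation holds the table lock; the trade-off is that the
caller may need to retry.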

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/catalog/CatalogUsageMonitor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogUsageMonitor.java b/fe/src/main/java/org/apache/impala/catalog/CatalogUsageMonitor.java
new file mode 100644
index 0000000..a2e8d7e
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogUsageMonitor.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.List;
+
+import org.apache.impala.util.TopNCache;
+
+import com.google.common.base.Function;
+
+/**
+ * Singleton class that monitors catalog usage. Currently, it tracks the most
+ * frequently accessed tables (in terms of number of metadata operations) as well as
+ * the tables with the highest (estimated) memory requirements. This class is
+ * thread-safe.
+ */
+public final class CatalogUsageMonitor {
+
+  public final static CatalogUsageMonitor INSTANCE = new CatalogUsageMonitor();
+
+  private final TopNCache<Table, Long> frequentlyAccessedTables_;
+
+  private final TopNCache<Table, Long> largestTables_;
+
+  private CatalogUsageMonitor() {
+    final int num_tables_tracked = Integer.getInteger(
+        "org.apache.impala.catalog.CatalogUsageMonitor.NUM_TABLES_TRACKED", 25);
+    frequentlyAccessedTables_ = new TopNCache<Table, Long>(
+        new Function<Table, Long>() {
+          @Override
+          public Long apply(Table tbl) { return tbl.getMetadataOpsCount(); }
+        }, num_tables_tracked, true);
+
+    largestTables_ = new TopNCache<Table, Long>(
+        new Function<Table, Long>() {
+          @Override
+          public Long apply(Table tbl) { return tbl.getEstimatedMetadataSize(); }
+        }, num_tables_tracked, false);
+  }
+
+  public void updateFrequentlyAccessedTables(Table tbl) {
+    frequentlyAccessedTables_.putOrUpdate(tbl);
+  }
+
+  public void updateLargestTables(Table tbl) { largestTables_.putOrUpdate(tbl); }
+
+  public void removeTable(Table tbl) {
+    frequentlyAccessedTables_.remove(tbl);
+    largestTables_.remove(tbl);
+  }
+
+  public List<Table> getFrequentlyAccessedTables() {
+    return frequentlyAccessedTables_.listEntries();
+  }
+
+  public List<Table> getLargestTables() { return largestTables_.listEntries(); }
+}
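
The idea of tracking the N highest-ranked tables through a ranking function,
as CatalogUsageMonitor does above, can be sketched with the standard library
alone. This is a hypothetical illustration and does not reproduce the
TopNCache class added by this patch (its source is not shown in this part of
the diff): TopNSketch and the "sketch.numTablesTracked" property name are
assumptions, and strings ranked by length stand in for tables.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

// Hypothetical top-N tracker; not the TopNCache class from this patch.
final class TopNSketch<T> {
  private final Function<T, Long> rank;
  private final int capacity;
  private final List<T> items = new ArrayList<>();

  TopNSketch(Function<T, Long> rank, int capacity) {
    this.rank = rank;
    this.capacity = capacity;
  }

  // Adds the item if absent, re-sorts by descending rank, and trims to capacity.
  synchronized void putOrUpdate(T item) {
    if (!items.contains(item)) items.add(item);
    Comparator<T> byRank = Comparator.comparing(rank);
    items.sort(byRank.reversed());
    while (items.size() > capacity) items.remove(items.size() - 1);
  }

  synchronized boolean remove(T item) { return items.remove(item); }

  // Returns a copy so callers cannot mutate the tracked list.
  synchronized List<T> listEntries() { return new ArrayList<>(items); }

  public static void main(String[] args) {
    // Integer.getInteger() falls back to the default when the property is unset.
    int n = Integer.getInteger("sketch.numTablesTracked", 25);
    TopNSketch<String> top = new TopNSketch<>(s -> (long) s.length(), n);
    top.putOrUpdate("a");
    top.putOrUpdate("ccc");
    top.putOrUpdate("bb");
    System.out.println(top.listEntries());
  }
}
```

Re-sorting on every update is O(n log n), which is acceptable for a small N
such as the default of 25 tracked tables; a heap would be a reasonable
alternative for larger N.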

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
index 6df7c28..cf36a89 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
@@ -64,6 +64,8 @@ import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.StatsHelper;
 import org.apache.impala.util.TResultRowBuilder;
+
+import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
@@ -321,6 +323,8 @@ public class HBaseTable extends Table {
   public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
     Preconditions.checkNotNull(getMetaStoreTable());
+    final Timer.Context context =
+        getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
     try {
       msTable_ = msTbl;
       hbaseTableName_ = getHBaseTableName(getMetaStoreTable());
@@ -414,6 +418,8 @@ public class HBaseTable extends Table {
     } catch (Exception e) {
       throw new TableLoadingException("Failed to load metadata for HBase table: " +
           name_, e);
+    } finally {
+      context.stop();
     }
   }
 
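
The load() change above starts a timer before the try block and stops it in
finally, so the duration is recorded even when loading fails. A minimal
sketch of that shape using only the standard library follows. DurationSketch
is a hypothetical stand-in and is not the Dropwizard Timer API used by the
patch.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a duration metric; not the Dropwizard Timer API.
final class DurationSketch {
  private final AtomicLong count = new AtomicLong();
  private final AtomicLong totalNanos = new AtomicLong();

  // Runs the task and records its elapsed time even if the task throws.
  void time(Runnable task) {
    final long start = System.nanoTime();
    try {
      task.run();
    } finally {
      totalNanos.addAndGet(System.nanoTime() - start);
      count.incrementAndGet();
    }
  }

  long getCount() { return count.get(); }

  long getTotalNanos() { return totalNanos.get(); }

  public static void main(String[] args) {
    DurationSketch loadDuration = new DurationSketch();
    try {
      loadDuration.time(() -> { throw new IllegalStateException("load failed"); });
    } catch (IllegalStateException e) {
      // The failure still counts as one timed invocation.
    }
    System.out.println(loadDuration.getCount());
  }
}
```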

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index e78ce92..2179346 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -931,6 +931,7 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
     thriftHdfsPart.setAccess_level(accessLevel_);
     thriftHdfsPart.setIs_marked_cached(isMarkedCached_);
     thriftHdfsPart.setId(getId());
+    thriftHdfsPart.setHas_incremental_stats(hasIncrementalStats());
     // IMPALA-4902: Shallow-clone the map to avoid concurrent modifications. One thread
     // may try to serialize the returned THdfsPartition after releasing the table's lock,
     // and another thread doing DDL may modify the map.
@@ -938,11 +939,16 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
         includeIncrementalStats ? hmsParameters_ : getFilteredHmsParameters()));
     if (includeFileDesc) {
       // Add block location information
+      long numBlocks = 0;
+      long totalFileBytes = 0;
       for (FileDescriptor fd: fileDescriptors_) {
         thriftHdfsPart.addToFile_desc(fd.toThrift());
+        numBlocks += fd.getNumFileBlocks();
+        totalFileBytes += fd.getFileLength();
       }
+      thriftHdfsPart.setNum_blocks(numBlocks);
+      thriftHdfsPart.setTotal_file_size_bytes(totalFileBytes);
     }
-
     return thriftHdfsPart;
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 4de25fe..04599f5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -95,6 +95,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Timer;
+
 /**
  * Internal representation of table-related metadata of a file-resident table on a
  * Hadoop filesystem. The table data can be accessed through libHDFS (which is more of
@@ -124,6 +127,24 @@ public class HdfsTable extends Table {
   // Table property key for skip.header.line.count
   public static final String TBL_PROP_SKIP_HEADER_LINE_COUNT = "skip.header.line.count";
 
+  // Average memory requirements (in bytes) for storing the metadata of a partition.
+  private static final long PER_PARTITION_MEM_USAGE_BYTES = 2048;
+
+  // Average memory requirements (in bytes) for storing a file descriptor.
+  private static final long PER_FD_MEM_USAGE_BYTES = 500;
+
+  // Average memory requirements (in bytes) for storing a block.
+  private static final long PER_BLOCK_MEM_USAGE_BYTES = 150;
+
+  // Hdfs table specific metrics
+  public static final String CATALOG_UPDATE_DURATION_METRIC = "catalog-update-duration";
+  public static final String NUM_PARTITIONS_METRIC = "num-partitions";
+  public static final String NUM_FILES_METRIC = "num-files";
+  public static final String NUM_BLOCKS_METRIC = "num-blocks";
+  public static final String TOTAL_FILE_BYTES_METRIC = "total-file-size-bytes";
+  public static final String MEMORY_ESTIMATE_METRIC = "memory-estimate-bytes";
+  public static final String HAS_INCREMENTAL_STATS_METRIC = "has-incremental-stats";
+
   // string to indicate NULL. set in load() from table properties
   private String nullColumnValue_;
 
@@ -172,19 +193,14 @@ public class HdfsTable extends Table {
   // replicas of the block.
   private final ListMap<TNetworkAddress> hostIndex_ = new ListMap<TNetworkAddress>();
 
-  private HdfsPartitionLocationCompressor partitionLocationCompressor_;
-
-  // Total number of Hdfs files in this table. Accounted only in the Impalad catalog
-  // cache. Set to -1 on Catalogd.
-  private long numHdfsFiles_;
-
-  // Sum of sizes of all Hdfs files in this table. Accounted only in the Impalad
-  // catalog cache. Set to -1 on Catalogd.
-  private long totalHdfsBytes_;
+  // True iff this table has incremental stats in any of its partitions.
+  private boolean hasIncrementalStats_ = false;
 
   // True iff the table's partitions are located on more than one filesystem.
   private boolean multipleFileSystems_ = false;
 
+  private HdfsPartitionLocationCompressor partitionLocationCompressor_;
+
   // Base Hdfs directory where files of this table are stored.
   // For unpartitioned tables it is simply the path where all files live.
   // For partitioned tables it is the root directory
@@ -200,6 +216,50 @@ public class HdfsTable extends Table {
   // for setAvroSchema().
   private boolean isSchemaLoaded_ = false;
 
+  // Represents a set of storage-related statistics aggregated at the table or partition
+  // level.
+  public final static class FileMetadataStats {
+    // Number of files in a table/partition.
+    public long numFiles;
+    // Number of blocks in a table/partition.
+    public long numBlocks;
+    // Total size (in bytes) of all files in a table/partition.
+    public long totalFileBytes;
+
+    // Unsets the storage stats to indicate that their values are unknown.
+    public void unset() {
+      numFiles = -1;
+      numBlocks = -1;
+      totalFileBytes = -1;
+    }
+
+    // Initializes the values of the storage stats.
+    public void init() {
+      numFiles = 0;
+      numBlocks = 0;
+      totalFileBytes = 0;
+    }
+
+    public void set(FileMetadataStats stats) {
+      numFiles = stats.numFiles;
+      numBlocks = stats.numBlocks;
+      totalFileBytes = stats.totalFileBytes;
+    }
+  }
+
+  // Table level storage-related statistics. Depending on whether the table is stored in
+  // the catalog server or the impalad catalog cache, these statistics serve different
+  // purposes and, hence, are managed differently.
+  // Table stored in impalad catalog cache:
+  //   - Used in planning.
+  //   - Stats are modified real-time by the operations that modify table metadata
+  //   (e.g. add partition).
+  // Table stored in the catalog server:
+  //   - Used for reporting through catalog web UI.
+  //   - Stats are reset whenever the table is loaded (due to a metadata operation) and
+  //   are set when the table is serialized to Thrift.
+  private FileMetadataStats fileMetadataStats_ = new FileMetadataStats();
+
   private final static Logger LOG = LoggerFactory.getLogger(HdfsTable.class);
 
   // Caching this configuration object makes calls to getFileSystem much quicker
@@ -311,17 +371,17 @@ public class HdfsTable extends Table {
   }
 
   /**
-   * Updates numHdfsFiles_ and totalHdfsBytes_ based on the partition information.
+   * Updates the storage stats of this table based on the partition information.
    * This is used only for the frontend tests that do not spawn a separate Catalog
    * instance.
    */
   public void computeHdfsStatsForTesting() {
-    Preconditions.checkState(numHdfsFiles_ == -1 && totalHdfsBytes_ == -1);
-    numHdfsFiles_ = 0;
-    totalHdfsBytes_ = 0;
+    Preconditions.checkState(fileMetadataStats_.numFiles == -1
+        && fileMetadataStats_.totalFileBytes == -1);
+    fileMetadataStats_.init();
     for (HdfsPartition partition: partitionMap_.values()) {
-      numHdfsFiles_ += partition.getNumFileDescriptors();
-      totalHdfsBytes_ += partition.getSize();
+      fileMetadataStats_.numFiles += partition.getNumFileDescriptors();
+      fileMetadataStats_.totalFileBytes += partition.getSize();
     }
   }
 
@@ -681,8 +741,7 @@ public class HdfsTable extends Table {
         nullPartitionIds_.add(Sets.<Long>newHashSet());
       }
     }
-    numHdfsFiles_ = 0;
-    totalHdfsBytes_ = 0;
+    fileMetadataStats_.init();
   }
 
   /**
@@ -1023,8 +1082,8 @@ public class HdfsTable extends Table {
     }
     if (partition.getFileFormat() == HdfsFileFormat.AVRO) hasAvroData_ = true;
     partitionMap_.put(partition.getId(), partition);
-    totalHdfsBytes_ += partition.getSize();
-    numHdfsFiles_ += partition.getNumFileDescriptors();
+    fileMetadataStats_.totalFileBytes += partition.getSize();
+    fileMetadataStats_.numFiles += partition.getNumFileDescriptors();
     updatePartitionMdAndColStats(partition);
   }
 
@@ -1078,8 +1137,8 @@ public class HdfsTable extends Table {
    */
   private HdfsPartition dropPartition(HdfsPartition partition) {
     if (partition == null) return null;
-    totalHdfsBytes_ -= partition.getSize();
-    numHdfsFiles_ -= partition.getNumFileDescriptors();
+    fileMetadataStats_.totalFileBytes -= partition.getSize();
+    fileMetadataStats_.numFiles -= partition.getNumFileDescriptors();
     Preconditions.checkArgument(partition.getPartitionValues().size() ==
         numClusteringCols_);
     Long partitionId = partition.getId();
@@ -1176,49 +1235,54 @@ public class HdfsTable extends Table {
       org.apache.hadoop.hive.metastore.api.Table msTbl,
       boolean loadParitionFileMetadata, boolean loadTableSchema,
       Set<String> partitionsToUpdate) throws TableLoadingException {
-    // turn all exceptions into TableLoadingException
-    msTable_ = msTbl;
+    final Timer.Context context =
+        getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
     try {
-      if (loadTableSchema) loadSchema(client, msTbl);
-      if (reuseMetadata && getCatalogVersion() == Catalog.INITIAL_CATALOG_VERSION) {
-        // This is the special case of CTAS that creates a 'temp' table that does not
-        // actually exist in the Hive Metastore.
-        initializePartitionMetadata(msTbl);
-        setTableStats(msTbl);
-        return;
-      }
-      // Load partition and file metadata
-      if (reuseMetadata) {
-        // Incrementally update this table's partitions and file metadata
-        LOG.info("Incrementally loading table metadata for: " + getFullName());
-        Preconditions.checkState(
-            partitionsToUpdate == null || loadParitionFileMetadata);
-        updateMdFromHmsTable(msTbl);
-        if (msTbl.getPartitionKeysSize() == 0) {
-          if (loadParitionFileMetadata) updateUnpartitionedTableFileMd();
+      // turn all exceptions into TableLoadingException
+      msTable_ = msTbl;
+      try {
+        if (loadTableSchema) loadSchema(client, msTbl);
+        if (reuseMetadata && getCatalogVersion() == Catalog.INITIAL_CATALOG_VERSION) {
+          // This is the special case of CTAS that creates a 'temp' table that does not
+          // actually exist in the Hive Metastore.
+          initializePartitionMetadata(msTbl);
+          setTableStats(msTbl);
+          return;
+        }
+        // Load partition and file metadata
+        if (reuseMetadata) {
+          // Incrementally update this table's partitions and file metadata
+          LOG.info("Incrementally loading table metadata for: " + getFullName());
+          Preconditions.checkState(
+              partitionsToUpdate == null || loadParitionFileMetadata);
+          updateMdFromHmsTable(msTbl);
+          if (msTbl.getPartitionKeysSize() == 0) {
+            if (loadParitionFileMetadata) updateUnpartitionedTableFileMd();
+          } else {
+            updatePartitionsFromHms(
+                client, partitionsToUpdate, loadParitionFileMetadata);
+          }
+          LOG.info("Incrementally loaded table metadata for: " + getFullName());
         } else {
-          updatePartitionsFromHms(
-              client, partitionsToUpdate, loadParitionFileMetadata);
+          // Load all partitions from Hive Metastore, including file metadata.
+          LOG.info("Fetching partition metadata from the Metastore: " + getFullName());
+          List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
+              MetaStoreUtil.fetchAllPartitions(
+                  client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES);
+          LOG.info("Fetched partition metadata from the Metastore: " + getFullName());
+          loadAllPartitions(msPartitions, msTbl);
         }
-        LOG.info("Incrementally loaded table metadata for: " + getFullName());
-      } else {
-        // Load all partitions from Hive Metastore, including file metadata.
-        LOG.info("Fetching partition metadata from the Metastore: " + getFullName());
-        List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
-            MetaStoreUtil.fetchAllPartitions(
-                client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES);
-        LOG.info("Fetched partition metadata from the Metastore: " + getFullName());
-        loadAllPartitions(msPartitions, msTbl);
+        if (loadTableSchema) setAvroSchema(client, msTbl);
+        setTableStats(msTbl);
+        fileMetadataStats_.unset();
+      } catch (TableLoadingException e) {
+        throw e;
+      } catch (Exception e) {
+        throw new TableLoadingException("Failed to load metadata for table: "
+            + getFullName(), e);
       }
-      if (loadTableSchema) setAvroSchema(client, msTbl);
-      setTableStats(msTbl);
-      numHdfsFiles_ = -1;
-      totalHdfsBytes_ = -1;
-    } catch (TableLoadingException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new TableLoadingException("Failed to load metadata for table: "
-          + getFullName(), e);
+    } finally {
+      context.stop();
     }
   }
 
@@ -1648,25 +1712,49 @@ public class HdfsTable extends Table {
    * partitions). To prevent the catalog from hitting an OOM error while trying to
    * serialize large partition incremental stats, we estimate the stats size and filter
    * the incremental stats data from partition objects if the estimate exceeds
-   * --inc_stats_size_limit_bytes
+   * --inc_stats_size_limit_bytes. This function also collects storage related statistics
+   * (e.g. number of blocks, files, etc.) in order to compute an estimate of the metadata
+   * size of this table.
    */
   private THdfsTable getTHdfsTable(boolean includeFileDesc, Set<Long> refPartitions) {
     // includeFileDesc implies all partitions should be included (refPartitions == null).
     Preconditions.checkState(!includeFileDesc || refPartitions == null);
+    long memUsageEstimate = 0;
     int numPartitions =
         (refPartitions == null) ? partitionMap_.values().size() : refPartitions.size();
+    memUsageEstimate += numPartitions * PER_PARTITION_MEM_USAGE_BYTES;
     long statsSizeEstimate =
         numPartitions * getColumns().size() * STATS_SIZE_PER_COLUMN_BYTES;
     boolean includeIncrementalStats =
         (statsSizeEstimate < BackendConfig.INSTANCE.getIncStatsMaxSize());
+    FileMetadataStats stats = new FileMetadataStats();
     Map<Long, THdfsPartition> idToPartition = Maps.newHashMap();
     for (HdfsPartition partition: partitionMap_.values()) {
       long id = partition.getId();
       if (refPartitions == null || refPartitions.contains(id)) {
-        idToPartition.put(id,
-            partition.toThrift(includeFileDesc, includeIncrementalStats));
+        THdfsPartition tHdfsPartition =
+            partition.toThrift(includeFileDesc, includeIncrementalStats);
+        if (tHdfsPartition.isSetHas_incremental_stats() &&
+            tHdfsPartition.isHas_incremental_stats()) {
+          memUsageEstimate += getColumns().size() * STATS_SIZE_PER_COLUMN_BYTES;
+          hasIncrementalStats_ = true;
+        }
+        if (includeFileDesc) {
+          Preconditions.checkState(tHdfsPartition.isSetNum_blocks() &&
+              tHdfsPartition.isSetTotal_file_size_bytes());
+          stats.numBlocks += tHdfsPartition.getNum_blocks();
+          stats.numFiles +=
+              tHdfsPartition.isSetFile_desc() ? tHdfsPartition.getFile_desc().size() : 0;
+          stats.totalFileBytes += tHdfsPartition.getTotal_file_size_bytes();
+        }
+        idToPartition.put(id, tHdfsPartition);
       }
     }
+    if (includeFileDesc) fileMetadataStats_.set(stats);
+
+    memUsageEstimate += fileMetadataStats_.numFiles * PER_FD_MEM_USAGE_BYTES +
+        fileMetadataStats_.numBlocks * PER_BLOCK_MEM_USAGE_BYTES;
+    setEstimatedMetadataSize(memUsageEstimate);
     THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
         nullPartitionKeyValue_, nullColumnValue_, idToPartition);
     hdfsTable.setAvroSchema(avroSchema_);
@@ -1680,7 +1768,7 @@ public class HdfsTable extends Table {
     return hdfsTable;
   }
 
-  public long getTotalHdfsBytes() { return totalHdfsBytes_; }
+  public long getTotalHdfsBytes() { return fileMetadataStats_.totalFileBytes; }
   public String getHdfsBaseDir() { return hdfsBaseDir_; }
   public Path getHdfsBaseDirPath() { return new Path(hdfsBaseDir_); }
   public boolean isAvroTable() { return avroSchema_ != null; }
@@ -1978,8 +2066,11 @@ public class HdfsTable extends Table {
       // Compute and report the extrapolated row count because the set of files could
       // have changed since we last computed stats for this partition. We also follow
       // this policy during scan-cardinality estimation.
-      if (statsExtrap) rowBuilder.add(getExtrapolatedNumRows(totalHdfsBytes_));
-      rowBuilder.add(numHdfsFiles_).addBytes(totalHdfsBytes_)
+      if (statsExtrap) {
+        rowBuilder.add(getExtrapolatedNumRows(fileMetadataStats_.totalFileBytes));
+      }
+      rowBuilder.add(fileMetadataStats_.numFiles)
+          .addBytes(fileMetadataStats_.totalFileBytes)
           .addBytes(totalCachedBytes).add("").add("").add("").add("");
       result.addToRows(rowBuilder.get());
     }
@@ -2072,13 +2163,13 @@ public class HdfsTable extends Table {
     // Conservative max size for Java arrays. The actual maximum varies
     // from JVM version and sometimes between configurations.
     final long JVM_MAX_ARRAY_SIZE = Integer.MAX_VALUE - 10;
-    if (numHdfsFiles_ > JVM_MAX_ARRAY_SIZE) {
+    if (fileMetadataStats_.numFiles > JVM_MAX_ARRAY_SIZE) {
       throw new IllegalStateException(String.format(
           "Too many files to generate a table sample. " +
           "Table '%s' has %s files, but a maximum of %s files are supported.",
-          getTableName().toString(), numHdfsFiles_, JVM_MAX_ARRAY_SIZE));
+          getTableName().toString(), fileMetadataStats_.numFiles, JVM_MAX_ARRAY_SIZE));
     }
-    int totalNumFiles = (int) numHdfsFiles_;
+    int totalNumFiles = (int) fileMetadataStats_.numFiles;
 
     // Ensure a consistent ordering of files for repeatable runs. The files within a
     // partition are already ordered based on how they are loaded in the catalog.
@@ -2134,4 +2225,37 @@ public class HdfsTable extends Table {
     }
     return result;
   }
+
+  /**
+   * Registers table metrics.
+   */
+  @Override
+  public void initMetrics() {
+    super.initMetrics();
+    metrics_.addGauge(NUM_PARTITIONS_METRIC, new Gauge<Integer>() {
+      @Override
+      public Integer getValue() { return partitionMap_.values().size(); }
+    });
+    metrics_.addGauge(NUM_FILES_METRIC, new Gauge<Long>() {
+      @Override
+      public Long getValue() { return fileMetadataStats_.numFiles; }
+    });
+    metrics_.addGauge(NUM_BLOCKS_METRIC, new Gauge<Long>() {
+      @Override
+      public Long getValue() { return fileMetadataStats_.numBlocks; }
+    });
+    metrics_.addGauge(TOTAL_FILE_BYTES_METRIC, new Gauge<Long>() {
+      @Override
+      public Long getValue() { return fileMetadataStats_.totalFileBytes; }
+    });
+    metrics_.addGauge(MEMORY_ESTIMATE_METRIC, new Gauge<Long>() {
+      @Override
+      public Long getValue() { return getEstimatedMetadataSize(); }
+    });
+    metrics_.addGauge(HAS_INCREMENTAL_STATS_METRIC, new Gauge<Boolean>() {
+      @Override
+      public Boolean getValue() { return hasIncrementalStats_; }
+    });
+    metrics_.addTimer(CATALOG_UPDATE_DURATION_METRIC);
+  }
 }
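
Note on the memory estimate computed in getTHdfsTable() above: it is a linear
combination of per-partition, per-file-descriptor and per-block costs, plus a
per-column cost for each partition that carries incremental stats. The sketch
below restates that arithmetic as a standalone function. It is illustrative
only and not part of the patch: the class name, the method name and the
parameter list are hypothetical, and the value of STATS_SIZE_PER_COLUMN_BYTES
is not shown in this diff, so it is passed in as an argument.

```java
// Illustrative sketch only; not part of the patch. The three constants are copied
// from the HdfsTable diff above. Everything else is a hypothetical stand-in.
final class MetadataSizeEstimate {
  static final long PER_PARTITION_MEM_USAGE_BYTES = 2048;
  static final long PER_FD_MEM_USAGE_BYTES = 500;
  static final long PER_BLOCK_MEM_USAGE_BYTES = 150;

  private MetadataSizeEstimate() {}

  // Returns the estimated metadata size in bytes.
  // 'statsSizePerColumnBytes' stands in for STATS_SIZE_PER_COLUMN_BYTES, whose
  // value does not appear in this diff (assumption: caller supplies it).
  static long estimate(long numPartitions, long numFiles, long numBlocks,
      long numPartitionsWithIncStats, long numColumns, long statsSizePerColumnBytes) {
    long estimate = numPartitions * PER_PARTITION_MEM_USAGE_BYTES;
    // Each partition with incremental stats adds a per-column cost.
    estimate += numPartitionsWithIncStats * numColumns * statsSizePerColumnBytes;
    // File descriptors and blocks are charged at a flat average cost each.
    estimate += numFiles * PER_FD_MEM_USAGE_BYTES + numBlocks * PER_BLOCK_MEM_USAGE_BYTES;
    return estimate;
  }
}
```

For example, 10 partitions, 100 files and 200 blocks with no incremental stats
come to 10 * 2048 + 100 * 500 + 200 * 150 = 100480 bytes under these averages.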

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index 7e13ac5..e9e1617 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -58,6 +58,7 @@ import org.apache.kudu.client.PartitionSchema.RangeSchema;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
+import com.codahale.metrics.Timer;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -215,30 +216,36 @@ public class KuduTable extends Table {
   @Override
   public void load(boolean dummy /* not used */, IMetaStoreClient msClient,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
-    msTable_ = msTbl;
-    kuduTableName_ = msTable_.getParameters().get(KuduTable.KEY_TABLE_NAME);
-    Preconditions.checkNotNull(kuduTableName_);
-    kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
-    Preconditions.checkNotNull(kuduMasters_);
-    setTableStats(msTable_);
-    // Load metadata from Kudu and HMS
+    final Timer.Context context =
+        getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
     try {
-      loadSchemaFromKudu();
-      loadAllColumnStats(msClient);
-    } catch (ImpalaRuntimeException e) {
-      throw new TableLoadingException("Error loading metadata for Kudu table " +
-          kuduTableName_, e);
-    }
+      msTable_ = msTbl;
+      kuduTableName_ = msTable_.getParameters().get(KuduTable.KEY_TABLE_NAME);
+      Preconditions.checkNotNull(kuduTableName_);
+      kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
+      Preconditions.checkNotNull(kuduMasters_);
+      setTableStats(msTable_);
+      // Load metadata from Kudu and HMS
+      try {
+        loadSchemaFromKudu();
+        loadAllColumnStats(msClient);
+      } catch (ImpalaRuntimeException e) {
+        throw new TableLoadingException("Error loading metadata for Kudu table " +
+            kuduTableName_, e);
+      }
 
-    // Update the table schema in HMS.
-    try {
-      long lastDdlTime = CatalogOpExecutor.calculateDdlTime(msTable_);
-      msTable_.putToParameters("transient_lastDdlTime", Long.toString(lastDdlTime));
-      msTable_.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS,
-          StatsSetupConst.TRUE);
-      msClient.alter_table(msTable_.getDbName(), msTable_.getTableName(), msTable_);
-    } catch (TException e) {
-      throw new TableLoadingException(e.getMessage());
+      // Update the table schema in HMS.
+      try {
+        long lastDdlTime = CatalogOpExecutor.calculateDdlTime(msTable_);
+        msTable_.putToParameters("transient_lastDdlTime", Long.toString(lastDdlTime));
+        msTable_.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS,
+            StatsSetupConst.TRUE);
+        msClient.alter_table(msTable_.getDbName(), msTable_.getTableName(), msTable_);
+      } catch (TException e) {
+        throw new TableLoadingException(e.getMessage());
+      }
+    } finally {
+      context.stop();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 50fe953..a6536ba 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.common.Metrics;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.thrift.TAccessLevel;
@@ -73,6 +75,18 @@ public abstract class Table extends CatalogObjectImpl {
   // values of -1 indicate an unknown statistic.
   protected TTableStats tableStats_;
 
+  // Estimated size (in bytes) of this table metadata. Stored in an AtomicLong to allow
+  // this field to be accessed without holding the table lock.
+  protected AtomicLong estimatedMetadataSize_ = new AtomicLong(0);
+
+  // Number of metadata operations performed on this table since it was loaded.
+  // Stored in an AtomicLong to allow this field to be accessed without holding the
+  // table lock.
+  protected AtomicLong metadataOpsCount_ = new AtomicLong(0);
+
+  // Metrics for this table
+  protected final Metrics metrics_ = new Metrics();
+
   // colsByPos[i] refers to the ith column in the table. The first numClusteringCols are
   // the clustering columns.
   protected final ArrayList<Column> colsByPos_ = Lists.newArrayList();
@@ -89,6 +103,12 @@ public abstract class Table extends CatalogObjectImpl {
   // True if this object is stored in an Impalad catalog cache.
   protected boolean storedInImpaladCatalogCache_ = false;
 
+  // Table metrics. These metrics are applicable to all table types. Each subclass of
+  // Table can define additional metrics specific to that table type.
+  public static final String REFRESH_DURATION_METRIC = "refresh-duration";
+  public static final String ALTER_DURATION_METRIC = "alter-duration";
+  public static final String LOAD_DURATION_METRIC = "load-duration";
+
   protected Table(org.apache.hadoop.hive.metastore.api.Table msTable, Db db,
       String name, String owner) {
     msTable_ = msTable;
@@ -99,12 +119,36 @@ public abstract class Table extends CatalogObjectImpl {
         CatalogServiceCatalog.getLastDdlTime(msTable_) : -1;
     tableStats_ = new TTableStats(-1);
     tableStats_.setTotal_file_bytes(-1);
+    initMetrics();
   }
 
   public ReentrantLock getLock() { return tableLock_; }
   public abstract TTableDescriptor toThriftDescriptor(
       int tableId, Set<Long> referencedPartitions);
   public abstract TCatalogObjectType getCatalogObjectType();
+  public long getMetadataOpsCount() { return metadataOpsCount_.get(); }
+  public long getEstimatedMetadataSize() { return estimatedMetadataSize_.get(); }
+  public void setEstimatedMetadataSize(long estimatedMetadataSize) {
+    estimatedMetadataSize_.set(estimatedMetadataSize);
+    if (!isStoredInImpaladCatalogCache()) {
+      CatalogUsageMonitor.INSTANCE.updateLargestTables(this);
+    }
+  }
+
+  public void incrementMetadataOpsCount() {
+    metadataOpsCount_.incrementAndGet();
+    if (!isStoredInImpaladCatalogCache()) {
+      CatalogUsageMonitor.INSTANCE.updateFrequentlyAccessedTables(this);
+    }
+  }
+
+  public void initMetrics() {
+    metrics_.addTimer(REFRESH_DURATION_METRIC);
+    metrics_.addTimer(ALTER_DURATION_METRIC);
+    metrics_.addTimer(LOAD_DURATION_METRIC);
+  }
+
+  public Metrics getMetrics() { return metrics_; }
 
   // Returns true if this table reference comes from the impalad catalog cache or if it
   // is loaded from the testing framework. Returns false if this table reference points
@@ -527,4 +571,19 @@ public abstract class Table extends CatalogObjectImpl {
     }
     return new Pair<String, Short>(cachePoolName, cacheReplication);
   }
+
+  /**
+   * The implementations of hashCode() and equals() functions are using table names as
+   * unique identifiers of tables. Hence, they should be used with caution and not in
+   * cases where truly unique table objects are needed.
+   */
+  @Override
+  public int hashCode() { return getFullName().hashCode(); }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) return false;
+    if (!(obj instanceof Table)) return false;
+    return getFullName().equals(((Table) obj).getFullName());
+  }
 }
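
Note on the name-based hashCode()/equals() added to Table above: two distinct
objects with the same full name compare equal, so hash-based collections keep
only one of them. The sketch below shows that effect with a hypothetical
stand-in class (NamedTable, with a made-up 'version' field); it is not the
Impala Table class and is only meant to illustrate why the comment in the patch
asks for caution.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a catalog table that, like Table in the patch above,
// uses only its full name for hashCode() and equals().
final class NamedTable {
  private final String fullName;
  private final long version;  // Ignored by equals()/hashCode() on purpose.

  NamedTable(String fullName, long version) {
    this.fullName = fullName;
    this.version = version;
  }

  long getVersion() { return version; }

  @Override
  public int hashCode() { return fullName.hashCode(); }

  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof NamedTable)) return false;
    return fullName.equals(((NamedTable) obj).fullName);
  }

  // Adds all tables to a HashSet and returns how many distinct entries remain.
  static int distinctCount(NamedTable... tables) {
    Set<NamedTable> set = new HashSet<>();
    for (NamedTable t : tables) set.add(t);
    return set.size();
  }
}
```

Here two NamedTable objects named "db.t" with different versions collapse into
a single set entry, which is the behavior one would want for a "top N tables by
name" structure but not where object identity matters.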

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/common/Metrics.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/Metrics.java b/fe/src/main/java/org/apache/impala/common/Metrics.java
new file mode 100644
index 0000000..cf8621f
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/common/Metrics.java
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.common;
+
+import java.util.Map.Entry;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+
+/**
+ * Thin wrapper class around MetricRegistry. Allows users to register and access metrics of
+ * various types (counter, meter, histogram, and timer). This class is not thread-safe.
+ * TODO: Expose the metrics in Json format via a toJson() function.
+ */
+public final class Metrics {
+
+  private final MetricRegistry registry_ = new MetricRegistry();
+
+  public Metrics() {}
+
+  public void addCounter(String name) { registry_.counter(name); }
+  public void addMeter(String name) { registry_.meter(name); }
+  public void addHistogram(String name) { registry_.histogram(name); }
+  public void addTimer(String name) { registry_.timer(name); }
+
+  @SuppressWarnings("rawtypes")
+  public <T extends Gauge> void addGauge(String name, T gauge) {
+    registry_.register(name, gauge);
+  }
+
+  /**
+   * Returns a counter named 'name'. If the counter does not exist, it is registered in
+   * the metrics registry.
+   */
+  public Counter getCounter(String name) {
+    Counter counter = registry_.getCounters().get(name);
+    if (counter == null) counter = registry_.counter(name);
+    return counter;
+  }
+
+  /**
+   * Returns a meter named 'name'. If the meter does not exist, it is registered in the
+   * metrics registry.
+   */
+  public Meter getMeter(String name) {
+    Meter meter = registry_.getMeters().get(name);
+    if (meter == null) meter = registry_.meter(name);
+    return meter;
+  }
+
+  /**
+   * Returns a histogram named 'name'. If the histogram does not exist, it is registered
+   * in the metrics registry.
+   */
+  public Histogram getHistogram(String name) {
+    Histogram histogram = registry_.getHistograms().get(name);
+    if (histogram == null) histogram = registry_.histogram(name);
+    return histogram;
+  }
+
+  /**
+   * Returns a timer named 'name'. If the timer does not exist, it is registered in the
+   * metrics registry.
+   */
+  public Timer getTimer(String name) {
+    Timer timer = registry_.getTimers().get(name);
+    if (timer == null) timer = registry_.timer(name);
+    return timer;
+  }
+
+  @SuppressWarnings("rawtypes")
+  public Gauge getGauge(String name) { return registry_.getGauges().get(name); }
+
+  /**
+   * Returns a string representation of all registered metrics.
+   */
+  @Override
+  @SuppressWarnings("rawtypes")
+  public String toString() {
+    StringBuilder result = new StringBuilder();
+    for (Entry<String, Counter> entry: registry_.getCounters().entrySet()) {
+      result.append(entry.getKey() + ": " + String.valueOf(entry.getValue().getCount()));
+      result.append("\n");
+    }
+    for (Entry<String, Timer> entry: registry_.getTimers().entrySet()) {
+      result.append(entry.getKey() + ": " + timerToString(entry.getValue()));
+      result.append("\n");
+    }
+    for (Entry<String, Gauge> entry: registry_.getGauges().entrySet()) {
+      result.append(entry.getKey() + ": " + String.valueOf(entry.getValue().getValue()));
+      result.append("\n");
+    }
+    for (Entry<String, Histogram> entry: registry_.getHistograms().entrySet()) {
+      result.append(entry.getKey() + ": " +
+          snapshotToString(entry.getValue().getSnapshot()));
+      result.append("\n");
+    }
+    return result.toString();
+  }
+
+  /**
+   * Helper function that pretty prints the contents of a timer metric.
+   */
+  private String timerToString(Timer timer) {
+    StringBuilder builder = new StringBuilder();
+    return builder.append("\n\tCount: " + timer.getCount() + "\n")
+        .append("\tMean rate: " + timer.getMeanRate() + "\n")
+        .append("\t1min rate: " + timer.getOneMinuteRate() + "\n")
+        .append("\t5min rate: " + timer.getFiveMinuteRate() + "\n")
+        .append("\t15min rate: " + timer.getFifteenMinuteRate() + "\n")
+        .append(snapshotToString(timer.getSnapshot()))
+        .toString();
+  }
+
+  /**
+   * Helper function that pretty prints the contents of a metric snapshot.
+   */
+  private String snapshotToString(Snapshot snapshot) {
+    StringBuilder builder = new StringBuilder();
+    return builder.append("\n\tMin (msec): " + snapshot.getMin() / 1000000 + "\n")
+        .append("\tMax (msec): " + snapshot.getMax() / 1000000 + "\n")
+        .append("\tMean (msec): " + snapshot.getMean() / 1000000 + "\n")
+        .append("\tMedian (msec): " + snapshot.getMedian() / 1000000 + "\n")
+        .append("\t75th-% (msec): " + snapshot.get75thPercentile() / 1000000 + "\n")
+        .append("\t95th-% (msec): " + snapshot.get95thPercentile() / 1000000 + "\n")
+        .append("\t99th-% (msec): " + snapshot.get99thPercentile() / 1000000 + "\n")
+        .toString();
+  }
+}
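
Note on the timing pattern used with these metrics: the load() changes earlier
in this patch start a Timer.Context before the work and stop it in a finally
block, so a duration is recorded even when loading throws. The sketch below
shows the same try/finally shape using only System.nanoTime(). DurationRecorder
is a hypothetical stand-in and not the com.codahale.metrics.Timer API; it also
shows the nanosecond-to-millisecond conversion that snapshotToString() performs
by dividing by 1000000.

```java
// Hypothetical, minimal stand-in for a timer metric. Not the Dropwizard Timer API.
// Like the Metrics wrapper above, this sketch is not thread-safe.
final class DurationRecorder {
  private long count;
  private long totalNanos;

  // Runs 'body' and records its duration, whether or not it throws.
  void time(Runnable body) {
    final long start = System.nanoTime();
    try {
      body.run();
    } finally {
      // Mirrors context.stop() in the patch: always executed.
      totalNanos += System.nanoTime() - start;
      count++;
    }
  }

  long getCount() { return count; }

  // Durations are kept in nanoseconds; convert only for display.
  double getTotalMillis() { return totalNanos / 1000000.0; }

  // Times one successful and one failing operation and returns the recorded count.
  static long countAfterFailure() {
    DurationRecorder recorder = new DurationRecorder();
    recorder.time(() -> {});
    try {
      recorder.time(() -> { throw new IllegalStateException("load failed"); });
    } catch (IllegalStateException e) {
      // Expected; the duration of the failed call has still been recorded.
    }
    return recorder.getCount();
  }
}
```

Both calls are counted, including the one that throws, which is the property
the finally blocks in the patch are there to guarantee.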

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index df3b10b..f1422db 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -56,6 +56,7 @@ import org.apache.impala.authorization.User;
 import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.CatalogUsageMonitor;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnNotFoundException;
 import org.apache.impala.catalog.DataSource;
@@ -152,6 +153,7 @@ import org.apache.impala.util.MetaStoreUtil;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
+import com.codahale.metrics.Timer;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -368,6 +370,8 @@ public class CatalogOpExecutor {
       throw new InternalException(String.format("Error altering table %s due to lock " +
           "contention.", tbl.getFullName()));
     }
+    final Timer.Context context
+        = tbl.getMetrics().getTimer(Table.ALTER_DURATION_METRIC).time();
     try {
       if (params.getAlter_type() == TAlterTableType.RENAME_VIEW
           || params.getAlter_type() == TAlterTableType.RENAME_TABLE) {
@@ -544,6 +548,7 @@ public class CatalogOpExecutor {
         response.setResult_set(resultSet);
       }
     } finally {
+      context.stop();
       Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
       tbl.getLock().unlock();
     }
@@ -3165,6 +3170,8 @@ public class CatalogOpExecutor {
     if (!catalog_.tryLockTable(table)) {
       throw new InternalException("Error updating the catalog due to lock contention.");
     }
+    final Timer.Context context
+        = table.getMetrics().getTimer(HdfsTable.CATALOG_UPDATE_DURATION_METRIC).time();
     try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
@@ -3319,6 +3326,7 @@ public class CatalogOpExecutor {
       loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata);
       addTableToCatalogUpdate(table, response.result);
     } finally {
+      context.stop();
       Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
       table.getLock().unlock();
     }
@@ -3356,7 +3364,10 @@ public class CatalogOpExecutor {
    * This is to help protect against certain scenarios where the table was
    * modified or dropped between the time analysis completed and the catalog op
    * started executing. However, even with these checks it is possible the table was
-   * modified or dropped/re-created without us knowing. TODO: Track object IDs to
+   * modified or dropped/re-created without us knowing. This function also updates the
+   * table usage counter.
+   *
+   * TODO: Track object IDs to
    * know when a table has been dropped and re-created with the same name.
    */
   private Table getExistingTable(String dbName, String tblName) throws CatalogException {
@@ -3364,6 +3375,7 @@ public class CatalogOpExecutor {
     if (tbl == null) {
       throw new TableNotFoundException("Table not found: " + dbName + "." + tblName);
     }
+    tbl.incrementMetadataOpsCount();
 
     if (!tbl.isLoaded()) {
       throw new CatalogException(String.format("Table '%s.%s' was modified while " +
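
Note on the timing hunks above: the Timer.Context is obtained only after the table lock has been acquired and is stopped in the finally block, so the duration is recorded even when the operation throws. A minimal sketch of that pattern using only the JDK follows. SimpleTimer, its Context class, and TimedOpExample are hypothetical stand-ins for illustration; they are not the com.codahale.metrics API used by the patch.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a metrics timer; not the com.codahale.metrics API. */
final class SimpleTimer {
  private final AtomicLong count = new AtomicLong();
  private final AtomicLong totalNanos = new AtomicLong();

  /** Handle for one in-flight measurement; close() records the elapsed time. */
  final class Context implements AutoCloseable {
    private final long startNanos = System.nanoTime();

    @Override
    public void close() {
      totalNanos.addAndGet(System.nanoTime() - startNanos);
      count.incrementAndGet();
    }
  }

  Context time() { return new Context(); }
  long getCount() { return count.get(); }
  long getTotalNanos() { return totalNanos.get(); }
}

final class TimedOpExample {
  /** Runs 'op' and records its duration even if it throws. */
  static void runTimed(SimpleTimer timer, Runnable op) {
    // Start timing before the try block, mirroring the placement in the patch.
    final SimpleTimer.Context context = timer.time();
    try {
      op.run();
    } finally {
      // Always stop the timer, whether op completed or threw.
      context.close();
    }
  }
}
```

Starting the timer after lock acquisition means lock wait time is excluded from the recorded duration, which appears to be the intent of the placement in the patch.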

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index e945a3b..ed5a51a 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -45,6 +45,7 @@ import org.apache.impala.thrift.TGetDbsResult;
 import org.apache.impala.thrift.TGetFunctionsRequest;
 import org.apache.impala.thrift.TGetFunctionsResponse;
 import org.apache.impala.thrift.TGetTablesParams;
+import org.apache.impala.thrift.TGetTableMetricsParams;
 import org.apache.impala.thrift.TGetTablesResult;
 import org.apache.impala.thrift.TLogLevel;
 import org.apache.impala.thrift.TPrioritizeLoadRequest;
@@ -197,6 +198,16 @@ public class JniCatalog {
   }
 
   /**
+   * Returns the collected metrics of a table.
+   */
+  public String getTableMetrics(byte[] getTableMetricsParams) throws ImpalaException,
+      TException {
+    TGetTableMetricsParams params = new TGetTableMetricsParams();
+    JniUtil.deserializeThrift(protocolFactory_, params, getTableMetricsParams);
+    return catalog_.getTableMetrics(params.table_name);
+  }
+
+  /**
    * Gets the thrift representation of a catalog object.
    */
   public byte[] getCatalogObject(byte[] thriftParams) throws ImpalaException,
@@ -262,4 +273,12 @@ public class JniCatalog {
     TSerializer serializer = new TSerializer(protocolFactory_);
     return serializer.serialize(catalogOpExecutor_.updateCatalog(request));
   }
+
+  /**
+   * Returns information about the current catalog usage.
+   */
+  public byte[] getCatalogUsage() throws ImpalaException, TException {
+    TSerializer serializer = new TSerializer(protocolFactory_);
+    return serializer.serialize(catalog_.getCatalogUsage());
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/util/TopNCache.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/TopNCache.java b/fe/src/main/java/org/apache/impala/util/TopNCache.java
new file mode 100644
index 0000000..9f6b972
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/TopNCache.java
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Thread-safe class that represents a TOP-N cache of items. It stores the top-N items of
+ * a generic type T based on a user-specified ranking function that returns a numeric
+ * value (long).
+ *
+ * The cache has a maximum capacity (N) of stored items. The implementation allows two
+ * policies with respect to the way new items are handled when maximum capacity is
+ * reached:
+ * a) Always evict policy: A new item will always replace the item with the lowest rank
+ * according to the specified ranking function even if the rank of the newly added
+ * item is lower than the one to be replaced.
+ * b) Rank-based eviction policy: A new item will be added to the cache iff its rank is
+ * higher than the smallest rank in the cache and the item with that rank will be evicted.
+ *
+ * TODO: Replace these two policies with an LFU cache with dynamic aging.
+ */
+public final class TopNCache<T, R extends Long>  {
+
+  // Function used to rank items stored in this cache.
+  private final Function<T, R> function_;
+  // Maximum capacity of this cache.
+  private final int maxCapacity_;
+  // The cache is stored as a priority queue.
+  private final PriorityQueue<T> heap_;
+  // Determines the eviction policy to apply when the cache reaches maximum capacity.
+  // TODO: Convert to enum?
+  private final boolean alwaysEvictAtCapacity_;
+
+  /**
+   * Compares the ranks of two T objects, returning 0 if they are equal, < 0 if the rank
+   * of the first is smaller, or > 0 if the rank of the first object is larger.
+   */
+  private int compareRanks(T t1, T t2) {
+    return function_.apply(t1).compareTo(function_.apply(t2));
+  }
+
+  public TopNCache(Function<T, R> f, int maxCapacity, boolean evictAtCapacity) {
+    Preconditions.checkNotNull(f);
+    Preconditions.checkState(maxCapacity > 0);
+    function_ = f;
+    maxCapacity_ = maxCapacity;
+    heap_ = new PriorityQueue<T>(maxCapacity_,
+        new Comparator<T>() {
+          @Override
+          public int compare(T t1, T t2) { return compareRanks(t1, t2); }
+        }
+    );
+    alwaysEvictAtCapacity_ = evictAtCapacity;
+  }
+
+  /**
+   * Adds or updates an item in the cache. If the item already exists, its rank position
+   * is refreshed by removing and adding the item back to the cache. If the item is not in
+   * the cache and maximum capacity hasn't been reached, the item is added to the cache.
+   * Otherwise, the eviction policy is applied and the item either replaces the cache item
+   * with the lowest rank or it is rejected from the cache if its rank is not higher
+   * than the lowest rank in the cache.
+   */
+  public synchronized void putOrUpdate(T item) {
+    if (!heap_.remove(item)) {
+      if (heap_.size() == maxCapacity_) {
+        if (!alwaysEvictAtCapacity_ && compareRanks(item, heap_.peek()) <= 0) {
+          return;
+        }
+        heap_.poll();
+      }
+    }
+    heap_.add(item);
+  }
+
+  /**
+   * Removes an item from the cache.
+   */
+  public synchronized void remove(T item) { heap_.remove(item); }
+
+  /**
+   * Returns the list of all the items in the cache.
+   */
+  public synchronized List<T> listEntries() { return ImmutableList.copyOf(heap_); }
+}
+
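
Note on the two eviction policies described in the class comment above: the difference only shows up once the cache is full. A minimal, JDK-only sketch follows. TopNSketch is a simplified, hypothetical stand-in written for illustration (it takes a ToLongFunction instead of a Guava Function and has no remove() method); it is not the class added by this patch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.ToLongFunction;

/** Illustrative top-N container; a simplified stand-in, not Impala's TopNCache. */
final class TopNSketch<T> {
  private final ToLongFunction<T> rank;
  private final int maxCapacity;
  private final boolean alwaysEvictAtCapacity;
  // Min-heap ordered by rank, so the lowest-ranked item is always at the head.
  private final PriorityQueue<T> heap;

  TopNSketch(ToLongFunction<T> rank, int maxCapacity, boolean alwaysEvictAtCapacity) {
    if (maxCapacity <= 0) throw new IllegalArgumentException("maxCapacity must be > 0");
    this.rank = rank;
    this.maxCapacity = maxCapacity;
    this.alwaysEvictAtCapacity = alwaysEvictAtCapacity;
    this.heap = new PriorityQueue<>(maxCapacity, Comparator.comparingLong(rank));
  }

  synchronized void putOrUpdate(T item) {
    // Removing first means an existing item is re-inserted at its current rank.
    if (!heap.remove(item) && heap.size() == maxCapacity) {
      boolean notHigher = rank.applyAsLong(item) <= rank.applyAsLong(heap.peek());
      // Rank-based policy: reject an item that does not outrank the current minimum.
      if (!alwaysEvictAtCapacity && notHigher) return;
      // Otherwise evict the lowest-ranked item to make room.
      heap.poll();
    }
    heap.add(item);
  }

  synchronized List<T> listEntries() { return new ArrayList<>(heap); }
}
```

With a capacity of 3 and the rank-based policy, inserting 5, 1, 4, 2, 3, 0 in that order leaves 3, 4 and 5 in the sketch. With the always-evict policy, inserting 1, 2, 3 and then 0 replaces 1 with 0 even though 0 ranks lower.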

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/test/java/org/apache/impala/util/TestTopNCache.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/util/TestTopNCache.java b/fe/src/test/java/org/apache/impala/util/TestTopNCache.java
new file mode 100644
index 0000000..1b34599
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/util/TestTopNCache.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import com.google.common.base.Function;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Unit tests for the TopNCache class.
+ */
+public class TestTopNCache {
+
+  /**
+   * Create a TopNCache with 'capacity' max capacity and populate it with 'numEntries'
+   * entries where each entry is a number from 0 to 'numEntries' - 1.
+   */
+  private static TopNCache<Long, Long> createAndPopulate(int capacity,
+      long numEntries, boolean policy) {
+    TopNCache<Long, Long> cache =
+        new TopNCache<Long, Long>(new Function<Long, Long>() {
+            @Override
+            public Long apply(Long element) { return element; }
+        }, capacity, policy);
+    for (long i = 0; i < numEntries; ++i) cache.putOrUpdate(i);
+    return cache;
+  }
+
+  @Test
+  public void testCreateAndPopulateCache() throws Exception {
+    int[] capacities = {1, 10, 1000};
+    boolean[] evictionPolicies = {true, false};
+    for (int capacity: capacities) {
+      for (boolean policy: evictionPolicies) {
+        TopNCache<Long, Long> cache =
+            createAndPopulate(capacity, 2 * capacity, policy);
+        assertEquals(cache.listEntries().size(), capacity);
+        for (long i = 0; i < capacity * 2; i++) cache.remove(i);
+        assertEquals(cache.listEntries().size(), 0);
+      }
+    }
+  }
+
+  @Test
+  public void testUpdateExistingElements() throws Exception {
+    final int capacity = 10;
+    TopNCache<Long, Long> cache = createAndPopulate(capacity, capacity / 2, true);
+    assertEquals(cache.listEntries().size(), capacity / 2);
+    // Adding the same elements should not alter the number of elements stored in the
+    // cache.
+    for (long i = 0; i < capacity / 2; i++) cache.putOrUpdate(i);
+    assertEquals(cache.listEntries().size(), capacity / 2);
+  }
+
+  @Test
+  public void testAlwaysEvictPolicy() throws Exception {
+    final int capacity = 10;
+    TopNCache<Long, Long> cache = createAndPopulate(capacity, capacity, true);
+    assertEquals(cache.listEntries().size(), capacity);
+    cache.putOrUpdate((long) capacity);
+    assertEquals(cache.listEntries().size(), capacity);
+    // Assert that the new element replaced the smallest element in the cache
+    assertTrue(!cache.listEntries().contains(Long.valueOf(0)));
+    cache.putOrUpdate((long) capacity + 1);
+    assertTrue(!cache.listEntries().contains(Long.valueOf(1)));
+    List<Long> cacheElements = cache.listEntries();
+    for (long i = 2; i < capacity + 2; i++) {
+      assertTrue(cacheElements.contains(Long.valueOf(i)));
+    }
+  }
+
+  @Test
+  public void testRankBasedEvictionPolicy() throws Exception {
+    final int capacity = 10;
+    TopNCache<Long, Long> cache = new TopNCache<Long, Long>(
+        new Function<Long, Long>() {
+            @Override
+            public Long apply(Long element) { return element; }
+        }, capacity, false);
+    for (long i = 1; i < capacity + 1; i++) cache.putOrUpdate(i);
+    assertEquals(cache.listEntries().size(), capacity);
+    cache.putOrUpdate((long) 0);
+    // 0 shouldn't be added to the cache because its rank is smaller than the lowest rank
+    // in the cache.
+    assertTrue(!cache.listEntries().contains(Long.valueOf(0)));
+    cache.putOrUpdate((long) capacity + 1);
+    assertEquals(cache.listEntries().size(), capacity);
+    assertTrue(cache.listEntries().contains(Long.valueOf(capacity + 1)));
+  }
+
+  @Test
+  public void testRankBasedEvictionPolicyWithRandomInput() throws Exception {
+    final int capacity = 5;
+    long[] values = {10, 8, 1, 2, 5, 4, 3, 6, 9, 7};
+    TopNCache<Long, Long> cache = new TopNCache<Long, Long>(
+        new Function<Long, Long>() {
+            @Override
+            public Long apply(Long element) { return element; }
+        }, capacity, false);
+    for (Long entry: values) cache.putOrUpdate(entry);
+    List<Long> entries = cache.listEntries();
+    assertEquals(entries.size(), capacity);
+    // Make sure only the top-5 elements are in the cache
+    for (long i = 1; i <= capacity; ++i) {
+      assertTrue(!entries.contains(i));
+      assertTrue(entries.contains(i + capacity));
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/tests/webserver/test_web_pages.py
----------------------------------------------------------------------
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 101f786..54163dc 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -29,6 +29,7 @@ class TestWebPage(ImpalaTestSuite):
   RESET_GLOG_LOGLEVEL_URL = "http://localhost:{0}/reset_glog_level"
   CATALOG_URL = "http://localhost:{0}/catalog"
   CATALOG_OBJECT_URL = "http://localhost:{0}/catalog_object"
+  TABLE_METRICS_URL = "http://localhost:{0}/table_metrics"
   QUERY_BACKENDS_URL = "http://localhost:{0}/query_backends"
   THREAD_GROUP_URL = "http://localhost:{0}/thread-group"
   # log4j changes do not apply to the statestore since it doesn't
@@ -37,6 +38,7 @@ class TestWebPage(ImpalaTestSuite):
   # one with it.
   TEST_PORTS_WITHOUT_SS = ["25000", "25020"]
   TEST_PORTS_WITH_SS = ["25000", "25010", "25020"]
+  CATALOG_TEST_PORT = ["25020"]
 
   def test_memz(self):
     """test /memz at impalad / statestored / catalogd"""
@@ -151,6 +153,8 @@ class TestWebPage(ImpalaTestSuite):
     self.__test_catalog_object("functional_parquet", "alltypes")
     self.__test_catalog_object("functional", "alltypesnopart")
     self.__test_catalog_object("functional_kudu", "alltypes")
+    self.__test_table_metrics("functional", "alltypes", "total-file-size-bytes")
+    self.__test_table_metrics("functional_kudu", "alltypes", "alter-duration")
 
   def __test_catalog_object(self, db_name, tbl_name):
     """Tests the /catalog_object endpoint for the given db/table. Runs
@@ -164,6 +168,11 @@ class TestWebPage(ImpalaTestSuite):
       "?object_type=TABLE&object_name=%s.%s" % (db_name, tbl_name), tbl_name,
       ports_to_test=self.TEST_PORTS_WITHOUT_SS)
 
+  def __test_table_metrics(self, db_name, tbl_name, metric):
+    self.client.execute("refresh %s.%s" % (db_name, tbl_name))
+    self.get_and_check_status(self.TABLE_METRICS_URL +
+      "?name=%s.%s" % (db_name, tbl_name), metric, ports_to_test=self.CATALOG_TEST_PORT)
+
   def test_query_details(self, unique_database):
     """Test that /query_backends returns the list of backend states for DML or queries;
     nothing for DDL statements"""

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/www/catalog.tmpl
----------------------------------------------------------------------
diff --git a/www/catalog.tmpl b/www/catalog.tmpl
index 5e271bf..ffe7c78 100644
--- a/www/catalog.tmpl
+++ b/www/catalog.tmpl
@@ -20,6 +20,87 @@ under the License.
 
 <h2>Catalog</h2>
 
+{{?has_large_tables}}
+<div class="panel panel-info">
+  <div class="panel-heading">
+      <h2 class="panel-title">
+      Top-{{num_large_tables}} Tables with Highest Memory Requirements
+      </h2>
+  </div>
+  <div class="panel-body">
+    <table id="large-tables" class='table table-hover table-bordered'>
+      <thead>
+        <tr>
+          <th>Name</th>
+          <th>Estimated memory</th>
+          <th>Metrics</th>
+        </tr>
+      </thead>
+      <tbody>
+        {{#large_tables}}
+        <tr>
+          <td><a href="catalog_object?object_type=TABLE&object_name={{name}}">{{name}}</a>
+          </td>
+          <td>{{mem_estimate}}</td>
+          <td><a href="table_metrics?name={{name}}">{{name}}-metrics</a></td>
+        </tr>
+        {{/large_tables}}
+      </tbody>
+    </table>
+  </div>
+</div>
+
+<script>
+    $(document).ready(function() {
+        $('#large-tables').DataTable({
+            "order": [[ 0, "desc" ]],
+            "pageLength": 10
+        });
+    });
+</script>
+{{/has_large_tables}}
+
+{{?has_frequent_tables}}
+<div class="panel panel-info">
+  <div class="panel-heading">
+      <h2 class="panel-title">
+      Top-{{num_frequent_tables}} Tables with Highest Number of Metadata Operations
+      </h2>
+  </div>
+  <div class="panel-body">
+    <table id="frequent-tables" class='table table-hover table-bordered'>
+      <thead>
+        <tr>
+          <th>Name</th>
+          <th>Metadata Operations (since loaded)</th>
+          <th>Metrics</th>
+        </tr>
+      </thead>
+      <tbody>
+        {{#frequent_tables}}
+        <tr>
+          <td><a href="catalog_object?object_type=TABLE&object_name={{name}}">{{name}}</a>
+          </td>
+          <td>{{num_metadata_ops}}</td>
+          <td><a href="table_metrics?name={{name}}">{{name}}-metrics</a></td>
+        </tr>
+        {{/frequent_tables}}
+      </tbody>
+    </table>
+  </div>
+</div>
+
+<script>
+    $(document).ready(function() {
+        $('#frequent-tables').DataTable({
+            "order": [[ 0, "desc" ]],
+            "pageLength": 10
+        });
+    });
+</script>
+{{/has_frequent_tables}}
+
+<h3>Databases</h3>
 <ol class="breadcrumb">
 {{#databases}}
 <li><a href='#{{name}}'>{{name}}</a></li>
@@ -36,16 +117,38 @@ under the License.
     </a>
   </div>
   <div class="panel-body">
-    <ul>
-      {{#tables}}
-      <li>
-        <a href="catalog_object?object_type=TABLE&object_name={{fqtn}}">{{name}}</a>
-      </li>
-      {{/tables}}
-    </ul>
+    <table id="{{name}}-tables" class='table table-hover table-bordered'>
+      <thead>
+        <tr>
+          <th>Name</th>
+          {{?has_metrics}}
+          <th>Metrics</th>
+          {{/has_metrics}}
+        </tr>
+      </thead>
+      <tbody>
+        {{#tables}}
+        <tr>
+          <td><a href="catalog_object?object_type=TABLE&object_name={{fqtn}}">{{name}}</a>
+          </td>
+          {{?has_metrics}}
+          <td><a href="table_metrics?name={{fqtn}}">{{name}}-metrics</a></td>
+          {{/has_metrics}}
+        </tr>
+        {{/tables}}
+      </tbody>
+    </table>
   </div>
 </div>
 
+<script>
+    $(document).ready(function() {
+        $('#{{name}}-tables').DataTable({
+            "pageLength": 5
+        });
+    });
+</script>
+
 {{/databases}}
 
 {{> www/common-footer.tmpl }}

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/www/table_metrics.tmpl
----------------------------------------------------------------------
diff --git a/www/table_metrics.tmpl b/www/table_metrics.tmpl
new file mode 100644
index 0000000..5140309
--- /dev/null
+++ b/www/table_metrics.tmpl
@@ -0,0 +1,23 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+{{> www/common-header.tmpl }}
+
+<pre>{{table_metrics}}</pre>
+
+{{> www/common-footer.tmpl }}


[3/5] impala git commit: IMPALA-6368: make test_chars parallel

Posted by jr...@apache.org.
IMPALA-6368: make test_chars parallel

Previously it had to be executed serially because it modified tables in
the functional database.

This change separates out tests that use temporary tables and runs those
in a unique_database.

Testing:
Ran locally in a loop with parallelism of 4 for a while.

Change-Id: I2f62ede90f619b8cebbb1276bab903e7555d9744
Reviewed-on: http://gerrit.cloudera.org:8080/9022
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/579e3320
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/579e3320
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/579e3320

Branch: refs/heads/master
Commit: 579e33207b3bbe8b6a26cf1fe1e7fd8d26021475
Parents: 3f00d10
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Jan 12 17:10:51 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jan 19 09:55:52 2018 +0000

----------------------------------------------------------------------
 .../queries/QueryTest/chars-tmp-tables.test     | 275 +++++++++++++++++++
 .../queries/QueryTest/chars.test                | 262 +-----------------
 tests/query_test/test_chars.py                  |  50 +---
 3 files changed, 286 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/579e3320/testdata/workloads/functional-query/queries/QueryTest/chars-tmp-tables.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/chars-tmp-tables.test b/testdata/workloads/functional-query/queries/QueryTest/chars-tmp-tables.test
new file mode 100644
index 0000000..f6dc4c4
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/chars-tmp-tables.test
@@ -0,0 +1,275 @@
+====
+---- QUERY
+create table test_char_tmp (c char(5))
+---- RESULTS
+====
+---- QUERY
+insert into test_char_tmp select cast("hello" as char(5))
+---- RESULTS
+: 1
+====
+---- QUERY
+select * from test_char_tmp
+---- TYPES
+char
+---- RESULTS
+'hello'
+====
+---- QUERY
+# Regression test for IMPALA-1248
+insert into test_char_tmp
+values (cast("hel" as char(5))),
+       (cast(cast("hello000" as VARCHAR(8)) as char(5)))
+====
+---- QUERY
+select * from test_char_tmp where c = cast('hel' as char(5))
+---- TYPES
+char
+---- RESULTS
+'hel  '
+====
+---- QUERY
+insert into test_char_tmp values (NULL)
+====
+---- QUERY
+select * from test_char_tmp as A CROSS JOIN test_char_tmp as B
+where B.c = cast('hel' as CHAR(5))
+ORDER BY A.c
+---- TYPES
+char, char
+---- RESULTS
+'hel  ','hel  '
+'hello','hel  '
+'hello','hel  '
+'NULL','hel  '
+====
+---- QUERY
+select * from test_char_tmp as A, test_char_tmp as B
+where A.c = B.c AND A.c != 'hello'
+---- TYPES
+char, char
+---- RESULTS
+'hel  ','hel  '
+====
+---- QUERY
+select lower(c) from test_char_tmp ORDER BY c
+---- TYPES
+string
+---- RESULTS
+'hel  '
+'hello'
+'hello'
+'NULL'
+====
+---- QUERY
+create table test_varchar_tmp (vc varchar(5))
+---- RESULTS
+====
+---- QUERY
+insert into test_varchar_tmp values (cast("hello" as varchar(5)))
+====
+---- QUERY
+select * from test_varchar_tmp
+---- TYPES
+string
+---- RESULTS
+'hello'
+====
+---- QUERY
+insert into test_varchar_tmp values (cast("xyzzzzz12" as varchar(7)))
+---- CATCH
+would need to be cast to VARCHAR(5)
+====
+---- QUERY
+select cast("xyzzzzz12" as varchar(-1))
+---- CATCH
+Syntax error
+====
+====
+---- QUERY
+insert into test_varchar_tmp values (cast("hel" as varchar(4)))
+====
+---- QUERY
+select * from test_varchar_tmp
+---- TYPES
+string
+---- RESULTS
+'hello'
+'hel'
+====
+---- QUERY
+create table allchars
+(cshort char(5), clong char(140), vc varchar(5))
+---- RESULTS
+====
+---- QUERY
+insert into allchars values (cast("123456" as char(5)), cast("123456" as char(140)),
+cast("123456" as varchar(5)))
+====
+---- QUERY
+select cshort, clong, vc from allchars
+---- TYPES
+char,char,string
+---- RESULTS
+'12345','123456                                                                                                                                      ','12345'
+====
+---- QUERY
+create table allchars_par
+(cshort char(5), clong char(140), vc varchar(5)) stored as parquet
+---- RESULTS
+====
+---- QUERY
+insert into allchars_par values (cast("123456" as char(5)), cast("123456" as char(140)),
+cast("123456" as varchar(5)))
+====
+---- QUERY
+select cshort, clong, vc from allchars_par
+---- TYPES
+char,char,string
+---- RESULTS
+'12345','123456                                                                                                                                      ','12345'
+====
+---- QUERY
+create table char_parts (vc varchar(32)) partitioned by
+(csp char(5), clp char(140), vcp varchar(32))
+====
+---- QUERY
+insert into char_parts (csp, clp, vcp, vc) select cs, cl, vc, vc from functional.chars_tiny
+====
+---- QUERY
+select csp, clp, vcp from char_parts where csp != cast('dne' as char(5)) order by csp
+---- TYPES
+char, char, string
+---- RESULTS
+'1aaaa','1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb','1cccc'
+'2aaaa','2bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb','2cccccc'
+'3aaa ','3bbbbb                                                                                                                                      ','3ccc'
+'4aa  ','4bbbb                                                                                                                                       ','4cc'
+'5a   ','5bbb                                                                                                                                        ','5c'
+'6a   ','6b                                                                                                                                          ','6c'
+'6a   ','6b                                                                                                                                          ','6c'
+'a    ','b                                                                                                                                           ','c'
+====
+---- QUERY
+insert into char_parts partition (csp=cast('foo' as char(5)),
+clp=cast('01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789xxxxxxx' as char(140)),
+vcp=cast('myvar' as varchar(32))) select cast('val' as varchar(32));
+====
+---- QUERY
+select csp, clp, vcp from char_parts where csp = cast('foo' as char(5))
+---- TYPES
+char, char, string
+---- RESULTS
+'foo  ','01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789','myvar'
+====
+---- QUERY
+# Regression test for IMPALA-1322
+create table t_1822 (c10 char(10), c100 char(100), v100 varchar(100), v200 varchar(200), s string);
+====
+---- QUERY
+# Regression test for IMPALA-1322
+insert into t_1822 values (cast('a' as char(1)), cast('a' as char(1)),
+cast('a' as varchar(1)), cast('a' as varchar(1)), 'a');
+====
+---- QUERY
+# Regression test for IMPALA-1316
+select count(*) from t_1822 as t join t_1822 as tt
+on cast(tt.s as char(129)) = t.c10 and
+cast(tt.s as char(129)) = t.c100 and tt.c10 = t.c100;
+---- TYPES
+bigint
+---- RESULTS
+1
+====
+---- QUERY
+create table
+test_char_nulls ( c20 char(20),
+                  c40 char(40),
+                  c60 char(60),
+                  c80 char(80),
+                  c81 char(81),
+                  c82 char(82),
+                  c100 char(100),
+                  c120 char(120),
+                  c140 char(140))
+---- RESULTS
+====
+---- QUERY
+insert into test_char_nulls
+values (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL),
+       (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
+---- RESULTS
+: 2
+====
+---- QUERY
+# Regression test for IMPALA-1339
+select c20 from test_char_nulls group by c20;
+---- TYPES
+char
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# Regression test for IMPALA-1339
+select c40 from test_char_nulls group by c40;
+---- TYPES
+char
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# Regression test for IMPALA-1339
+select c60 from test_char_nulls group by c60;
+---- TYPES
+char
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# Regression test for IMPALA-1339
+select c80 from test_char_nulls group by c80;
+---- TYPES
+char
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# Regression test for IMPALA-1339
+select c81 from test_char_nulls group by c81;
+---- TYPES
+char
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# Regression test for IMPALA-1339
+select c82 from test_char_nulls group by c82;
+---- TYPES
+char
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# Regression test for IMPALA-1339
+select c100 from test_char_nulls group by c100;
+---- TYPES
+char
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# Regression test for IMPALA-1339
+select c120 from test_char_nulls group by c120;
+---- TYPES
+char
+---- RESULTS
+'NULL'
+====
+---- QUERY
+# Regression test for IMPALA-1339
+select c140 from test_char_nulls group by c140;
+---- TYPES
+char
+---- RESULTS
+'NULL'
+====
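
Note on the expected results in the test file above: they are consistent with CHAR(N) values being truncated to N characters and padded with trailing spaces to exactly N (for example 'hel' becomes 'hel  ' and "123456" becomes '12345' as char(5)), while VARCHAR(N) values are truncated but not padded. A minimal sketch of those semantics follows. CharCastSketch and its methods are hypothetical and written only to mirror the expected results; they are not Impala's implementation.

```java
/** Illustrative CHAR(N)/VARCHAR(N) cast semantics; not Impala's implementation. */
final class CharCastSketch {
  /** CHAR(n): truncate to n characters, then pad with trailing spaces to exactly n. */
  static String castToChar(String s, int n) {
    if (s == null) return null;  // NULL stays NULL
    StringBuilder sb = new StringBuilder(s.length() > n ? s.substring(0, n) : s);
    while (sb.length() < n) sb.append(' ');
    return sb.toString();
  }

  /** VARCHAR(n): truncate to at most n characters, with no padding. */
  static String castToVarchar(String s, int n) {
    if (s == null) return null;  // NULL stays NULL
    return s.length() > n ? s.substring(0, n) : s;
  }
}
```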

http://git-wip-us.apache.org/repos/asf/impala/blob/579e3320/testdata/workloads/functional-query/queries/QueryTest/chars.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/chars.test b/testdata/workloads/functional-query/queries/QueryTest/chars.test
index cd1519e..cd915ce 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/chars.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/chars.test
@@ -1,122 +1,10 @@
 ====
 ---- QUERY
-# TODO: string literals should start as CHAR(N) and analysis
-# should promote as necessary
-insert into test_char_tmp select cast("hello" as char(5))
-====
----- QUERY
-select * from test_char_tmp
----- TYPES
-char
----- RESULTS
-'hello'
-====
----- QUERY
-# Regression test for IMPALA-1248
-insert into test_char_tmp values (cast("hel" as char(5)))
-====
----- QUERY
-insert into test_char_tmp select cast(cast("hello000" as VARCHAR(8)) as char(5))
-====
----- QUERY
-select * from test_char_tmp where c = cast('hel' as char(5))
----- TYPES
-char
----- RESULTS
-'hel  '
-====
----- QUERY
-insert into test_char_tmp values (NULL)
-====
----- QUERY
-select * from test_char_tmp as A CROSS JOIN test_char_tmp as B
-where B.c = cast('hel' as CHAR(5))
-ORDER BY A.c
----- TYPES
-char, char
----- RESULTS
-'hel  ','hel  '
-'hello','hel  '
-'hello','hel  '
-'NULL','hel  '
-====
----- QUERY
-select * from test_char_tmp as A, test_char_tmp as B
-where A.c = B.c AND A.c != 'hello'
----- TYPES
-char, char
----- RESULTS
-'hel  ','hel  '
-====
----- QUERY
-select lower(c) from test_char_tmp ORDER BY c
----- TYPES
-string
----- RESULTS
-'hel  '
-'hello'
-'hello'
-'NULL'
-====
----- QUERY
-insert into test_varchar_tmp values (cast("hello" as varchar(5)))
-====
----- QUERY
-select * from test_varchar_tmp
----- TYPES
-string
----- RESULTS
-'hello'
-====
----- QUERY
-insert into test_varchar_tmp values (cast("xyzzzzz12" as varchar(7)))
----- CATCH
-would need to be cast to VARCHAR(5)
-====
----- QUERY
-select cast("xyzzzzz12" as varchar(-1))
----- CATCH
-Syntax error
-====
----- QUERY
 select (cast("xyzzzzz12" as char(-1)))
 ---- CATCH
 Syntax error
 ====
 ---- QUERY
-insert into test_varchar_tmp values (cast("hel" as varchar(4)))
-====
----- QUERY
-select * from test_varchar_tmp
----- TYPES
-string
----- RESULTS
-'hello'
-'hel'
-====
----- QUERY
-insert into allchars values (cast("123456" as char(5)), cast("123456" as char(140)),
-cast("123456" as varchar(5)))
-====
----- QUERY
-select cshort, clong, vc from allchars
----- TYPES
-char,char,string
----- RESULTS
-'12345','123456                                                                                                                                      ','12345'
-====
----- QUERY
-insert into allchars_par values (cast("123456" as char(5)), cast("123456" as char(140)),
-cast("123456" as varchar(5)))
-====
----- QUERY
-select cshort, clong, vc from allchars_par
----- TYPES
-char,char,string
----- RESULTS
-'12345','123456                                                                                                                                      ','12345'
-====
----- QUERY
 select count(*), count(cs), count(cl), count(vc) from chars_tiny
 ---- TYPES
 bigint,bigint,bigint,bigint
@@ -160,14 +48,14 @@ bigint
 1
 ====
 ---- QUERY
-select cs, count(cl) from functional.chars_tiny group by cs having count(vc) > 1
+select cs, count(cl) from chars_tiny group by cs having count(vc) > 1
 ---- TYPES
 char, bigint
 ---- RESULTS
 '6a   ',2
 ====
 ---- QUERY
-select A.cs from functional.chars_tiny as A, functional.chars_tiny as B where
+select A.cs from chars_tiny as A, chars_tiny as B where
 cast(A.cs as char(1)) = cast(B.cl as char(1)) order by A.cs
 ---- TYPES
 char
@@ -183,47 +71,8 @@ char
 '6a   '
 ====
 ---- QUERY
-drop table if exists char_parts
-====
----- QUERY
-create table if not exists char_parts (vc varchar(32)) partitioned by
-(csp char(5), clp char(140), vcp varchar(32))
-====
----- QUERY
-insert into char_parts (csp, clp, vcp, vc) select cs, cl, vc, vc from chars_tiny
-====
----- QUERY
-select csp, clp, vcp from char_parts where csp != cast('dne' as char(5)) order by csp
----- TYPES
-char, char, string
----- RESULTS
-'1aaaa','1bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb','1cccc'
-'2aaaa','2bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb','2cccccc'
-'3aaa ','3bbbbb                                                                                                                                      ','3ccc'
-'4aa  ','4bbbb                                                                                                                                       ','4cc'
-'5a   ','5bbb                                                                                                                                        ','5c'
-'6a   ','6b                                                                                                                                          ','6c'
-'6a   ','6b                                                                                                                                          ','6c'
-'a    ','b                                                                                                                                           ','c'
-====
----- QUERY
-insert into char_parts partition (csp=cast('foo' as char(5)),
-clp=cast('01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789xxxxxxx' as char(140)),
-vcp=cast('myvar' as varchar(32))) select cast('val' as varchar(32));
-====
----- QUERY
-select csp, clp, vcp from char_parts where csp = cast('foo' as char(5))
----- TYPES
-char, char, string
----- RESULTS
-'foo  ','01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789','myvar'
-====
----- QUERY
-drop table if exists char_parts
-====
----- QUERY
 # Regression test for IMPALA-1316
-select A.vc from functional.chars_tiny as A join functional.chars_tiny using (vc) order by A.vc
+select A.vc from chars_tiny as A join chars_tiny using (vc) order by A.vc
 ---- TYPES
 string
 ---- RESULTS
@@ -240,7 +89,7 @@ string
 ====
 ---- QUERY
 # Regression test for IMPALA-1322
-select count(*) from functional.chars_tiny as A, functional.chars_tiny as B
+select count(*) from chars_tiny as A, chars_tiny as B
 where cast(A.cs as CHAR(1)) = cast(B.vc as CHAR(1));
 ---- TYPES
 bigint
@@ -249,40 +98,13 @@ bigint
 ====
 ---- QUERY
 select min(cs), max(vc), ndv(cl), ndv(vc), appx_median(cs), appx_median(vc)
-from functional.chars_tiny
+from chars_tiny
 ---- TYPES
 string, string, bigint, bigint, string, string
 ---- RESULTS
 '1aaaa','c',7,7,'5a   ','5c'
 ====
 ---- QUERY
-# Regression test for IMPALA-1322
-drop table if exists functional.t_1822;
-====
----- QUERY
-# Regression test for IMPALA-1322
-create table functional.t_1822 (c10 char(10), c100 char(100), v100 varchar(100), v200 varchar(200), s string);
-====
----- QUERY
-# Regression test for IMPALA-1322
-insert into functional.t_1822 values (cast('a' as char(1)), cast('a' as char(1)),
-cast('a' as varchar(1)), cast('a' as varchar(1)), 'a');
-====
----- QUERY
-# Regression test for IMPALA-1316
-select count(*) from functional.t_1822 as t join functional.t_1822 as tt
-on cast(tt.s as char(129)) = t.c10 and
-cast(tt.s as char(129)) = t.c100 and tt.c10 = t.c100;
----- TYPES
-bigint
----- RESULTS
-1
-====
----- QUERY
-# Regression test for IMPALA-1322
-drop table if exists functional.t_1822;
-====
----- QUERY
 # Regression test for IMPALA-1316
 select t1.vc, COUNT(1) FROM chars_tiny t1 GROUP BY 1 ORDER BY t1.vc
 ---- TYPES
@@ -313,81 +135,9 @@ char, bigint
 'NULL',1
 ====
 ---- QUERY
-# Regression test for IMPALA-1339
-select c20 from test_char_nulls group by c20;
----- TYPES
-char
----- RESULTS
-'NULL'
-====
----- QUERY
-# Regression test for IMPALA-1339
-select c40 from test_char_nulls group by c40;
----- TYPES
-char
----- RESULTS
-'NULL'
-====
----- QUERY
-# Regression test for IMPALA-1339
-select c60 from test_char_nulls group by c60;
----- TYPES
-char
----- RESULTS
-'NULL'
-====
----- QUERY
-# Regression test for IMPALA-1339
-select c80 from test_char_nulls group by c80;
----- TYPES
-char
----- RESULTS
-'NULL'
-====
----- QUERY
-# Regression test for IMPALA-1339
-select c81 from test_char_nulls group by c81;
----- TYPES
-char
----- RESULTS
-'NULL'
-====
----- QUERY
-# Regression test for IMPALA-1339
-select c82 from test_char_nulls group by c82;
----- TYPES
-char
----- RESULTS
-'NULL'
-====
----- QUERY
-# Regression test for IMPALA-1339
-select c100 from test_char_nulls group by c100;
----- TYPES
-char
----- RESULTS
-'NULL'
-====
----- QUERY
-# Regression test for IMPALA-1339
-select c120 from test_char_nulls group by c120;
----- TYPES
-char
----- RESULTS
-'NULL'
-====
----- QUERY
-# Regression test for IMPALA-1339
-select c140 from test_char_nulls group by c140;
----- TYPES
-char
----- RESULTS
-'NULL'
-====
----- QUERY
 # Regression test for IMPALA-1344
 select cs, LAST_VALUE(cs) OVER (ORDER BY cs rows between unbounded preceding and
-current row) FROM functional.chars_tiny;
+current row) FROM chars_tiny;
 ---- TYPES
 char, string
 ---- RESULTS

http://git-wip-us.apache.org/repos/asf/impala/blob/579e3320/tests/query_test/test_chars.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_chars.py b/tests/query_test/test_chars.py
index ec06da8..eaa744a 100644
--- a/tests/query_test/test_chars.py
+++ b/tests/query_test/test_chars.py
@@ -25,49 +25,6 @@ class TestStringQueries(ImpalaTestSuite):
   def get_workload(cls):
     return 'functional-query'
 
-  def setup_method(self, method):
-    self.__cleanup_char_tables()
-    self.__create_char_tables()
-
-  def teardown_method(self, method):
-    self.__cleanup_char_tables()
-
-  def __cleanup_char_tables(self):
-    self.client.execute('drop table if exists functional.test_char_tmp');
-    self.client.execute('drop table if exists functional.test_varchar_tmp');
-    self.client.execute('drop table if exists functional.allchars');
-    self.client.execute('drop table if exists functional.allchars_par');
-    self.client.execute('drop table if exists functional.test_char_nulls');
-
-  def __create_char_tables(self):
-    self.client.execute(
-        'create table if not exists ' +
-        'functional.test_varchar_tmp (vc varchar(5))')
-    self.client.execute(
-        'create table if not exists functional.test_char_tmp (c char(5))')
-    self.client.execute(
-        'create table if not exists functional.allchars ' +
-        '(cshort char(5), clong char(140), vc varchar(5))')
-    self.client.execute(
-        'create table if not exists functional.allchars_par ' +
-        '(cshort char(5), clong char(140), vc varchar(5)) stored as parquet')
-
-    # Regression test for IMPALA-1339
-    self.client.execute('create table if not exists ' +
-        '''functional.test_char_nulls ( c20 char(20),
-                                        c40 char(40),
-                                        c60 char(60),
-                                        c80 char(80),
-                                        c81 char(81),
-                                        c82 char(82),
-                                        c100 char(100),
-                                        c120 char(120),
-                                        c140 char(140))''')
-    self.client.execute('insert into functional.test_char_nulls ' +
-        'values (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)');
-    self.client.execute('insert into functional.test_char_nulls ' +
-        'values (NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)');
-
   @classmethod
   def add_test_dimensions(cls):
     super(TestStringQueries, cls).add_test_dimensions()
@@ -77,10 +34,13 @@ class TestStringQueries(ImpalaTestSuite):
         v.get_value('table_format').file_format in ['text'] and
         v.get_value('table_format').compression_codec in ['none'])
 
-  @pytest.mark.execute_serially
-  def test_varchar(self, vector):
+  def test_chars(self, vector):
     self.run_test_case('QueryTest/chars', vector)
 
+  def test_chars_tmp_tables(self, vector, unique_database):
+    # Tests that create temporary tables and require a unique database.
+    self.run_test_case('QueryTest/chars-tmp-tables', vector, unique_database)
+
 class TestCharFormats(ImpalaTestSuite):
   @classmethod
   def get_workload(cls):