Posted to commits@impala.apache.org by ta...@apache.org on 2017/03/07 00:00:40 UTC

[2/2] incubator-impala git commit: IMPALA-4624: Implement Parquet dictionary filtering

IMPALA-4624: Implement Parquet dictionary filtering

Here is a basic summary of the changes:
The frontend looks for conjuncts that operate on a single slot and passes a
map from slot id to conjunct indices through Thrift to the backend. The
conjunct indices are indices into the normal PlanNode conjuncts list.
The conjuncts need to satisfy certain conditions (condition 3 is illustrated
by the sketch after this list):
1. They are bound to a single slot.
2. They are deterministic (no random functions).
3. They evaluate to FALSE on a NULL input. This is because the dictionary
does not include NULLs, so any conjunct that evaluates to TRUE on NULL
cannot be evaluated by looking only at the dictionary.
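
As a rough illustration of condition 3 only (a standalone sketch with
hypothetical stand-in types, not the planner's actual eligibility code): a
conjunct is usable for dictionary filtering only if it rejects NULL, which
can be modeled by evaluating the predicate on a NULL input and requiring a
FALSE result.

  #include <functional>
  #include <iostream>
  #include <optional>

  // Stand-in for a single-slot conjunct: takes a nullable value and returns
  // whether the predicate is TRUE (NULL results are collapsed to false).
  using Conjunct = std::function<bool(const std::optional<int>&)>;

  // Eligible per condition 3 only if the conjunct evaluates to FALSE on NULL,
  // since NULLs never appear in the dictionary.
  bool RejectsNull(const Conjunct& conjunct) {
    return !conjunct(std::nullopt);
  }

  int main() {
    Conjunct lt_ten = [](const std::optional<int>& v) { return v && *v < 10; };    // "col < 10"
    Conjunct is_null = [](const std::optional<int>& v) { return !v.has_value(); }; // "col IS NULL"
    std::cout << RejectsNull(lt_ten) << "\n";   // 1: eligible
    std::cout << RejectsNull(is_null) << "\n";  // 0: not eligible, TRUE on NULL
    return 0;
  }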

The backend converts the indices into ExprContexts, which are cloned in
the scanner threads.

The dictionary-read codepath has been moved out of ReadDataPage into its
own function, InitDictionary, which is now a separate step in row group
initialization. ReadDataPage will not see any dictionary pages unless the
Parquet file is invalid.

For dictionary filtering, we initialize dictionaries only as needed to evaluate
the conjuncts. The Parquet scanner evaluates the dictionary filter conjuncts
against the dictionary to see whether any dictionary entry passes. If no entry
passes, the row group is eliminated. If the row group passes dictionary
filtering, we then initialize all remaining dictionaries.
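
A minimal sketch of this per-row-group elimination check (standalone code with
hypothetical stand-in types; the actual logic lives in
HdfsParquetScanner::NextRowGroup() and EvalDictionaryFilters() in the diff
below):

  #include <functional>
  #include <string>
  #include <vector>

  // Hypothetical stand-ins for the scanner's pieces, not Impala's classes.
  struct Dictionary { std::vector<std::string> entries; };
  using Conjunct = std::function<bool(const std::string&)>;

  struct FilterableColumn {
    Dictionary dict;                  // assumed already read via InitDictionary()
    std::vector<Conjunct> conjuncts;  // dictionary filter conjuncts for this slot
  };

  // Returns true if the row group can be skipped: some column's dictionary has
  // no entry that satisfies all of that column's conjuncts.
  bool CanSkipRowGroup(const std::vector<FilterableColumn>& columns) {
    for (const FilterableColumn& col : columns) {
      bool column_has_match = false;
      for (const std::string& value : col.dict.entries) {
        bool all_pass = true;
        for (const Conjunct& c : col.conjuncts) {
          if (!c(value)) { all_pass = false; break; }
        }
        if (all_pass) { column_has_match = true; break; }
      }
      if (!column_has_match) return true;  // no dictionary value survives -> skip
    }
    return false;  // every filtered column has at least one surviving value
  }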

Dictionary filtering is controlled by a new query option,
parquet_dictionary_filtering, which is on by default.

Since column chunks can have a mixture of encodings, dictionary filtering
uses three tests to determine whether a column chunk is purely dictionary
encoded (see the sketch after this list):
1. If encoding_stats is present in the Parquet file, use it to determine whether
there are only dictionary-encoded data pages (i.e. there are no data pages with
an encoding other than PLAIN_DICTIONARY).
-OR-
2. If the encoding stats are not present, look at the encodings list. The column
chunk is purely dictionary encoded if:
a) PLAIN_DICTIONARY is present
AND
b) Only PLAIN_DICTIONARY, RLE, or BIT_PACKED encodings are listed
-OR-
3. If the file was written by an older version of Impala, we know that the
writer falls back to PLAIN encoding only when the dictionary reaches 40,000
values, so dictionary filtering can proceed as long as the dictionary is
smaller than that.
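
A condensed sketch of these three tests (standalone code with hypothetical
stand-in metadata types that only mirror the shape of the Parquet thrift; the
real checks are IsDictionaryEncoded() and the legacy-Impala dictionary-size
test in EvalDictionaryFilters() in the diff below):

  #include <vector>

  // Hypothetical stand-ins, not the generated parquet thrift types.
  enum class Encoding { PLAIN, PLAIN_DICTIONARY, RLE, BIT_PACKED };
  enum class PageType { DATA_PAGE, DICTIONARY_PAGE };
  struct PageEncodingStats { PageType page_type; Encoding encoding; int count; };

  // Check #1: encoding_stats shows no data pages with a non-dictionary encoding.
  bool AllDataPagesDictEncoded(const std::vector<PageEncodingStats>& stats) {
    for (const PageEncodingStats& s : stats) {
      if (s.page_type == PageType::DATA_PAGE &&
          s.encoding != Encoding::PLAIN_DICTIONARY && s.count > 0) {
        return false;
      }
    }
    return true;
  }

  // Check #2: the encodings list contains PLAIN_DICTIONARY and nothing beyond
  // PLAIN_DICTIONARY / RLE / BIT_PACKED (the latter two cover def/rep levels).
  bool EncodingListIsPureDict(const std::vector<Encoding>& encodings) {
    bool has_dict = false;
    for (Encoding e : encodings) {
      if (e == Encoding::PLAIN_DICTIONARY) { has_dict = true; continue; }
      if (e != Encoding::RLE && e != Encoding::BIT_PACKED) return false;
    }
    return has_dict;
  }

  // Check #3: legacy Impala writers switched to PLAIN only once the dictionary
  // hit 40,000 entries, so a smaller dictionary implies pure dictionary encoding.
  bool LegacyImpalaDictSmallEnough(int num_dict_entries) {
    return num_dict_entries < 40000;
  }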

parquet-mr writes the encodings list correctly as of the version in our
environment (1.5.0). This means that check #2 works on some existing files
(potentially most existing parquet-mr files).
parquet-mr writes the encoding stats starting in 1.9.0, which is the version
where check #1 starts working.

Impala's parquet writer now implements both, so either check above will work.

Change-Id: I3a7cc3bd0523fbf3c79bd924219e909ef671cfd7
Reviewed-on: http://gerrit.cloudera.org:8080/5904
Reviewed-by: Marcel Kornacker <ma...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9b923a1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9b923a1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9b923a1a

Branch: refs/heads/master
Commit: 9b923a1a277d90d7056e9c64f97904a90d8f231b
Parents: c667363
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Thu Jan 26 18:07:46 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Mar 6 23:20:34 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc             | 266 ++++++++++++-
 be/src/exec/hdfs-parquet-scanner.h              |  33 ++
 be/src/exec/hdfs-parquet-table-writer.cc        |  84 +++--
 be/src/exec/hdfs-scan-node-base.cc              |  10 +
 be/src/exec/hdfs-scan-node-base.h               |   9 +
 be/src/exec/hdfs-scanner.cc                     |   7 +
 be/src/exec/hdfs-scanner.h                      |   3 +
 be/src/exec/parquet-column-readers.cc           | 375 +++++++++++--------
 be/src/exec/parquet-column-readers.h            |  29 +-
 be/src/service/query-options.cc                 |   5 +
 be/src/service/query-options.h                  |   3 +-
 be/src/util/dict-encoding.h                     |  18 +-
 be/src/util/dict-test.cc                        |  36 +-
 common/thrift/ImpalaInternalService.thrift      |   3 +
 common/thrift/ImpalaService.thrift              |   3 +
 common/thrift/PlanNodes.thrift                  |   4 +
 common/thrift/parquet.thrift                    | 103 +++++
 .../java/org/apache/impala/analysis/Expr.java   |  10 +
 .../impala/analysis/FunctionCallExpr.java       |  23 +-
 .../org/apache/impala/planner/HdfsScanNode.java |  65 +++-
 .../org/apache/impala/planner/PlannerTest.java  |   7 +
 .../queries/PlannerTest/constant-folding.test   |   3 +
 .../queries/PlannerTest/mt-dop-validation.test  |   6 +
 .../queries/PlannerTest/parquet-filtering.test  |  51 +++
 .../PlannerTest/predicate-propagation.test      |   4 +-
 .../QueryTest/mt-dop-parquet-filtering.test     | 250 +++++++++++++
 .../queries/QueryTest/parquet-filtering.test    | 248 ++++++++++++
 tests/query_test/test_mt_dop.py                 |  16 +
 tests/query_test/test_scanners.py               |   4 +
 29 files changed, 1482 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index ab40713..4955949 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -66,6 +66,12 @@ static_assert(BitUtil::IsPowerOf2(BATCHES_PER_FILTER_SELECTIVITY_CHECK),
 // upper bound.
 const int MAX_DICT_HEADER_SIZE = 100;
 
+// Max entries in the dictionary before switching to PLAIN encoding. If a dictionary
+// has fewer entries, then the entire column is dictionary encoded. This threshold
+// is guaranteed to be true for Impala versions 2.9 or below.
+// THIS RECORDS INFORMATION ABOUT PAST BEHAVIOR. DO NOT CHANGE THIS CONSTANT.
+const int LEGACY_IMPALA_MAX_DICT_ENTRIES = 40000;
+
 const int64_t HdfsParquetScanner::FOOTER_SIZE;
 const int16_t HdfsParquetScanner::ROW_GROUP_END;
 const int16_t HdfsParquetScanner::INVALID_LEVEL;
@@ -73,6 +79,9 @@ const int16_t HdfsParquetScanner::INVALID_POS;
 
 const char* HdfsParquetScanner::LLVM_CLASS_NAME = "class.impala::HdfsParquetScanner";
 
+const string PARQUET_MEM_LIMIT_EXCEEDED =
+    "HdfsParquetScanner::$0() failed to allocate $1 bytes for $2.";
+
 Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const std::vector<HdfsFileDesc*>& files) {
   vector<DiskIoMgr::ScanRange*> footer_ranges;
@@ -155,11 +164,13 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
           scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())),
       metadata_range_(NULL),
       dictionary_pool_(new MemPool(scan_node->mem_tracker())),
+      dict_filter_tuple_backing_(scan_node->mem_tracker()),
       assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
       process_footer_timer_stats_(NULL),
       num_cols_counter_(NULL),
       num_row_groups_counter_(NULL),
       num_scanners_with_no_reads_counter_(NULL),
+      num_dict_filtered_row_groups_counter_(NULL),
       codegend_process_scratch_batch_fn_(NULL) {
   assemble_rows_timer_.Stop();
 }
@@ -177,6 +188,8 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
       ADD_COUNTER(scan_node_->runtime_profile(), "NumRowGroups", TUnit::UNIT);
   num_scanners_with_no_reads_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumScannersWithNoReads", TUnit::UNIT);
+  num_dict_filtered_row_groups_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumDictFilteredRowGroups", TUnit::UNIT);
   process_footer_timer_stats_ =
       ADD_SUMMARY_STATS_TIMER(
           scan_node_->runtime_profile(), "FooterProcessingTime");
@@ -244,6 +257,8 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   // Set top-level template tuple.
   template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
 
+  RETURN_IF_ERROR(InitDictFilterStructures());
+
   // The scanner-wide stream was used only to read the file footer.  Each column has added
   // its own stream.
   stream_ = NULL;
@@ -615,8 +630,39 @@ Status HdfsParquetScanner::NextRowGroup() {
       continue;
     }
 
-    // Prepare column readers for first read
-    RETURN_IF_ERROR(InitColumns(row_group_idx_, column_readers_));
+    // Prepare dictionary filtering columns for first read
+    // This must be done before dictionary filtering, because this code initializes
+    // the column offsets and streams needed to read the dictionaries.
+    // TODO: Restructure the code so that the dictionary can be read without the rest
+    // of the column.
+    RETURN_IF_ERROR(InitColumns(row_group_idx_, dict_filterable_readers_));
+
+    // If there is a dictionary-encoded column where every value is eliminated
+    // by a conjunct, the row group can be eliminated. This initializes dictionaries
+    // for all columns visited.
+    bool skip_row_group_on_dict_filters;
+    Status status = EvalDictionaryFilters(row_group, &skip_row_group_on_dict_filters);
+    if (!status.ok()) {
+      // Either return an error or skip this row group if it is ok to ignore errors
+      RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+      continue;
+    }
+    if (skip_row_group_on_dict_filters) {
+      COUNTER_ADD(num_dict_filtered_row_groups_counter_, 1);
+      continue;
+    }
+
+    // At this point, the row group has passed any filtering criteria
+    // Prepare non-dictionary filtering column readers for first read and
+    // initialize their dictionaries.
+    RETURN_IF_ERROR(InitColumns(row_group_idx_, non_dict_filterable_readers_));
+    status = InitDictionaries(non_dict_filterable_readers_);
+    if (!status.ok()) {
+      // Either return an error or skip this row group if it is ok to ignore errors
+      RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
+      continue;
+    }
+
     bool seeding_ok = true;
     for (ParquetColumnReader* col_reader: column_readers_) {
       // Seed collection and boolean column readers with NextLevel().
@@ -657,6 +703,204 @@ void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
   }
 }
 
+bool HdfsParquetScanner::IsDictFilterable(ParquetColumnReader* col_reader) {
+  // Nested types are not supported yet
+  if (col_reader->IsCollectionReader()) return false;
+
+  BaseScalarColumnReader* scalar_reader =
+    static_cast<BaseScalarColumnReader*>(col_reader);
+  const SlotDescriptor* slot_desc = scalar_reader->slot_desc();
+  // Some queries do not need the column to be materialized, so slot_desc is NULL.
+  // For example, a count(*) with no predicates only needs to count records
+  // rather than materializing the values.
+  if (!slot_desc) return false;
+  // Does this column reader have any dictionary filter conjuncts?
+  auto dict_filter_it = scanner_dict_filter_map_.find(slot_desc->id());
+  if (dict_filter_it == scanner_dict_filter_map_.end()) return false;
+
+  // Certain datatypes (chars, timestamps) do not have the appropriate value in the
+  // file format and must be converted before return. This is true for the
+  // dictionary values, so skip these datatypes for now.
+  // TODO: The values should be converted during dictionary construction and stored
+  // in converted form in the dictionary.
+  if (scalar_reader->NeedsConversion()) return false;
+
+  // Certain datatypes (timestamps) need to validate the value, as certain bit
+  // combinations are not valid. The dictionary values are not validated, so
+  // skip these datatypes for now.
+  // TODO: This should be pushed into dictionary construction.
+  if (scalar_reader->NeedsValidation()) return false;
+
+  return true;
+}
+
+Status HdfsParquetScanner::InitDictFilterStructures() {
+  // Check dictionary filtering query option
+  bool dictionary_filtering_enabled =
+      state_->query_options().parquet_dictionary_filtering;
+
+  // Allocate space for dictionary filtering tuple
+  // Certain queries do not need any columns to be materialized (such as count(*))
+  // and have a tuple size of 0. Explicitly disable dictionary filtering in this case.
+  int tuple_size = scan_node_->tuple_desc()->byte_size();
+  if (tuple_size > 0) {
+    if (!dict_filter_tuple_backing_.TryAllocate(tuple_size)) {
+      string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED,
+          "InitDictFilterStructures", tuple_size, "Dictionary Filtering Tuple");
+      return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, tuple_size);
+    }
+  } else {
+    dictionary_filtering_enabled = false;
+  }
+
+  // Divide the column readers into a list of column readers that are eligible for
+  // dictionary filtering and a list of column readers that are not. If dictionary
+  // filtering is disabled, all column readers go into the ineligible list.
+  for (ParquetColumnReader* col_reader : column_readers_) {
+    if (dictionary_filtering_enabled && IsDictFilterable(col_reader)) {
+      dict_filterable_readers_.push_back(col_reader);
+    } else {
+      non_dict_filterable_readers_.push_back(col_reader);
+    }
+  }
+  return Status::OK();
+}
+
+bool HdfsParquetScanner::IsDictionaryEncoded(
+    const parquet::ColumnMetaData& col_metadata) {
+  // The Parquet spec allows for column chunks to have mixed encodings
+  // where some data pages are dictionary-encoded and others are plain
+  // encoded. For example, a Parquet file writer might start writing
+  // a column chunk as dictionary encoded, but it will switch to plain
+  // encoding if the dictionary grows too large.
+  //
+  // In order for dictionary filters to skip the entire row group,
+  // the conjuncts must be evaluated on column chunks that are entirely
+  // encoded with the dictionary encoding. There are two checks
+  // available to verify this:
+  // 1. The encoding_stats field on the column chunk metadata provides
+  //    information about the number of data pages written in each
+  //    format. This allows for a specific check of whether all the
+  //    data pages are dictionary encoded.
+  // 2. The encodings field on the column chunk metadata lists the
+  //    encodings used. If this list contains the dictionary encoding
+  //    and does not include unexpected encodings (i.e. encodings not
+  //    associated with definition/repetition levels), then it is entirely
+  //    dictionary encoded.
+
+  if (col_metadata.__isset.encoding_stats) {
+    // Condition #1 above
+    for (const parquet::PageEncodingStats& enc_stat : col_metadata.encoding_stats) {
+      if (enc_stat.page_type == parquet::PageType::DATA_PAGE &&
+          enc_stat.encoding != parquet::Encoding::PLAIN_DICTIONARY &&
+          enc_stat.count > 0) {
+        return false;
+      }
+    }
+  } else {
+    // Condition #2 above
+    bool has_dict_encoding = false;
+    bool has_nondict_encoding = false;
+    for (const parquet::Encoding::type& encoding : col_metadata.encodings) {
+      if (encoding == parquet::Encoding::PLAIN_DICTIONARY) has_dict_encoding = true;
+
+      // RLE and BIT_PACKED are used for repetition/definition levels
+      if (encoding != parquet::Encoding::PLAIN_DICTIONARY &&
+          encoding != parquet::Encoding::RLE &&
+          encoding != parquet::Encoding::BIT_PACKED) {
+        has_nondict_encoding = true;
+        break;
+      }
+    }
+    // Not entirely dictionary encoded if:
+    // 1. No dictionary encoding listed
+    // OR
+    // 2. Some non-dictionary encoding is listed
+    if (!has_dict_encoding || has_nondict_encoding) return false;
+  }
+
+  return true;
+}
+
+Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_group,
+    bool* row_group_eliminated) {
+  *row_group_eliminated = false;
+
+  // TODO: Bootstrapping problem: existing 2.9 files don't have the encoding
+  // stats or encodings set properly, but after this goes in, they will.
+  // Change to 2.9 later.
+  bool is_legacy_impala = false;
+  if (file_version_.application == "impala" && file_version_.VersionLt(2,10,0)) {
+    is_legacy_impala = true;
+  }
+
+  Tuple* dict_filter_tuple =
+      reinterpret_cast<Tuple*>(dict_filter_tuple_backing_.buffer());
+  dict_filter_tuple->Init(scan_node_->tuple_desc()->byte_size());
+  vector<ParquetColumnReader*> deferred_dict_init_list;
+  for (ParquetColumnReader* col_reader : dict_filterable_readers_) {
+    DCHECK(!col_reader->IsCollectionReader());
+    BaseScalarColumnReader* scalar_reader =
+        static_cast<BaseScalarColumnReader*>(col_reader);
+    const parquet::ColumnMetaData& col_metadata =
+        row_group.columns[scalar_reader->col_idx()].meta_data;
+
+    // Legacy impala files cannot be eliminated here, because the only way to
+    // determine whether the column is 100% dictionary encoded requires reading
+    // the dictionary.
+    if (!is_legacy_impala && !IsDictionaryEncoded(col_metadata)) {
+      // We cannot guarantee that this reader is 100% dictionary encoded,
+      // so dictionary filters cannot be used. Defer initializing its dictionary
+      // until after the other filters have been evaluated.
+      deferred_dict_init_list.push_back(scalar_reader);
+      continue;
+    }
+
+    RETURN_IF_ERROR(scalar_reader->InitDictionary());
+    DictDecoderBase* dictionary = scalar_reader->GetDictionaryDecoder();
+    if (!dictionary) continue;
+
+    // Legacy (version < 2.9) Impala files do not spill to PLAIN encoding until
+    // it reaches the maximum number of dictionary entries. If the dictionary
+    // has fewer entries, then it is 100% dictionary encoded.
+    if (is_legacy_impala &&
+        dictionary->num_entries() >= LEGACY_IMPALA_MAX_DICT_ENTRIES) continue;
+
+    const SlotDescriptor* slot_desc = scalar_reader->slot_desc();
+    auto dict_filter_it = scanner_dict_filter_map_.find(slot_desc->id());
+    DCHECK(dict_filter_it != scanner_dict_filter_map_.end());
+    vector<ExprContext*>& dict_filter_conjunct_ctxs = dict_filter_it->second;
+    void* slot = dict_filter_tuple->GetSlot(slot_desc->tuple_offset());
+    bool column_has_match = false;
+    for (int dict_idx = 0; dict_idx < dictionary->num_entries(); ++dict_idx) {
+      dictionary->GetValue(dict_idx, slot);
+
+      // We can only eliminate this row group if no value from the dictionary matches.
+      // If any dictionary value passes the conjuncts, then move on to the next column.
+      TupleRow row;
+      row.SetTuple(0, dict_filter_tuple);
+      if (ExecNode::EvalConjuncts(dict_filter_conjunct_ctxs.data(),
+          dict_filter_conjunct_ctxs.size(), &row)) {
+        column_has_match = true;
+        break;
+      }
+    }
+
+    if (!column_has_match) {
+      // The column contains no value that matches the conjunct. The row group
+      // can be eliminated.
+      *row_group_eliminated = true;
+      return Status::OK();
+    }
+  }
+
+  // Any columns that were not 100% dictionary encoded need to initialize
+  // their dictionaries here.
+  RETURN_IF_ERROR(InitDictionaries(deferred_dict_init_list));
+
+  return Status::OK();
+}
+
 /// High-level steps of this function:
 /// 1. Allocate 'scratch' memory for tuples able to hold a full batch
 /// 2. Populate the slots of all scratch tuples one column reader at a time,
@@ -1425,6 +1669,24 @@ Status HdfsParquetScanner::InitColumns(
   return Status::OK();
 }
 
+Status HdfsParquetScanner::InitDictionaries(
+    const vector<ParquetColumnReader*>& column_readers) {
+  for (ParquetColumnReader* col_reader : column_readers) {
+    if (col_reader->IsCollectionReader()) {
+      CollectionColumnReader* collection_reader =
+          static_cast<CollectionColumnReader*>(col_reader);
+      // Recursively init child reader dictionaries
+      RETURN_IF_ERROR(InitDictionaries(*collection_reader->children()));
+      continue;
+    }
+
+    BaseScalarColumnReader* scalar_reader =
+        static_cast<BaseScalarColumnReader*>(col_reader);
+    RETURN_IF_ERROR(scalar_reader->InitDictionary());
+  }
+  return Status::OK();
+}
+
 Status HdfsParquetScanner::ValidateEndOfRowGroup(
     const vector<ParquetColumnReader*>& column_readers, int row_group_idx, int64_t rows_read) {
   DCHECK(!column_readers.empty());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 87f488f..34bfcc9 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -444,6 +444,17 @@ class HdfsParquetScanner : public HdfsScanner {
   /// pages in a column chunk.
   boost::scoped_ptr<MemPool> dictionary_pool_;
 
+  /// Column readers that are eligible for dictionary filtering
+  /// These are pointers to elements of column_readers_
+  std::vector<ParquetColumnReader*> dict_filterable_readers_;
+
+  /// Column readers that are not eligible for dictionary filtering
+  /// These are pointers to elements of column_readers_
+  std::vector<ParquetColumnReader*> non_dict_filterable_readers_;
+
+  /// Memory used to store the tuple used for dictionary filtering
+  ScopedBuffer dict_filter_tuple_backing_;
+
   /// Timer for materializing rows.  This ignores time getting the next buffer.
   ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
 
@@ -463,6 +474,9 @@ class HdfsParquetScanner : public HdfsScanner {
   /// with the midpoint of any row-group in the file.
   RuntimeProfile::Counter* num_scanners_with_no_reads_counter_;
 
+  /// Number of row groups skipped due to dictionary filter
+  RuntimeProfile::Counter* num_dict_filtered_row_groups_counter_;
+
   typedef int (*ProcessScratchBatchFn)(HdfsParquetScanner*, RowBatch*);
   /// The codegen'd version of ProcessScratchBatch() if available, NULL otherwise.
   ProcessScratchBatchFn codegend_process_scratch_batch_fn_;
@@ -600,6 +614,9 @@ class HdfsParquetScanner : public HdfsScanner {
   Status InitColumns(
       int row_group_idx, const std::vector<ParquetColumnReader*>& column_readers);
 
+  /// Initialize dictionaries for all column readers
+  Status InitDictionaries(const std::vector<ParquetColumnReader*>& column_readers);
+
   /// Performs some validation once we've reached the end of a row group to help detect
   /// bugs or bad input files.
   Status ValidateEndOfRowGroup(const std::vector<ParquetColumnReader*>& column_readers,
@@ -612,6 +629,22 @@ class HdfsParquetScanner : public HdfsScanner {
   /// from mem pools to the given row batch. Closes all column readers.
   /// Should be called after completing a row group and when returning the last batch.
   void FlushRowGroupResources(RowBatch* row_batch);
+
+  /// Evaluates whether the column reader is eligible for dictionary predicates
+  bool IsDictFilterable(ParquetColumnReader* col_reader);
+
+  /// Divides the column readers into dict_filterable_readers_ and
+  /// non_dict_filterable_readers_. Allocates memory for dict_filter_tuple_backing_.
+  Status InitDictFilterStructures();
+
+  /// Returns true if all of the data pages in the column chunk are dictionary encoded
+  bool IsDictionaryEncoded(const parquet::ColumnMetaData& col_metadata);
+
+  /// Checks to see if this row group can be eliminated based on applying conjuncts
+  /// to the dictionary values. Specifically, if any dictionary-encoded column has
+  /// no values that pass the relevant conjuncts, then the row group can be skipped.
+  Status EvalDictionaryFilters(const parquet::RowGroup& row_group,
+      bool* skip_row_group);
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 890aa5d..164e921 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -150,6 +150,12 @@ class HdfsParquetTableWriter::BaseColumnWriter {
     num_values_ = 0;
     total_compressed_byte_size_ = 0;
     current_encoding_ = Encoding::PLAIN;
+    column_encodings_.clear();
+    dict_encoding_stats_.clear();
+    data_encoding_stats_.clear();
+    // Repetition/definition level encodings are constant. Incorporate them here.
+    column_encodings_.insert(Encoding::RLE);
+    column_encodings_.insert(Encoding::BIT_PACKED);
   }
 
   // Close this writer. This is only called after Flush() and no more rows will
@@ -238,6 +244,16 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   int64_t total_uncompressed_byte_size_;
   Encoding::type current_encoding_;
 
+  // Set of all encodings used in the column chunk
+  unordered_set<Encoding::type> column_encodings_;
+
+  // Map from the encoding to the number of pages in the column chunk with this encoding
+  // These are used to construct the PageEncodingStats, which provide information
+  // about encoding usage for each different page type. Currently, only dictionary
+  // and data pages are used.
+  unordered_map<Encoding::type, int> dict_encoding_stats_;
+  unordered_map<Encoding::type, int> data_encoding_stats_;
+
   // Created, owned, and set by the derived class.
   DictEncoderBase* dict_encoder_base_;
 
@@ -515,6 +531,7 @@ Status HdfsParquetTableWriter::BaseColumnWriter::Flush(int64_t* file_pos,
     DictionaryPageHeader dict_header;
     dict_header.num_values = dict_encoder_base_->num_entries();
     dict_header.encoding = Encoding::PLAIN_DICTIONARY;
+    ++dict_encoding_stats_[dict_header.encoding];
 
     PageHeader header;
     header.type = PageType::DICTIONARY_PAGE;
@@ -598,6 +615,10 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
   PageHeader& header = current_page_->header;
   header.data_page_header.encoding = current_encoding_;
 
+  // Accumulate encoding statistics
+  column_encodings_.insert(header.data_page_header.encoding);
+  ++data_encoding_stats_[header.data_page_header.encoding];
+
   // Compute size of definition bits
   def_levels_->Flush();
   current_page_->num_def_bytes = sizeof(int32_t) + def_levels_->len();
@@ -684,6 +705,9 @@ void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
 
     DataPageHeader header;
     header.num_values = 0;
+    // The code that populates the column chunk metadata's encodings field
+    // relies on these specific values for the definition/repetition level
+    // encodings.
     header.definition_level_encoding = Encoding::RLE;
     header.repetition_level_encoding = Encoding::BIT_PACKED;
     current_page_->header.__set_data_page_header(header);
@@ -852,13 +876,6 @@ Status HdfsParquetTableWriter::AddRowGroup() {
   for (int i = 0; i < columns_.size(); ++i) {
     ColumnMetaData metadata;
     metadata.type = IMPALA_TO_PARQUET_TYPES[columns_[i]->expr_ctx_->root()->type().type];
-    // Add all encodings that were used in this file.  Currently we use PLAIN and
-    // PLAIN_DICTIONARY for data values and RLE for the definition levels.
-    metadata.encodings.push_back(Encoding::RLE);
-    // Columns are initially dictionary encoded
-    // TODO: we might not have PLAIN encoding in this case
-    metadata.encodings.push_back(Encoding::PLAIN_DICTIONARY);
-    metadata.encodings.push_back(Encoding::PLAIN);
     metadata.path_in_schema.push_back(
         table_desc_->col_descs()[i + num_clustering_cols].name());
     metadata.codec = columns_[i]->codec();
@@ -1014,30 +1031,55 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     RETURN_IF_ERROR(columns_[i]->Flush(&file_pos_, &data_page_offset, &dict_page_offset));
     DCHECK_GT(data_page_offset, 0);
 
-    current_row_group_->columns[i].meta_data.data_page_offset = data_page_offset;
+    ColumnChunk& col_chunk = current_row_group_->columns[i];
+    ColumnMetaData& col_metadata = col_chunk.meta_data;
+    col_metadata.data_page_offset = data_page_offset;
     if (dict_page_offset >= 0) {
-      current_row_group_->columns[i].meta_data.__set_dictionary_page_offset(
-          dict_page_offset);
+      col_metadata.__set_dictionary_page_offset(dict_page_offset);
     }
 
-    current_row_group_->columns[i].meta_data.num_values = columns_[i]->num_values();
-    current_row_group_->columns[i].meta_data.total_uncompressed_size =
-        columns_[i]->total_uncompressed_size();
-    current_row_group_->columns[i].meta_data.total_compressed_size =
-        columns_[i]->total_compressed_size();
-    current_row_group_->total_byte_size += columns_[i]->total_compressed_size();
-    current_row_group_->num_rows = columns_[i]->num_values();
+    BaseColumnWriter* col_writer = columns_[i].get();
+    col_metadata.num_values = col_writer->num_values();
+    col_metadata.total_uncompressed_size = col_writer->total_uncompressed_size();
+    col_metadata.total_compressed_size = col_writer->total_compressed_size();
+    current_row_group_->total_byte_size += col_writer->total_compressed_size();
+    current_row_group_->num_rows = col_writer->num_values();
     current_row_group_->columns[i].file_offset = file_pos_;
     const string& col_name = table_desc_->col_descs()[i + num_clustering_cols].name();
-    parquet_stats_.per_column_size[col_name] += columns_[i]->total_compressed_size();
+    parquet_stats_.per_column_size[col_name] += col_writer->total_compressed_size();
+
+    // Write encodings and encoding stats for this column
+    col_metadata.encodings.clear();
+    for (Encoding::type encoding : col_writer->column_encodings_) {
+      col_metadata.encodings.push_back(encoding);
+    }
+
+    vector<PageEncodingStats> encoding_stats;
+    // Add dictionary page encoding stats
+    for (const auto& entry: col_writer->dict_encoding_stats_) {
+      PageEncodingStats dict_enc_stat;
+      dict_enc_stat.page_type = PageType::DICTIONARY_PAGE;
+      dict_enc_stat.encoding = entry.first;
+      dict_enc_stat.count = entry.second;
+      encoding_stats.push_back(dict_enc_stat);
+    }
+    // Add data page encoding stats
+    for (const auto& entry: col_writer->data_encoding_stats_) {
+      PageEncodingStats data_enc_stat;
+      data_enc_stat.page_type = PageType::DATA_PAGE;
+      data_enc_stat.encoding = entry.first;
+      data_enc_stat.count = entry.second;
+      encoding_stats.push_back(data_enc_stat);
+    }
+    col_metadata.__set_encoding_stats(encoding_stats);
 
     // Build column statistics and add them to the header.
-    columns_[i]->EncodeRowGroupStats(&current_row_group_->columns[i].meta_data);
+    col_writer->EncodeRowGroupStats(&current_row_group_->columns[i].meta_data);
 
     // Since we don't supported complex schemas, all columns should have the same
     // number of values.
     DCHECK_EQ(current_row_group_->columns[0].meta_data.num_values,
-        columns_[i]->num_values());
+        col_writer->num_values());
 
     // Metadata for this column is complete, write it out to file.  The column metadata
     // goes at the end so that when we have collocated files, the column data can be
@@ -1049,7 +1091,7 @@ Status HdfsParquetTableWriter::FlushCurrentRowGroup() {
     RETURN_IF_ERROR(Write(buffer, len));
     file_pos_ += len;
 
-    columns_[i]->Reset();
+    col_writer->Reset();
   }
 
   current_row_group_ = nullptr;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 6746ef5..e478b69 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -155,6 +155,16 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
       &min_max_conjunct_ctxs_));
   DCHECK(min_max_conjunct_ctxs_.empty() == (min_max_tuple_id_ == -1));
 
+  for (const auto& entry: tnode.hdfs_scan_node.dictionary_filter_conjuncts) {
+    // Convert this slot's list of conjunct indices into a list of pointers
+    // into conjunct_ctxs_.
+    for (int conjunct_idx : entry.second) {
+      DCHECK_LT(conjunct_idx, conjunct_ctxs_.size());
+      ExprContext* conjunct_ctx = conjunct_ctxs_[conjunct_idx];
+      dict_filter_conjuncts_map_[entry.first].push_back(conjunct_ctx);
+    }
+  }
+
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index cc31d9b..63d05be 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -160,6 +160,12 @@ class HdfsScanNodeBase : public ScanNode {
   typedef std::map<TupleId, std::vector<ExprContext*>> ConjunctsMap;
   const ConjunctsMap& conjuncts_map() const { return conjuncts_map_; }
 
+  /// Slot Id => Dictionary Filter eligible conjuncts for that slot
+  typedef std::map<SlotId, std::vector<ExprContext*>> DictFilterConjunctsMap;
+  const DictFilterConjunctsMap& dict_filter_conjuncts_map() const {
+    return dict_filter_conjuncts_map_;
+  }
+
   RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length() {
     return max_compressed_text_file_length_;
   }
@@ -336,6 +342,9 @@ class HdfsScanNodeBase : public ScanNode {
   /// item tuples). Includes a copy of ExecNode.conjuncts_.
   ConjunctsMap conjuncts_map_;
 
+  /// Dictionary filtering eligible conjuncts for each slot
+  DictFilterConjunctsMap dict_filter_conjuncts_map_;
+
   /// Set to true when the initial scan ranges are issued to the IoMgr. This happens on
   /// the first call to GetNext(). The token manager, in a different thread, will read
   /// this variable.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 281faa6..64a922d 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -115,6 +115,12 @@ Status HdfsScanner::Open(ScannerContext* context) {
          scanner_conjuncts_map_.end());
   scanner_conjunct_ctxs_ = &scanner_conjuncts_map_[scan_node_->tuple_desc()->id()];
 
+  // Clone the scan node's dictionary filtering conjuncts map.
+  for (const auto& entry: scan_node_->dict_filter_conjuncts_map()) {
+    RETURN_IF_ERROR(Expr::CloneIfNotExists(entry.second,
+        scan_node_->runtime_state(), &scanner_dict_filter_map_[entry.first]));
+  }
+
   // Initialize the template_tuple_.
   template_tuple_ = scan_node_->InitTemplateTuple(
       context_->partition_descriptor()->partition_key_value_ctxs(),
@@ -128,6 +134,7 @@ Status HdfsScanner::Open(ScannerContext* context) {
 void HdfsScanner::Close(RowBatch* row_batch) {
   if (decompressor_.get() != NULL) decompressor_->Close();
   for (const auto& entry: scanner_conjuncts_map_) Expr::Close(entry.second, state_);
+  for (const auto& entry: scanner_dict_filter_map_) Expr::Close(entry.second, state_);
   obj_pool_.Clear();
   stream_ = NULL;
   context_->ClearStreams();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 3ca2744..df21fb2 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -210,6 +210,9 @@ class HdfsScanner {
   // that do not support nested types.
   const std::vector<ExprContext*>* scanner_conjunct_ctxs_;
 
+  // Clones of the conjuncts ExprContexts in scan_node_->dict_filter_conjuncts_map().
+  HdfsScanNodeBase::DictFilterConjunctsMap scanner_dict_filter_map_;
+
   /// Holds memory for template tuples. The memory in this pool must remain valid as long
   /// as the row batches produced by this scanner. This typically means that the
   /// ownership is transferred to the last row batch in Close(). Some scanners transfer

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 535d9be..d9f822c 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -63,8 +63,8 @@ DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in
 
 namespace impala {
 
-const string PARQUET_MEM_LIMIT_EXCEEDED = "HdfsParquetScanner::$0() failed to allocate "
-    "$1 bytes for $2.";
+const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
+    "ParquetColumnReader::$0() failed to allocate $1 bytes for $2.";
 
 Status ParquetLevelDecoder::Init(const string& filename,
     parquet::Encoding::type encoding, MemPool* cache_pool, int cache_size,
@@ -253,6 +253,13 @@ class ScalarColumnReader : public BaseScalarColumnReader {
     return ReadValueBatch<false>(pool, max_values, tuple_size, tuple_mem, num_values);
   }
 
+  virtual DictDecoderBase* GetDictionaryDecoder() {
+    return HasDictionaryDecoder() ? &dict_decoder_ : nullptr;
+  }
+
+  virtual bool NeedsConversion() { return NeedsConversionInline(); }
+  virtual bool NeedsValidation() { return NeedsValidationInline(); }
+
  protected:
   template<bool IN_COLLECTION>
   inline bool ReadValue(MemPool* pool, Tuple* tuple) {
@@ -452,10 +459,10 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   inline bool ReadSlot(Tuple* tuple, MemPool* pool) {
     void* slot = tuple->GetSlot(tuple_offset_);
     T val;
-    T* val_ptr = NeedsConversion() ? &val : reinterpret_cast<T*>(slot);
+    T* val_ptr = NeedsConversionInline() ? &val : reinterpret_cast<T*>(slot);
     if (IS_DICT_ENCODED) {
       DCHECK_EQ(page_encoding_, parquet::Encoding::PLAIN_DICTIONARY);
-      if (UNLIKELY(!dict_decoder_.GetValue(val_ptr))) {
+      if (UNLIKELY(!dict_decoder_.GetNextValue(val_ptr))) {
         SetDictDecodeError();
         return false;
       }
@@ -470,10 +477,10 @@ class ScalarColumnReader : public BaseScalarColumnReader {
       data_ += encoded_len;
     }
 
-    if (UNLIKELY(NeedsValidation() && !ValidateSlot(val_ptr, tuple))) {
+    if (UNLIKELY(NeedsValidationInline() && !ValidateSlot(val_ptr, tuple))) {
       return false;
     }
-    if (UNLIKELY(NeedsConversion() &&
+    if (UNLIKELY(NeedsConversionInline() &&
         !tuple->IsNull(null_indicator_offset_) &&
         !ConvertSlot(&val, reinterpret_cast<T*>(slot), pool))) {
       return false;
@@ -484,16 +491,16 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   /// Most column readers never require conversion, so we can avoid branches by
   /// returning constant false. Column readers for types that require conversion
   /// must specialize this function.
-  inline bool NeedsConversion() const {
+  inline bool NeedsConversionInline() const {
     DCHECK(!needs_conversion_);
     return false;
   }
 
-  /// Similar to NeedsCoversion(), most column readers do not require validation,
+  /// Similar to NeedsConversion(), most column readers do not require validation,
   /// so to avoid branches, we return constant false. In general, types where not
   /// all possible bit representations of the data type are valid should be
   /// validated.
-  inline bool NeedsValidation() const {
+  inline bool NeedsValidationInline() const {
     return false;
   }
 
@@ -535,7 +542,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
 };
 
 template<>
-inline bool ScalarColumnReader<StringValue, true>::NeedsConversion() const {
+inline bool ScalarColumnReader<StringValue, true>::NeedsConversionInline() const {
   return needs_conversion_;
 }
 
@@ -550,7 +557,7 @@ bool ScalarColumnReader<StringValue, true>::ConvertSlot(
   if (slot_desc()->type().IsVarLenStringType()) {
     sv.ptr = reinterpret_cast<char*>(pool->TryAllocate(len));
     if (UNLIKELY(sv.ptr == NULL)) {
-      string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ConvertSlot",
+      string details = Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "ConvertSlot",
           len, "StringValue");
       parent_->parse_status_ =
           pool->mem_tracker()->MemLimitExceeded(parent_->state_, details, len);
@@ -568,7 +575,7 @@ bool ScalarColumnReader<StringValue, true>::ConvertSlot(
 }
 
 template<>
-inline bool ScalarColumnReader<TimestampValue, true>::NeedsConversion() const {
+inline bool ScalarColumnReader<TimestampValue, true>::NeedsConversionInline() const {
   return needs_conversion_;
 }
 
@@ -583,7 +590,7 @@ bool ScalarColumnReader<TimestampValue, true>::ConvertSlot(
 }
 
 template<>
-inline bool ScalarColumnReader<TimestampValue, true>::NeedsValidation() const {
+inline bool ScalarColumnReader<TimestampValue, true>::NeedsValidationInline() const {
   return true;
 }
 
@@ -751,10 +758,188 @@ static bool RequiresSkippedDictionaryHeaderCheck(
   return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
 }
 
-Status BaseScalarColumnReader::ReadDataPage() {
-  Status status;
+Status BaseScalarColumnReader::ReadPageHeader(bool peek,
+    parquet::PageHeader* next_page_header, uint32_t* next_header_size, bool* eos) {
+  *eos = false;
+
   uint8_t* buffer;
+  int64_t buffer_size;
+  RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size));
+  // check for end of stream
+  if (buffer_size == 0) {
+    // The data pages contain fewer values than stated in the column metadata.
+    DCHECK(stream_->eosr());
+    DCHECK_LT(num_values_read_, metadata_->num_values);
+    // TODO for 2.3: node_.element->name isn't necessarily useful
+    ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
+                 metadata_->num_values, num_values_read_, node_.element->name, filename());
+    RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
+    *eos = true;
+    return Status::OK();
+  }
+
+  // We don't know the actual header size until the thrift object is deserialized.  Loop
+  // until we successfully deserialize the header or exceed the maximum header size.
+  uint32_t header_size;
+  Status status;
+  while (true) {
+    header_size = buffer_size;
+    status = DeserializeThriftMsg(buffer, &header_size, true, next_page_header);
+    if (status.ok()) break;
+
+    if (buffer_size >= FLAGS_max_page_header_size) {
+      stringstream ss;
+      ss << "ParquetScanner: could not read data page because page header exceeded "
+         << "maximum size of "
+         << PrettyPrinter::Print(FLAGS_max_page_header_size, TUnit::BYTES);
+      status.AddDetail(ss.str());
+      return status;
+    }
+
+    // Didn't read entire header, increase buffer size and try again
+    int64_t new_buffer_size = max<int64_t>(buffer_size * 2, 1024);
+    bool success = stream_->GetBytes(
+        new_buffer_size, &buffer, &new_buffer_size, &status, /* peek */ true);
+    if (!success) {
+      DCHECK(!status.ok());
+      return status;
+    }
+    DCHECK(status.ok());
+
+    // Even though we increased the allowed buffer size, the number of bytes
+    // read did not change. The header is not limited by the buffer space,
+    // so it must be incomplete in the file.
+    if (buffer_size == new_buffer_size) {
+      DCHECK_NE(new_buffer_size, 0);
+      return Status(TErrorCode::PARQUET_HEADER_EOF, filename());
+    }
+    DCHECK_GT(new_buffer_size, buffer_size);
+    buffer_size = new_buffer_size;
+  }
+
+  *next_header_size = header_size;
+
+  // Successfully deserialized current_page_header_
+  if (!peek && !stream_->SkipBytes(header_size, &status)) return status;
+
+  int data_size = next_page_header->compressed_page_size;
+  if (UNLIKELY(data_size < 0)) {
+    return Status(Substitute("Corrupt Parquet file '$0': negative page size $1 for "
+        "column '$2'", filename(), data_size, schema_element().name));
+  }
+  int uncompressed_size = next_page_header->uncompressed_page_size;
+  if (UNLIKELY(uncompressed_size < 0)) {
+    return Status(Substitute("Corrupt Parquet file '$0': negative uncompressed page "
+        "size $1 for column '$2'", filename(), uncompressed_size,
+        schema_element().name));
+  }
+
+  return Status::OK();
+}
+
+Status BaseScalarColumnReader::InitDictionary() {
+  // Peek at the next page header
+  bool eos;
+  parquet::PageHeader next_page_header;
+  uint32_t next_header_size;
+
+  DCHECK(!HasDictionaryDecoder());
+
+  RETURN_IF_ERROR(ReadPageHeader(true /* peek */, &next_page_header,
+                                 &next_header_size, &eos));
+  if (eos) return Status::OK();
+  // The dictionary must be the first data page, so if the first page
+  // is not a dictionary, then there is no dictionary.
+  if (next_page_header.type != parquet::PageType::DICTIONARY_PAGE) return Status::OK();
 
+  current_page_header_ = next_page_header;
+  Status status;
+  if (!stream_->SkipBytes(next_header_size, &status)) return status;
+
+  int data_size = current_page_header_.compressed_page_size;
+  if (slot_desc_ == nullptr) {
+    // Skip processing the dictionary page if we don't need to decode any values. In
+    // addition to being unnecessary, we are likely unable to successfully decode the
+    // dictionary values because we don't necessarily create the right type of scalar
+    // reader if there's no slot to read into (see CreateReader()).
+    if (!stream_->SkipBytes(data_size, &status)) return status;
+    return Status::OK();
+  }
+
+  if (node_.element->type == parquet::Type::BOOLEAN) {
+    return Status("Unexpected dictionary page. Dictionary page is not"
+       " supported for booleans.");
+  }
+
+  const parquet::DictionaryPageHeader* dict_header = nullptr;
+  if (current_page_header_.__isset.dictionary_page_header) {
+    dict_header = &current_page_header_.dictionary_page_header;
+  } else {
+    if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) {
+      return Status("Dictionary page does not have dictionary header set.");
+    }
+  }
+  if (dict_header != nullptr &&
+      dict_header->encoding != parquet::Encoding::PLAIN &&
+      dict_header->encoding != parquet::Encoding::PLAIN_DICTIONARY) {
+    return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
+                  "for dictionary pages.");
+  }
+
+  if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
+  data_end_ = data_ + data_size;
+
+  uint8_t* dict_values = nullptr;
+  if (decompressor_.get() != nullptr) {
+    int uncompressed_size = current_page_header_.uncompressed_page_size;
+    dict_values = parent_->dictionary_pool_->TryAllocate(uncompressed_size);
+    if (UNLIKELY(dict_values == nullptr)) {
+      string details = Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "InitDictionary",
+                                  uncompressed_size, "dictionary");
+      return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
+               parent_->state_, details, uncompressed_size);
+    }
+    RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, data_size, data_,
+                    &uncompressed_size, &dict_values));
+    VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size;
+    if (current_page_header_.uncompressed_page_size != uncompressed_size) {
+      return Status(Substitute("Error decompressing dictionary page in file '$0'. "
+               "Expected $1 uncompressed bytes but got $2", filename(),
+               current_page_header_.uncompressed_page_size, uncompressed_size));
+    }
+    data_size = uncompressed_size;
+  } else {
+    if (current_page_header_.uncompressed_page_size != data_size) {
+      return Status(Substitute("Error reading dictionary page in file '$0'. "
+                               "Expected $1 bytes but got $2", filename(),
+                               current_page_header_.uncompressed_page_size, data_size));
+    }
+    // Copy dictionary from io buffer (which will be recycled as we read
+    // more data) to a new buffer
+    dict_values = parent_->dictionary_pool_->TryAllocate(data_size);
+    if (UNLIKELY(dict_values == nullptr)) {
+      string details = Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "InitDictionary",
+                                  data_size, "dictionary");
+      return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
+               parent_->state_, details, data_size);
+    }
+    memcpy(dict_values, data_, data_size);
+  }
+
+  DictDecoderBase* dict_decoder;
+  RETURN_IF_ERROR(CreateDictionaryDecoder(dict_values, data_size, &dict_decoder));
+  if (dict_header != nullptr &&
+      dict_header->num_values != dict_decoder->num_entries()) {
+    return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
+                  slot_desc_->type().DebugString(),
+                  Substitute("Expected $0 entries but data contained $1 entries",
+                             dict_header->num_values, dict_decoder->num_entries()));
+  }
+
+  return Status::OK();
+}
+
+Status BaseScalarColumnReader::ReadDataPage() {
   // We're about to move to the next data page.  The previous data page is
   // now complete, pass along the memory allocated for it.
   parent_->scratch_batch_->mem_pool()->AcquireData(decompressed_data_pool_.get(), false);
@@ -775,155 +960,30 @@ Status BaseScalarColumnReader::ReadDataPage() {
       return Status::OK();
     }
 
-    int64_t buffer_size;
-    RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size));
-    if (buffer_size == 0) {
-      // The data pages contain fewer values than stated in the column metadata.
-      DCHECK(stream_->eosr());
-      DCHECK_LT(num_values_read_, metadata_->num_values);
-      // TODO for 2.3: node_.element->name isn't necessarily useful
-      ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
-          metadata_->num_values, num_values_read_, node_.element->name, filename());
-      RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
-      return Status::OK();
-    }
-
-    // We don't know the actual header size until the thrift object is deserialized.  Loop
-    // until we successfully deserialize the header or exceed the maximum header size.
+    bool eos;
     uint32_t header_size;
-    while (true) {
-      header_size = buffer_size;
-      status = DeserializeThriftMsg(
-          buffer, &header_size, true, &current_page_header_);
-      if (status.ok()) break;
-
-      if (buffer_size >= FLAGS_max_page_header_size) {
-        stringstream ss;
-        ss << "ParquetScanner: could not read data page because page header exceeded "
-           << "maximum size of "
-           << PrettyPrinter::Print(FLAGS_max_page_header_size, TUnit::BYTES);
-        status.AddDetail(ss.str());
-        return status;
-      }
-
-      // Didn't read entire header, increase buffer size and try again
-      Status status;
-      int64_t new_buffer_size = max<int64_t>(buffer_size * 2, 1024);
-      bool success = stream_->GetBytes(
-          new_buffer_size, &buffer, &new_buffer_size, &status, /* peek */ true);
-      if (!success) {
-        DCHECK(!status.ok());
-        return status;
-      }
-      DCHECK(status.ok());
-
-      if (buffer_size == new_buffer_size) {
-        DCHECK_NE(new_buffer_size, 0);
-        return Status(TErrorCode::PARQUET_HEADER_EOF, filename());
-      }
-      DCHECK_GT(new_buffer_size, buffer_size);
-      buffer_size = new_buffer_size;
-    }
-
-    // Successfully deserialized current_page_header_
-    if (!stream_->SkipBytes(header_size, &status)) return status;
-
-    int data_size = current_page_header_.compressed_page_size;
-    int uncompressed_size = current_page_header_.uncompressed_page_size;
-    if (UNLIKELY(data_size < 0)) {
-      return Status(Substitute("Corrupt Parquet file '$0': negative page size $1 for "
-          "column '$2'", filename(), data_size, schema_element().name));
-    }
-    if (UNLIKELY(uncompressed_size < 0)) {
-      return Status(Substitute("Corrupt Parquet file '$0': negative uncompressed page "
-          "size $1 for column '$2'", filename(), uncompressed_size,
-          schema_element().name));
-    }
+    RETURN_IF_ERROR(ReadPageHeader(false /* peek */, &current_page_header_,
+                                   &header_size, &eos));
+    if (eos) return Status::OK();
 
     if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
-      if (slot_desc_ == NULL) {
-        // Skip processing the dictionary page if we don't need to decode any values. In
-        // addition to being unnecessary, we are likely unable to successfully decode the
-        // dictionary values because we don't necessarily create the right type of scalar
-        // reader if there's no slot to read into (see CreateReader()).
-        if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
-        continue;
-      }
-
+      // Any dictionary is already initialized, as InitDictionary has already
+      // been called. There are two possibilities:
+      // 1. The parquet file has two dictionary pages
+      // OR
+      // 2. The parquet file does not have the dictionary as the first data page.
+      // Both are errors in the parquet file.
       if (HasDictionaryDecoder()) {
-        return Status("Column chunk should not contain two dictionary pages.");
-      }
-      if (node_.element->type == parquet::Type::BOOLEAN) {
-        return Status("Unexpected dictionary page. Dictionary page is not"
-            " supported for booleans.");
-      }
-      const parquet::DictionaryPageHeader* dict_header = NULL;
-      if (current_page_header_.__isset.dictionary_page_header) {
-        dict_header = &current_page_header_.dictionary_page_header;
-      } else {
-        if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) {
-          return Status("Dictionary page does not have dictionary header set.");
-        }
-      }
-      if (dict_header != NULL &&
-          dict_header->encoding != parquet::Encoding::PLAIN &&
-          dict_header->encoding != parquet::Encoding::PLAIN_DICTIONARY) {
-        return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
-            "for dictionary pages.");
-      }
-
-      if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
-      data_end_ = data_ + data_size;
-
-      uint8_t* dict_values = NULL;
-      if (decompressor_.get() != NULL) {
-        dict_values = parent_->dictionary_pool_->TryAllocate(uncompressed_size);
-        if (UNLIKELY(dict_values == NULL)) {
-          string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
-              uncompressed_size, "dictionary");
-          return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
-              parent_->state_, details, uncompressed_size);
-        }
-        RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, data_size, data_,
-            &uncompressed_size, &dict_values));
-        VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size;
-        if (current_page_header_.uncompressed_page_size != uncompressed_size) {
-          return Status(Substitute("Error decompressing dictionary page in file '$0'. "
-              "Expected $1 uncompressed bytes but got $2", filename(),
-              current_page_header_.uncompressed_page_size, uncompressed_size));
-        }
-        data_size = uncompressed_size;
+        return Status(Substitute("Corrupt Parquet file '$0': multiple dictionary pages "
+            "for column '$1'", filename(), schema_element().name));
       } else {
-        if (current_page_header_.uncompressed_page_size != data_size) {
-          return Status(Substitute("Error reading dictionary page in file '$0'. "
-              "Expected $1 bytes but got $2", filename(),
-              current_page_header_.uncompressed_page_size, data_size));
-        }
-        // Copy dictionary from io buffer (which will be recycled as we read
-        // more data) to a new buffer
-        dict_values = parent_->dictionary_pool_->TryAllocate(data_size);
-        if (UNLIKELY(dict_values == NULL)) {
-          string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
-              data_size, "dictionary");
-          return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
-              parent_->state_, details, data_size);
-        }
-        memcpy(dict_values, data_, data_size);
+        return Status(Substitute("Corrupt Parquet file '$0': dictionary page for "
+            "column '$1' is not the first page", filename(), schema_element().name));
       }
-
-      DictDecoderBase* dict_decoder;
-      RETURN_IF_ERROR(CreateDictionaryDecoder(dict_values, data_size, &dict_decoder));
-      if (dict_header != NULL &&
-          dict_header->num_values != dict_decoder->num_entries()) {
-        return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
-            slot_desc_->type().DebugString(),
-            Substitute("Expected $0 entries but data contained $1 entries",
-            dict_header->num_values, dict_decoder->num_entries()));
-      }
-      // Done with dictionary page, read next page
-      continue;
     }
 
+    Status status;
+    int data_size = current_page_header_.compressed_page_size;
     if (current_page_header_.type != parquet::PageType::DATA_PAGE) {
       // We can safely skip non-data pages
       if (!stream_->SkipBytes(data_size, &status)) return status;
@@ -943,12 +1003,13 @@ Status BaseScalarColumnReader::ReadDataPage() {
     num_buffered_values_ = num_values;
     num_values_read_ += num_buffered_values_;
 
+    int uncompressed_size = current_page_header_.uncompressed_page_size;
     if (decompressor_.get() != NULL) {
       SCOPED_TIMER(parent_->decompress_timer_);
       uint8_t* decompressed_buffer =
           decompressed_data_pool_->TryAllocate(uncompressed_size);
       if (UNLIKELY(decompressed_buffer == NULL)) {
-        string details = Substitute(PARQUET_MEM_LIMIT_EXCEEDED, "ReadDataPage",
+        string details = Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "ReadDataPage",
             uncompressed_size, "decompressed data");
         return decompressed_data_pool_->mem_tracker()->MemLimitExceeded(
             parent_->state_, details, uncompressed_size);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 5d68a24..9de4277 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -363,6 +363,24 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// next data page if necessary.
   virtual bool NextLevels() { return NextLevels<true>(); }
 
+  /// Check the data stream to see if there is a dictionary page. If there is,
+  /// use that page to initialize dict_decoder_ and advance the data stream
+  /// past the dictionary page.
+  Status InitDictionary();
+
+  /// Returns the dictionary decoder, or NULL if the column does not have a
+  /// dictionary.
+  virtual DictDecoderBase* GetDictionaryDecoder() { return nullptr; }
+
+  /// Returns whether the datatype for this column requires conversion from the
+  /// on-disk format for correctness. For example, timestamps can require an offset
+  /// to be applied.
+  virtual bool NeedsConversion() { return false; }
+
+  /// Returns whether the datatype for this column requires validation. For example,
+  /// the timestamp format has certain bit combinations that are invalid, and these
+  /// need to be validated when read from disk.
+  virtual bool NeedsValidation() { return false; }
+
   // TODO: Some encodings might benefit a lot from a SkipValues(int num_rows) if
   // we know this row can be skipped. This could be very useful with stats and big
   // sections can be skipped. Implement that when we can benefit from it.
@@ -408,6 +426,14 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// Header for current data page.
   parquet::PageHeader current_page_header_;
 
+  /// Reads the next page header into next_page_header/next_header_size.
+  /// If the stream reaches the end before reading a complete page header,
+  /// eos is set to true. If peek is false, the stream position is advanced
+  /// past the page header. If peek is true, the stream position is not moved.
+  /// Returns an error status if the next page header could not be read.
+  Status ReadPageHeader(bool peek, parquet::PageHeader* next_page_header,
+      uint32_t* next_header_size, bool* eos);
+
   /// Read the next data page. If a dictionary page is encountered, that will be read and
   /// this function will continue reading the next data page.
   Status ReadDataPage();
@@ -426,8 +452,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
       DictDecoderBase** decoder) = 0;
 
-  /// Return true if the column has an initialized dictionary decoder. Subclass must
-  /// implement this.
+  /// Return true if the column has a dictionary decoder. Subclass must implement this.
   virtual bool HasDictionaryDecoder() = 0;
 
   /// Clear the dictionary decoder so HasDictionaryDecoder() will return false. Subclass

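For orientation, here is a minimal, self-contained C++ sketch of the row-group
elimination idea that the InitDictionary()/GetDictionaryDecoder() hooks above make
possible: decode the dictionary for a filtered column, evaluate that column's
conjuncts against each dictionary entry, and skip the whole row group if no entry
passes. The types and the callback-based conjuncts are illustrative placeholders,
not Impala's actual classes.

#include <cstdint>
#include <functional>
#include <vector>

// Stand-in for the decoded dictionary of one column chunk.
struct FakeDictionary {
  std::vector<int64_t> entries;
};

// Returns true if at least one dictionary entry satisfies all conjuncts bound to
// this column. If it returns false, no row in the row group can match, so the row
// group can be skipped without decoding any data pages. NULL never appears in the
// dictionary, which is why only conjuncts that are false on NULL input qualify.
bool DictionaryPassesFilters(const FakeDictionary& dict,
    const std::vector<std::function<bool(int64_t)>>& conjuncts) {
  for (int64_t value : dict.entries) {
    bool all_pass = true;
    for (const auto& conjunct : conjuncts) {
      if (!conjunct(value)) { all_pass = false; break; }
    }
    if (all_pass) return true;  // One passing entry keeps the row group.
  }
  return false;
}

int main() {
  FakeDictionary dict{{1, 2, 3, 7}};
  std::vector<std::function<bool(int64_t)>> conjuncts = {
      [](int64_t v) { return v > 10; }  // e.g. "col > 10"
  };
  // No dictionary entry exceeds 10, so the row group would be eliminated.
  return DictionaryPassesFilters(dict, conjuncts) ? 0 : 1;
}
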
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 7227387..494d98e 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -450,6 +450,11 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_decimal_v2(iequals(value, "true") || iequals(value, "1"));
         break;
       }
+      case TImpalaQueryOptions::PARQUET_DICTIONARY_FILTERING: {
+        query_options->__set_parquet_dictionary_filtering(
+            iequals(value, "true") || iequals(value, "1"));
+        break;
+      }
       default:
         // We hit this DCHECK(false) if we forgot to add the corresponding entry here
         // when we add a new query option.

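Like the neighbouring options, the value is parsed case-insensitively from "true"/"1"
(anything else disables the feature), so the filter should be toggleable per session
through the usual query-option mechanism, e.g. SET parquet_dictionary_filtering=false;
from a shell.
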
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index d4fcd96..f5220ea 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -35,7 +35,7 @@ class TQueryOptions;
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::DECIMAL_V2 + 1);\
+      TImpalaQueryOptions::PARQUET_DICTIONARY_FILTERING + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -88,6 +88,7 @@ class TQueryOptions;
   QUERY_OPT_FN(scratch_limit, SCRATCH_LIMIT)\
   QUERY_OPT_FN(enable_expr_rewrites, ENABLE_EXPR_REWRITES)\
   QUERY_OPT_FN(decimal_v2, DECIMAL_V2)\
+  QUERY_OPT_FN(parquet_dictionary_filtering, PARQUET_DICTIONARY_FILTERING)\
   ;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/be/src/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/be/src/util/dict-encoding.h b/be/src/util/dict-encoding.h
index d9fbe08..e791d95 100644
--- a/be/src/util/dict-encoding.h
+++ b/be/src/util/dict-encoding.h
@@ -187,6 +187,10 @@ class DictDecoderBase {
 
   virtual int num_entries() const = 0;
 
+  /// Reads the dictionary value at the specified index into the buffer provided.
+  /// The buffer must be large enough to hold a value of this dictionary's type.
+  virtual void GetValue(int index, void* buffer) = 0;
+
  protected:
   RleDecoder data_decoder_;
 };
@@ -209,10 +213,18 @@ class DictDecoder : public DictDecoderBase {
 
   virtual int num_entries() const { return dict_.size(); }
 
+  virtual void GetValue(int index, void* buffer) {
+    T* val_ptr = reinterpret_cast<T*>(buffer);
+    DCHECK_GE(index, 0);
+    DCHECK_LT(index, dict_.size());
+    // TODO: is there any circumstance where this should be a memcpy?
+    *val_ptr = dict_[index];
+  }
+
   /// Returns the next value.  Returns false if the data is invalid.
   /// For StringValues, this does not make a copy of the data.  Instead,
   /// the string data is from the dictionary buffer passed into the c'tor.
-  bool GetValue(T* value);
+  bool GetNextValue(T* value);
 
  private:
   std::vector<T> dict_;
@@ -274,7 +286,7 @@ inline int DictEncoder<StringValue>::AddToTable(const StringValue& value,
 }
 
 template<typename T>
-inline bool DictDecoder<T>::GetValue(T* value) {
+inline bool DictDecoder<T>::GetNextValue(T* value) {
   int index = -1; // Initialize to avoid compiler warning.
   bool result = data_decoder_.Get(&index);
   // Use & to avoid branches.
@@ -286,7 +298,7 @@ inline bool DictDecoder<T>::GetValue(T* value) {
 }
 
 template<>
-inline bool DictDecoder<Decimal16Value>::GetValue(Decimal16Value* value) {
+inline bool DictDecoder<Decimal16Value>::GetNextValue(Decimal16Value* value) {
   int index;
   bool result = data_decoder_.Get(&index);
   if (!result) return false;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/be/src/util/dict-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/dict-test.cc b/be/src/util/dict-test.cc
index a14922c..686bbce 100644
--- a/be/src/util/dict-test.cc
+++ b/be/src/util/dict-test.cc
@@ -30,7 +30,8 @@
 namespace impala {
 
 template<typename T>
-void ValidateDict(const vector<T>& values, int fixed_buffer_byte_size) {
+void ValidateDict(const vector<T>& values, const vector<T>& dict_values,
+                  int fixed_buffer_byte_size) {
   set<T> values_set(values.begin(), values.end());
 
   MemTracker tracker;
@@ -51,10 +52,20 @@ void ValidateDict(const vector<T>& values, int fixed_buffer_byte_size) {
   DictDecoder<T> decoder;
   ASSERT_TRUE(
       decoder.Reset(dict_buffer, encoder.dict_encoded_size(), fixed_buffer_byte_size));
+
+  // Test direct access to the dictionary via indexes
+  for (int i = 0; i < dict_values.size(); ++i) {
+    T expected_value = dict_values[i];
+    T out_value;
+
+    decoder.GetValue(i, &out_value);
+    EXPECT_EQ(expected_value, out_value);
+  }
+  // Test access to dictionary via internal stream
   ASSERT_OK(decoder.SetData(data_buffer, data_len));
   for (T i: values) {
     T j;
-    decoder.GetValue(&j);
+    decoder.GetNextValue(&j);
     EXPECT_EQ(i, j);
   }
   pool.FreeAll();
@@ -66,6 +77,12 @@ TEST(DictTest, TestStrings) {
   StringValue sv3("bar");
   StringValue sv4("baz");
 
+  vector<StringValue> dict_values;
+  dict_values.push_back(sv1);
+  dict_values.push_back(sv2);
+  dict_values.push_back(sv3);
+  dict_values.push_back(sv4);
+
   vector<StringValue> values;
   values.push_back(sv1);
   values.push_back(sv1);
@@ -79,7 +96,7 @@ TEST(DictTest, TestStrings) {
   values.push_back(sv3);
   values.push_back(sv4);
 
-  ValidateDict(values, -1);
+  ValidateDict(values, dict_values, -1);
 }
 
 TEST(DictTest, TestTimestamps) {
@@ -87,6 +104,11 @@ TEST(DictTest, TestTimestamps) {
   TimestampValue tv2("2012-01-01 09:01:01", 19);
   TimestampValue tv3("2011-01-01 09:01:02", 19);
 
+  vector<TimestampValue> dict_values;
+  dict_values.push_back(tv1);
+  dict_values.push_back(tv2);
+  dict_values.push_back(tv3);
+
   vector<TimestampValue> values;
   values.push_back(tv1);
   values.push_back(tv2);
@@ -95,7 +117,8 @@ TEST(DictTest, TestTimestamps) {
   values.push_back(tv1);
   values.push_back(tv1);
 
-  ValidateDict(values, ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_TIMESTAMP)));
+  ValidateDict(values, dict_values,
+      ParquetPlainEncoder::EncodedByteSize(ColumnType(TYPE_TIMESTAMP)));
 }
 
 template<typename T>
@@ -108,12 +131,15 @@ template <> void IncrementValue(Decimal16Value* t) { ++(t->value()); }
 template<typename T>
 void TestNumbers(int max_value, int repeat, int value_byte_size) {
   vector<T> values;
+  vector<T> dict_values;
   for (T val = 0; val < max_value; IncrementValue(&val)) {
     for (int i = 0; i < repeat; ++i) {
       values.push_back(val);
     }
+    dict_values.push_back(val);
   }
-  ValidateDict(values, value_byte_size);
+
+  ValidateDict(values, dict_values, value_byte_size);
 }
 
 template<typename T>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 63cb11a..ddf137a 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -221,6 +221,9 @@ struct TQueryOptions {
 
   // Indicates whether to use the new decimal semantics.
   52: optional bool decimal_v2 = false
+
+  // Indicates whether to use dictionary filtering for Parquet files
+  53: optional bool parquet_dictionary_filtering = true
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 98c671d..28ee3df 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -251,6 +251,9 @@ enum TImpalaQueryOptions {
   // Indicates whether to use the new decimal semantics, which includes better
   // rounding and output types for multiply / divide
   DECIMAL_V2,
+
+  // Indicates whether to use dictionary filtering for Parquet files
+  PARQUET_DICTIONARY_FILTERING,
 }
 
 // The summary of a DML statement.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 7de9d2b..25e693c 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -211,6 +211,10 @@ struct THdfsScanNode {
 
   // Tuple to evaluate 'min_max_conjuncts' against.
   8: optional Types.TTupleId min_max_tuple_id
+
+  // Map from SlotIds to the indices in TPlanNode.conjuncts that are eligible
+  // for dictionary filtering.
+  9: optional map<Types.TSlotId, list<i32>> dictionary_filter_conjuncts
 }
 
 struct TDataSourceScanNode {

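As a hypothetical illustration of the map's shape: for a scan whose
TPlanNode.conjuncts are [0: int_col = 5, 1: string_col LIKE 'a%', 2: int_col > 0],
and assuming all three pass the frontend eligibility checks, the entry for
int_col's slot id would be [0, 2] and the entry for string_col's slot id would
be [1].
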
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/common/thrift/parquet.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/parquet.thrift b/common/thrift/parquet.thrift
index fedebe3..81073f4 100644
--- a/common/thrift/parquet.thrift
+++ b/common/thrift/parquet.thrift
@@ -76,6 +76,89 @@ enum ConvertedType {
    * 2 digits over).
    */
   DECIMAL = 5;
+
+  /**
+   * A Date
+   *
+   * Stored as days since Unix epoch, encoded as the INT32 physical type.
+   *
+   */
+  DATE = 6;
+
+  /**
+   * A time
+   *
+   * The total number of milliseconds since midnight.  The value is stored
+   * as an INT32 physical type.
+   */
+  TIME_MILLIS = 7;
+  // RESERVED = 8;
+
+  /**
+   * A date/time combination
+   *
+   * Date and time recorded as milliseconds since the Unix epoch.  Recorded as
+   * a physical type of INT64.
+   */
+  TIMESTAMP_MILLIS = 9;
+  // RESERVED = 10;
+
+
+  /**
+   * An unsigned integer value.
+   *
+   * The number describes the maximum number of meaningful data bits in
+   * the stored value. 8, 16 and 32 bit values are stored using the
+   * INT32 physical type.  64 bit values are stored using the INT64
+   * physical type.
+   *
+   */
+  UINT_8 = 11;
+  UINT_16 = 12;
+  UINT_32 = 13;
+  UINT_64 = 14;
+
+  /**
+   * A signed integer value.
+   *
+   * The number describes the maximum number of meaningful data bits in
+   * the stored value. 8, 16 and 32 bit values are stored using the
+   * INT32 physical type.  64 bit values are stored using the INT64
+   * physical type.
+   *
+   */
+  INT_8 = 15;
+  INT_16 = 16;
+  INT_32 = 17;
+  INT_64 = 18;
+
+  /**
+   * An embedded JSON document
+   *
+   * A JSON document embedded within a single UTF8 column.
+   */
+  JSON = 19;
+
+  /**
+   * An embedded BSON document
+   *
+   * A BSON document embedded within a single BINARY column.
+   */
+  BSON = 20;
+
+  /**
+   * An interval of time
+   *
+   * This type annotates data stored as a FIXED_LEN_BYTE_ARRAY of length 12.
+   * This data is composed of three separate little endian unsigned
+   * integers.  Each stores a component of a duration of time.  The first
+   * integer identifies the number of months associated with the duration,
+   * the second identifies the number of days associated with the duration
+   * and the third identifies the number of milliseconds associated with
+   * the provided duration.  This duration of time is independent of any
+   * particular timezone or date.
+   */
+  INTERVAL = 21;
 }
 
 /**
@@ -345,6 +428,21 @@ struct SortingColumn {
 }
 
 /**
+ * statistics of a given page type and encoding
+ */
+struct PageEncodingStats {
+
+  /** the page type (data/dic/...) **/
+  1: required PageType page_type;
+
+  /** encoding of the page **/
+  2: required Encoding encoding;
+
+  /** number of pages of this type with this encoding **/
+  3: required i32 count;
+}
+
+/**
  * Description for column metadata
  */
 struct ColumnMetaData {
@@ -384,6 +482,11 @@ struct ColumnMetaData {
 
   /** optional statistics for this column chunk */
   12: optional Statistics statistics;
+
+  /** Set of all encodings used for pages in this column chunk. This information
+   * can be used, for example, to determine whether all data pages are
+   * dictionary encoded. **/
+  13: optional list<PageEncodingStats> encoding_stats;
 }
 
 struct ColumnChunk {

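A rough sketch of how a reader can use encoding_stats to decide whether a column
chunk is purely dictionary encoded (no non-empty data page entry with an encoding
other than PLAIN_DICTIONARY, the dictionary encoding for data pages in Parquet 1.x
files). The enums and struct below mirror the thrift definitions above; the helper
itself is illustrative and is not Impala's implementation.

#include <cstdint>
#include <vector>

enum class PageType { DATA_PAGE, INDEX_PAGE, DICTIONARY_PAGE };
enum class Encoding { PLAIN, PLAIN_DICTIONARY, RLE, BIT_PACKED };

// Mirrors parquet.thrift's PageEncodingStats.
struct PageEncodingStats {
  PageType page_type;
  Encoding encoding;
  int32_t count;
};

// True if every data page in the column chunk is dictionary encoded, i.e. there is
// no non-empty data page entry with an encoding other than PLAIN_DICTIONARY.
bool AllDataPagesDictionaryEncoded(const std::vector<PageEncodingStats>& stats) {
  for (const PageEncodingStats& s : stats) {
    if (s.page_type == PageType::DATA_PAGE && s.count > 0 &&
        s.encoding != Encoding::PLAIN_DICTIONARY) {
      return false;
    }
  }
  return true;
}

int main() {
  std::vector<PageEncodingStats> stats = {
      {PageType::DICTIONARY_PAGE, Encoding::PLAIN_DICTIONARY, 1},
      {PageType::DATA_PAGE, Encoding::PLAIN_DICTIONARY, 4}};
  return AllDataPagesDictionaryEncoded(stats) ? 0 : 1;  // 0: eligible for filtering
}
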
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/fe/src/main/java/org/apache/impala/analysis/Expr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index 8f01005..1a1c92b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -163,6 +163,16 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
         public boolean apply(Expr arg) { return arg instanceof BinaryPredicate; }
       };
 
+  public final static com.google.common.base.Predicate<Expr>
+      IS_NONDETERMINISTIC_BUILTIN_FN_PREDICATE =
+      new com.google.common.base.Predicate<Expr>() {
+        @Override
+        public boolean apply(Expr arg) {
+          return arg instanceof FunctionCallExpr &&
+              ((FunctionCallExpr)arg).isNondeterministicBuiltinFn();
+        }
+      };
+
   // id that's unique across the entire query statement and is assigned by
   // Analyzer.registerConjuncts(); only assigned for the top-level terms of a
   // conjunction, and therefore null for most Exprs

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
index a747af8..c9d098d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
@@ -227,6 +227,26 @@ public class FunctionCallExpr extends Expr {
   public void setIsAnalyticFnCall(boolean v) { isAnalyticFnCall_ = v; }
   public void setIsInternalFnCall(boolean v) { isInternalFnCall_ = v; }
 
+  static boolean isNondeterministicBuiltinFnName(String fnName) {
+    if (fnName.equalsIgnoreCase("rand") || fnName.equalsIgnoreCase("random")
+        || fnName.equalsIgnoreCase("uuid")) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Returns true if this is a non-deterministic builtin function, i.e. one that may
+   * not produce the same output for every invocation given a fixed input.
+   * Functions that use randomness or variable runtime state are non-deterministic.
+   * This only applies to builtin functions, and does not provide any information
+   * about user defined functions.
+   */
+  public boolean isNondeterministicBuiltinFn() {
+    String fnName = fnName_.getFunction();
+    return isNondeterministicBuiltinFnName(fnName);
+  }
+
   @Override
   protected void toThrift(TExprNode msg) {
     if (isAggregateFunction() || isAnalyticFnCall_) {
@@ -256,8 +276,7 @@ public class FunctionCallExpr extends Expr {
       fnName = path.get(path.size() - 1);
     }
     // Non-deterministic functions are never constant.
-    if (fnName.equalsIgnoreCase("rand") || fnName.equalsIgnoreCase("random")
-        || fnName.equalsIgnoreCase("uuid")) {
+    if (isNondeterministicBuiltinFnName(fnName)) {
       return false;
     }
     // Sleep is a special function for testing.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index b1cd86a..50d6bfe 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -19,6 +19,7 @@ package org.apache.impala.planner;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -31,7 +32,9 @@ import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.BinaryPredicate;
 import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.FunctionCallExpr;
 import org.apache.impala.analysis.SlotDescriptor;
+import org.apache.impala.analysis.SlotId;
 import org.apache.impala.analysis.SlotRef;
 import org.apache.impala.analysis.TableRef;
 import org.apache.impala.analysis.TupleDescriptor;
@@ -69,6 +72,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -128,6 +132,11 @@ public class HdfsScanNode extends ScanNode {
   private final Map<TupleDescriptor, List<Expr>> collectionConjuncts_ =
       Maps.newLinkedHashMap();
 
+  // Map from SlotIds to indices in PlanNode.conjuncts_ that are eligible for
+  // dictionary filtering.
+  private Map<Integer, List<Integer>> dictionaryFilterConjuncts_ =
+      Maps.newLinkedHashMap();
+
   // Indicates corrupt table stats based on the number of non-empty scan ranges and
   // numRows set to 0. Set in computeStats().
   private boolean hasCorruptTableStats_;
@@ -202,7 +211,7 @@ public class HdfsScanNode extends ScanNode {
     checkForSupportedFileFormats();
 
     assignCollectionConjuncts(analyzer);
-
+    computeDictionaryFilterConjuncts(analyzer);
     computeMemLayout(analyzer);
 
     // compute scan range locations
@@ -415,6 +424,45 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
+   * Walks through conjuncts and populates dictionaryFilterConjuncts_.
+   */
+  private void computeDictionaryFilterConjuncts(Analyzer analyzer) {
+    for (int conjunct_idx = 0; conjunct_idx < conjuncts_.size(); ++conjunct_idx) {
+      Expr conjunct = conjuncts_.get(conjunct_idx);
+      List<TupleId> tupleIds = Lists.newArrayList();
+      List<SlotId> slotIds = Lists.newArrayList();
+
+      conjunct.getIds(tupleIds, slotIds);
+      Preconditions.checkState(tupleIds.size() == 1);
+      if (slotIds.size() != 1) continue;
+
+      // Check to see if this slot is a collection type. Nested types are
+      // currently not supported. For example, an IsNotEmptyPredicate cannot
+      // be evaluated at the dictionary level.
+      if (analyzer.getSlotDesc(slotIds.get(0)).getType().isCollectionType()) continue;
+
+      // Check to see if this conjunct contains any known non-deterministic function.
+      if (conjunct.contains(Expr.IS_NONDETERMINISTIC_BUILTIN_FN_PREDICATE)) continue;
+
+      // Check to see if the conjunct evaluates to true when the slot is NULL
+      // This is important for dictionary filtering. Dictionaries do not
+      // contain an entry for NULL and do not provide an indication about
+      // whether NULLs are present. A conjunct that evaluates to true on NULL
+      // cannot be evaluated purely on the dictionary.
+      if (analyzer.isTrueWithNullSlots(conjunct)) continue;
+
+      // TODO: Should there be a limit on the cost/structure of the conjunct?
+      Integer slotIdInt = slotIds.get(0).asInt();
+      if (dictionaryFilterConjuncts_.containsKey(slotIdInt)) {
+        dictionaryFilterConjuncts_.get(slotIdInt).add(conjunct_idx);
+      } else {
+        List<Integer> slotList = Lists.newArrayList(conjunct_idx);
+        dictionaryFilterConjuncts_.put(slotIdInt, slotList);
+      }
+    }
+  }
+
+  /**
    * Computes scan ranges (hdfs splits) plus their storage locations, including volume
    * ids, based on the given maximum number of bytes each scan range should scan.
    * Returns the set of file formats being scanned.
@@ -673,6 +721,7 @@ public class HdfsScanNode extends ScanNode {
       }
       msg.hdfs_scan_node.setMin_max_tuple_id(minMaxTuple_.getId().asInt());
     }
+    msg.hdfs_scan_node.setDictionary_filter_conjuncts(dictionaryFilterConjuncts_);
   }
 
   @Override
@@ -738,6 +787,20 @@ public class HdfsScanNode extends ScanNode {
         output.append(detailPrefix + "parquet statistics predicates: " +
             getExplainString(minMaxOriginalConjuncts_) + "\n");
       }
+      if (!dictionaryFilterConjuncts_.isEmpty()) {
+        List<Integer> totalIdxList = Lists.newArrayList();
+        for (List<Integer> idxList : dictionaryFilterConjuncts_.values()) {
+          totalIdxList.addAll(idxList);
+        }
+        // Since the conjuncts are stored by the slot id, they are not necessarily
+        // in the same order as the normal conjuncts. Sort the indices so that the
+        // order matches the normal conjuncts.
+        Collections.sort(totalIdxList);
+        List<Expr> exprList = Lists.newArrayList();
+        for (Integer idx : totalIdxList) exprList.add(conjuncts_.get(idx));
+        output.append(String.format("%sparquet dictionary predicates: %s\n",
+            detailPrefix, getExplainString(exprList)));
+      }
     }
     return output.toString();
   }

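To make the eligibility checks above concrete, some hypothetical predicates:
int_col = 5 or string_col LIKE 'a%' would typically qualify (single slot,
deterministic, not true when the slot is NULL); int_col IS NULL would not, because
it evaluates to true on NULL and the dictionary carries no NULL entry;
int_col > rand() would not, because it references a non-deterministic builtin;
and int_col < bigint_col would not, because it is bound to more than one slot.
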
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 42e50be..5148f68 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -266,6 +266,13 @@ public class PlannerTest extends PlannerTestBase {
   }
 
   @Test
+  public void testParquetFiltering() {
+    TQueryOptions options = defaultQueryOptions();
+    options.setExplain_level(TExplainLevel.EXTENDED);
+    runPlannerTestFile("parquet-filtering", options);
+  }
+
+  @Test
   public void testKudu() {
     Assume.assumeTrue(RuntimeEnv.INSTANCE.isKuduSupported());
     runPlannerTestFile("kudu");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index d14c462..3a9855d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -50,6 +50,7 @@ PLAN-ROOT SINK
    table stats: 150000 rows total
    columns missing stats: c_orders
    parquet statistics predicates: c_custkey > 10
+   parquet dictionary predicates: c_custkey > 10
    hosts=3 per-host-mem=unavailable
    tuple-ids=0 row-size=24B cardinality=15000
 ====
@@ -127,6 +128,7 @@ PLAN-ROOT SINK
 |     predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
 |     table stats: 7300 rows total
 |     column stats: all
+|     parquet dictionary predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
 |     hosts=3 per-host-mem=unavailable
 |     tuple-ids=1 row-size=20B cardinality=730
 |
@@ -158,6 +160,7 @@ PLAN-ROOT SINK
 |     predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
 |     table stats: 7300 rows total
 |     column stats: all
+|     parquet dictionary predicates: CAST(b.double_col AS DECIMAL(3,2)) > 11.1
 |     hosts=3 per-host-mem=unavailable
 |     tuple-ids=1 row-size=20B cardinality=730
 |

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9b923a1a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index 5547d3a..4d9544d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -58,6 +58,7 @@ PLAN-ROOT SINK
    table stats: unavailable
    column stats: unavailable
    parquet statistics predicates: id < 10
+   parquet dictionary predicates: id < 10
    hosts=3 per-host-mem=unavailable
    tuple-ids=0 row-size=16B cardinality=unavailable
 ---- PARALLELPLANS
@@ -96,6 +97,7 @@ PLAN-ROOT SINK
    table stats: unavailable
    column stats: unavailable
    parquet statistics predicates: id < 10
+   parquet dictionary predicates: id < 10
    hosts=3 per-host-mem=16.00MB
    tuple-ids=0 row-size=16B cardinality=unavailable
 ====
@@ -125,6 +127,7 @@ PLAN-ROOT SINK
    table stats: unavailable
    column stats: unavailable
    parquet statistics predicates: id < 10
+   parquet dictionary predicates: id < 10
    hosts=3 per-host-mem=unavailable
    tuple-ids=0 row-size=8B cardinality=unavailable
 ---- PARALLELPLANS
@@ -157,6 +160,7 @@ PLAN-ROOT SINK
    table stats: unavailable
    column stats: unavailable
    parquet statistics predicates: id < 10
+   parquet dictionary predicates: id < 10
    hosts=3 per-host-mem=16.00MB
    tuple-ids=0 row-size=8B cardinality=unavailable
 ====
@@ -211,6 +215,7 @@ PLAN-ROOT SINK
    table stats: 150000 rows total
    columns missing stats: c_orders
    parquet statistics predicates: c_custkey < 10
+   parquet dictionary predicates: c_custkey < 10
    hosts=3 per-host-mem=unavailable
    tuple-ids=0 row-size=254B cardinality=15000
 ---- PARALLELPLANS
@@ -264,6 +269,7 @@ PLAN-ROOT SINK
    table stats: 150000 rows total
    columns missing stats: c_orders
    parquet statistics predicates: c_custkey < 10
+   parquet dictionary predicates: c_custkey < 10
    hosts=3 per-host-mem=88.00MB
    tuple-ids=0 row-size=254B cardinality=15000
 ====