Posted to commits@impala.apache.org by bo...@apache.org on 2021/10/29 10:37:05 UTC

[impala] 02/02: IMPALA-9873: Avoid materialization of columns for filtered out rows in Parquet table.

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit cd64271a0c4df6906d036a5a831001fdc8000285
Author: Amogh Margoor <am...@cloudera.com>
AuthorDate: Mon Aug 16 12:13:09 2021 +0100

    IMPALA-9873: Avoid materialization of columns for filtered out rows in Parquet table.
    
    Currently, the entire row is materialized before filtering during a scan.
    Instead of paying the full materialization cost upfront, for columnar
    formats we can avoid doing so for rows that are filtered out.
    Only the columns required for filtering need to be materialized
    before the filters are evaluated. For the rest of the columns,
    materialization can be delayed and done only for the rows that survive.
    This patch implements this technique for the Parquet format only.
    
    A new query option 'parquet_late_materialization_threshold' is
    introduced, which is the minimum number of consecutive filtered-out
    rows required to skip their materialization. If set to a value less
    than 0, it disables late materialization.
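    
    To illustrate the idea (this is a sketch for illustration only, with
    hypothetical names, not the actual ScratchTupleBatch::GetMicroBatches
    implementation): the surviving rows of a filtered batch can be grouped
    into "micro batches", where a run of filtered-out rows at least
    'threshold' rows long is skipped, while shorter gaps are absorbed into
    the surrounding micro batch.
    
        #include <vector>
        
        struct MicroBatch { int start; int end; };  // inclusive row range
        
        // Illustrative sketch: given per-row survival flags, build the micro
        // batches to materialize. A negative 'threshold' would disable the
        // optimization (handled by the caller in the real code path).
        static std::vector<MicroBatch> BuildMicroBatches(
            const std::vector<bool>& selected, int threshold) {
          std::vector<MicroBatch> batches;
          const int n = static_cast<int>(selected.size());
          int i = 0;
          while (i < n) {
            if (!selected[i]) { ++i; continue; }  // skip leading filtered-out rows
            int start = i;
            int end = i;
            int j = i + 1;
            while (j < n) {
              if (selected[j]) { end = j; ++j; continue; }
              // Measure the run of filtered-out rows starting at 'j'.
              int gap_start = j;
              while (j < n && !selected[j]) ++j;
              if (j - gap_start >= threshold) break;  // long gap: close this batch
              if (j < n) end = j;  // short gap: absorb it and keep going
            }
            batches.push_back({start, end});
            i = end + 1;
          }
          return batches;
        }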
    
    Performance:
    Performance was measured for a single-daemon, single-threaded impalad
    on the TPC-H scale 42 lineitem table with 252 million rows of
    unsorted data. Up to 2.5x improvement was seen for non-page-indexed
    queries and up to 4x for page-indexed queries. The queries for the
    page index were borrowed from this blog:
    https://blog.cloudera.com/speeding-up-select-queries-with-parquet-page-indexes/
    More details:
    https://docs.google.com/spreadsheets/d/17s5OLaFOPo-64kimAPP6n3kJA42vM-iVT24OvsQgfuA/edit?usp=sharing
    
    Testing:
     1. Ran existing tests
     2. Added UT for 'ScratchTupleBatch::GetMicroBatch'
     3. Added end-to-end test for late materialization.
    Change-Id: I46406c913297d5bbbec3ccae62a83bb214ed2c60
    Reviewed-on: http://gerrit.cloudera.org:8080/17860
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Qifan Chen <qc...@cloudera.com>
---
 be/src/exec/CMakeLists.txt                         |   1 +
 be/src/exec/hdfs-columnar-scanner-ir.cc            |   4 +
 be/src/exec/hdfs-columnar-scanner.cc               |  11 +-
 be/src/exec/hdfs-columnar-scanner.h                |   5 +
 be/src/exec/parquet/hdfs-parquet-scanner.cc        | 269 ++++++++++++++++++++-
 be/src/exec/parquet/hdfs-parquet-scanner.h         |  60 ++++-
 .../parquet/parquet-collection-column-reader.cc    |   9 +
 .../parquet/parquet-collection-column-reader.h     |  19 ++
 be/src/exec/parquet/parquet-column-chunk-reader.cc |  11 +-
 be/src/exec/parquet/parquet-column-chunk-reader.h  |  27 ++-
 be/src/exec/parquet/parquet-column-readers.cc      | 250 ++++++++++++++++++-
 be/src/exec/parquet/parquet-column-readers.h       |  95 +++++++-
 be/src/exec/scratch-tuple-batch-test.cc            | 188 ++++++++++++++
 be/src/exec/scratch-tuple-batch.h                  |  59 ++++-
 be/src/service/query-options.cc                    |  14 +-
 be/src/service/query-options.h                     |   6 +-
 be/src/util/tuple-row-compare.h                    |   2 +-
 common/thrift/ImpalaService.thrift                 |   4 +
 common/thrift/Query.thrift                         |   4 +
 ...out-of-range-timestamp-local-tz-conversion.test |   2 +-
 .../QueryTest/parquet-late-materialization.test    |  45 ++++
 .../test_parquet_late_materialization.py           |  37 +++
 22 files changed, 1070 insertions(+), 52 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 9bf07bc..87d3b1a 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -128,5 +128,6 @@ ADD_UNIFIED_BE_LSAN_TEST(delimited-text-parser-test DelimitedTextParser.*)
 ADD_UNIFIED_BE_LSAN_TEST(read-write-util-test ReadWriteUtil.*)
 # Exception to unified be tests: Custom main with global Frontend object
 ADD_BE_LSAN_TEST(row-batch-list-test)
+ADD_BE_LSAN_TEST(scratch-tuple-batch-test)
 ADD_UNIFIED_BE_LSAN_TEST(incr-stats-util-test IncrStatsUtilTest.*)
 ADD_UNIFIED_BE_LSAN_TEST(hdfs-avro-scanner-test HdfsAvroScannerTest.*)
diff --git a/be/src/exec/hdfs-columnar-scanner-ir.cc b/be/src/exec/hdfs-columnar-scanner-ir.cc
index a9bd48f..d2caa5a 100644
--- a/be/src/exec/hdfs-columnar-scanner-ir.cc
+++ b/be/src/exec/hdfs-columnar-scanner-ir.cc
@@ -43,19 +43,23 @@ int HdfsColumnarScanner::ProcessScratchBatch(RowBatch* dst_batch) {
   // Loop until the scratch batch is exhausted or the output batch is full.
   // Do not use batch_->AtCapacity() in this loop because it is not necessary
   // to perform the memory capacity check.
+  bool* is_selected = scratch_batch_->selected_rows.get() + scratch_batch_->tuple_idx;
   while (scratch_tuple != scratch_tuple_end) {
     *output_row = reinterpret_cast<Tuple*>(scratch_tuple);
     scratch_tuple += tuple_size;
     // Evaluate runtime filters and conjuncts. Short-circuit the evaluation if
     // the filters/conjuncts are empty to avoid function calls.
     if (!EvalRuntimeFilters(reinterpret_cast<TupleRow*>(output_row))) {
+      *is_selected++ = false;
       continue;
     }
     if (!ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts,
         reinterpret_cast<TupleRow*>(output_row))) {
+      *is_selected++ = false;
       continue;
     }
     // Row survived runtime filters and conjuncts.
+    *is_selected++ = true;
     ++output_row;
     if (output_row == output_row_end) break;
   }
diff --git a/be/src/exec/hdfs-columnar-scanner.cc b/be/src/exec/hdfs-columnar-scanner.cc
index de6122b..708c56a 100644
--- a/be/src/exec/hdfs-columnar-scanner.cc
+++ b/be/src/exec/hdfs-columnar-scanner.cc
@@ -39,7 +39,7 @@ HdfsColumnarScanner::HdfsColumnarScanner(HdfsScanNodeBase* scan_node,
 
 HdfsColumnarScanner::~HdfsColumnarScanner() {}
 
-int HdfsColumnarScanner::TransferScratchTuples(RowBatch* dst_batch) {
+int HdfsColumnarScanner::FilterScratchBatch(RowBatch* dst_batch) {
   // This function must not be called when the output batch is already full. As long as
   // we always call CommitRows() after TransferScratchTuples(), the output batch can
   // never be empty.
@@ -62,9 +62,14 @@ int HdfsColumnarScanner::TransferScratchTuples(RowBatch* dst_batch) {
     DCHECK_EQ(0, scratch_batch_->total_allocated_bytes());
     return num_tuples;
   }
+  return ProcessScratchBatchCodegenOrInterpret(dst_batch);
+}
 
-  const int num_rows_to_commit = ProcessScratchBatchCodegenOrInterpret(dst_batch);
-  scratch_batch_->FinalizeTupleTransfer(dst_batch, num_rows_to_commit);
+int HdfsColumnarScanner::TransferScratchTuples(RowBatch* dst_batch) {
+  const int num_rows_to_commit = FilterScratchBatch(dst_batch);
+  if (scratch_batch_->tuple_byte_size != 0) {
+    scratch_batch_->FinalizeTupleTransfer(dst_batch, num_rows_to_commit);
+  }
   return num_rows_to_commit;
 }
 
diff --git a/be/src/exec/hdfs-columnar-scanner.h b/be/src/exec/hdfs-columnar-scanner.h
index 1faa626..56805e4 100644
--- a/be/src/exec/hdfs-columnar-scanner.h
+++ b/be/src/exec/hdfs-columnar-scanner.h
@@ -54,6 +54,11 @@ class HdfsColumnarScanner : public HdfsScanner {
   /// Function type: ProcessScratchBatchFn
   const CodegenFnPtrBase* codegend_process_scratch_batch_fn_ = nullptr;
 
+  /// Filters out tuples from 'scratch_batch_' and adds the surviving tuples
+  /// to the given batch. The transfer of the batch is not finalized here.
+  /// Returns the number of tuples that should be committed to the given batch.
+  int FilterScratchBatch(RowBatch* row_batch);
+
   /// Evaluates runtime filters and conjuncts (if any) against the tuples in
   /// 'scratch_batch_', and adds the surviving tuples to the given batch.
   /// Transfers the ownership of tuple memory to the target batch when the
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index b1f0fdb..4068aab 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -110,8 +110,11 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState
     parquet_compressed_page_size_counter_(nullptr),
     parquet_uncompressed_page_size_counter_(nullptr),
     coll_items_read_counter_(0),
-    page_index_(this) {
+    page_index_(this),
+    late_materialization_threshold_(
+      state->query_options().parquet_late_materialization_threshold) {
   assemble_rows_timer_.Stop();
+  complete_micro_batch_ = {0, state_->batch_size() - 1, state_->batch_size()};
 }
 
 Status HdfsParquetScanner::Open(ScannerContext* context) {
@@ -141,6 +144,9 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
       ADD_COUNTER(scan_node_->runtime_profile(), "NumRuntimeFilteredPages", TUnit::UNIT);
   num_pages_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumPages", TUnit::UNIT);
+  num_pages_skipped_by_late_materialization_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumPagesSkippedByLateMaterialization",
+          TUnit::UNIT);
   num_scanners_with_no_reads_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumScannersWithNoReads", TUnit::UNIT);
   num_dict_filtered_row_groups_counter_ =
@@ -226,6 +232,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   // We've processed the metadata and there are columns that need to be materialized.
   RETURN_IF_ERROR(CreateColumnReaders(
       *scan_node_->tuple_desc(), *schema_resolver_, &column_readers_));
+  InitSlotIdsForConjuncts();
   COUNTER_SET(num_cols_counter_,
       static_cast<int64_t>(CountScalarColumns(column_readers_)));
   // Set top-level template tuple.
@@ -235,9 +242,50 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   if (state_->query_options().parquet_bloom_filtering) {
     RETURN_IF_ERROR(CreateColIdx2EqConjunctMap());
   }
+  DivideFilterAndNonFilterColumnReaders(column_readers_, &filter_readers_,
+      &non_filter_readers_);
   return Status::OK();
 }
 
+// Currently, collection readers and scalar readers on collection values
+// are not supported for late materialization.
+static bool DoesNotSupportLateMaterialization(ParquetColumnReader* column_reader) {
+  return column_reader->IsCollectionReader() || column_reader->max_rep_level() > 0;
+}
+
+void HdfsParquetScanner::DivideFilterAndNonFilterColumnReaders(
+    const vector<ParquetColumnReader*>& column_readers,
+    vector<ParquetColumnReader*>* filter_readers,
+    vector<ParquetColumnReader*>* non_filter_readers) const {
+  filter_readers->clear();
+  non_filter_readers->clear();
+  for (auto column_reader : column_readers) {
+    auto slot_desc = column_reader->slot_desc();
+    if (DoesNotSupportLateMaterialization(column_reader) || (slot_desc != nullptr &&
+        std::find(conjunct_slot_ids_.begin(), conjunct_slot_ids_.end(), slot_desc->id())
+            != conjunct_slot_ids_.end())) {
+      filter_readers->push_back(column_reader);
+    } else {
+      non_filter_readers->push_back(column_reader);
+    }
+  }
+}
+
+void HdfsParquetScanner::InitSlotIdsForConjuncts() {
+  conjunct_slot_ids_.reserve(scan_node_->conjuncts().size() +
+    scan_node_->filter_exprs().size());
+  vector<ScalarExpr*> conjuncts;
+  conjuncts.reserve(scan_node_->conjuncts().size() +
+    scan_node_->filter_exprs().size());
+  conjuncts.insert(std::end(conjuncts), std::begin(scan_node_->conjuncts()),
+      std::end(scan_node_->conjuncts()));
+  conjuncts.insert(std::end(conjuncts), std::begin(scan_node_->filter_exprs()),
+      std::end(scan_node_->filter_exprs()));
+  for (int conjunct_idx = 0; conjunct_idx < conjuncts.size(); ++conjunct_idx) {
+    conjuncts[conjunct_idx]->GetSlotIds(&conjunct_slot_ids_);
+  }
+}
+
 void HdfsParquetScanner::Close(RowBatch* row_batch) {
   DCHECK(!is_closed_);
   if (row_batch != nullptr) {
@@ -484,7 +532,12 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
     return Status::OK();
   }
   assemble_rows_timer_.Start();
-  Status status = AssembleRows(column_readers_, row_batch, &advance_row_group_);
+  Status status;
+  if (filter_pages_) {
+    status = AssembleRows<true>(row_batch, &advance_row_group_);
+  } else {
+    status = AssembleRows<false>(row_batch, &advance_row_group_);
+  }
   assemble_rows_timer_.Stop();
   RETURN_IF_ERROR(status);
   if (!parse_status_.ok()) {
@@ -2093,7 +2146,7 @@ Status HdfsParquetScanner::ProcessBloomFilter(const parquet::RowGroup&
 /// TODO: Since the scratch batch is populated in a column-wise fashion, it is
 /// difficult to maintain a maximum memory footprint without throwing away at least
 /// some work. This point needs further experimentation and thought.
-Status HdfsParquetScanner::AssembleRows(
+Status HdfsParquetScanner::AssembleRowsWithoutLateMaterialization(
     const vector<ParquetColumnReader*>& column_readers, RowBatch* row_batch,
     bool* skip_row_group) {
   DCHECK(!column_readers.empty());
@@ -2131,7 +2184,7 @@ Status HdfsParquetScanner::AssembleRows(
         *skip_row_group = true;
         if (num_tuples_mismatch && continue_execution) {
           Status err(Substitute("Corrupt Parquet file '$0': column '$1' "
-              "had $2 remaining values but expected $3", filename(),
+              "had $2 remaining values but expected $3",filename(),
               col_reader->schema_element().name, last_num_tuples,
               scratch_batch_->num_tuples));
           parse_status_.MergeStatus(err);
@@ -2154,6 +2207,214 @@ Status HdfsParquetScanner::AssembleRows(
   return Status::OK();
 }
 
+/// High-level steps of this function:
+/// 1. If late materialization is disabled or not applicable, use
+///    'AssembleRowsWithoutLateMaterialization'.
+/// 2. Allocate 'scratch' memory for tuples able to hold a full batch.
+/// 3. Populate the slots of all scratch tuples one column reader at a time, but only
+///    for 'filter_readers_', using the ColumnReader::Read*ValueBatch() functions.
+///    These column readers are based on columns used in conjuncts and runtime filters.
+/// 4. Evaluate runtime filters and conjuncts against the scratch tuples and save
+///    pointers to the surviving tuples in the output batch. Note that only the
+///    pointers are saved; the surviving tuples are still part of the 'scratch' memory.
+/// 5. If no tuples survive step 4, skip materializing tuples for 'non_filter_readers_'.
+///    Collect the rows to be skipped and skip them later at step 6 b.
+/// 6. If surviving tuples are present then:
+///    a. Get the micro batches of surviving rows to be read by 'non_filter_readers_'.
+///    b. Skip the rows collected at step 5, if needed.
+///    c. Fill the 'scratch' memory by reading micro batches using
+///       'FillScratchMicroBatches'. Only the rows in the micro batches are filled in
+///       'scratch'; other rows are ignored. Note, we don't need to filter the rows
+///       again as the output batch already has pointers to the surviving tuples.
+/// 7. Transfer the ownership of the scratch memory to the output batch once the
+///    scratch memory is exhausted.
+/// 8. Repeat the steps above until we are done with the row group or an error
+///    occurs.
+/// TODO: Since the scratch batch is populated in a column-wise fashion, it is
+/// difficult to maintain a maximum memory footprint without throwing away at least
+/// some work. This point needs further experimentation and thought.
+template <bool USES_PAGE_INDEX>
+Status HdfsParquetScanner::AssembleRows(RowBatch* row_batch, bool* skip_row_group) {
+  DCHECK(!column_readers_.empty());
+  DCHECK(row_batch != nullptr);
+  DCHECK_EQ(*skip_row_group, false);
+  DCHECK(scratch_batch_ != nullptr);
+
+  if (filter_readers_.empty() || non_filter_readers_.empty() ||
+      late_materialization_threshold_ < 0 || filter_readers_[0]->max_rep_level() > 0) {
+    // Late Materialization is either disabled or not applicable for assembling rows here.
+    return AssembleRowsWithoutLateMaterialization(column_readers_, row_batch,
+        skip_row_group);
+  }
+
+  int64_t num_rows_read = 0;
+  int64_t num_rows_to_skip = 0;
+  int64_t last_row_id_processed = -1;
+  while (!column_readers_[0]->RowGroupAtEnd()) {
+    // Start a new scratch batch.
+    RETURN_IF_ERROR(scratch_batch_->Reset(state_));
+    InitTupleBuffer(template_tuple_, scratch_batch_->tuple_mem, scratch_batch_->capacity);
+    // Late Materialization
+    // 1. Filter rows only materializing the columns in 'filter_readers_'
+    // 2. Transfer the surviving rows
+    // 3. Materialize the rest of the columns only for the surviving rows.
+
+    RETURN_IF_ERROR(FillScratchMicroBatches(filter_readers_, row_batch,
+        skip_row_group, &complete_micro_batch_, 1, scratch_batch_->capacity,
+        &scratch_batch_->num_tuples));
+    if (*skip_row_group) { return Status::OK(); }
+    num_rows_read += scratch_batch_->num_tuples;
+    bool row_group_end = filter_readers_[0]->RowGroupAtEnd();
+    int num_row_to_commit = FilterScratchBatch(row_batch);
+    if (num_row_to_commit == 0) {
+      // Collect the rows to skip, so that we can skip them together to avoid
+      // decompression and decoding. This ensures compressed pages that don't
+      // have any rows of interest are skiped without decompression.
+      num_rows_to_skip += scratch_batch_->num_tuples;
+      last_row_id_processed = filter_readers_[0]->LastProcessedRow();
+    } else {
+      if (num_rows_to_skip > 0) {
+        // Skip reading for the rest of the non-filter column readers now.
+        RETURN_IF_ERROR(SkipRowsForColumns(
+            non_filter_readers_, &num_rows_to_skip, &last_row_id_processed));
+      }
+      int num_tuples;
+      if (USES_PAGE_INDEX || !scratch_batch_->AtEnd() ||
+          num_row_to_commit == scratch_batch_->num_tuples) {
+        // When using the page index, materialize the entire batch. Currently, skipping
+        // materialization is supported only at the granularity of an entire batch when
+        // page indexes are used. The other condition is when no row was filtered out.
+        RETURN_IF_ERROR(FillScratchMicroBatches(non_filter_readers_, row_batch,
+            skip_row_group, &complete_micro_batch_,
+            1 /*'complete_micro_batch' is a single batch */, scratch_batch_->capacity,
+            &num_tuples));
+      } else {
+        ScratchMicroBatch micro_batches[scratch_batch_->capacity];
+        int num_micro_batches = scratch_batch_->GetMicroBatches(
+            late_materialization_threshold_, micro_batches);
+        RETURN_IF_ERROR(FillScratchMicroBatches(non_filter_readers_, row_batch,
+            skip_row_group, micro_batches, num_micro_batches,
+            scratch_batch_->num_tuples, &num_tuples));
+      }
+      if (*skip_row_group) { return Status::OK(); }
+    }
+    // Finalize the Transfer
+    if (scratch_batch_->tuple_byte_size != 0) {
+      scratch_batch_->FinalizeTupleTransfer(row_batch, num_row_to_commit);
+    }
+    if (row_group_end) {
+      // Skip reading for the rest of the non-filter column readers now.
+      RETURN_IF_ERROR(SkipRowsForColumns(
+          non_filter_readers_, &num_rows_to_skip, &last_row_id_processed));
+      for (int c = 0; c < non_filter_readers_.size(); ++c) {
+        ParquetColumnReader* col_reader = non_filter_readers_[c];
+        if (UNLIKELY(!col_reader->SetRowGroupAtEnd())) {
+          return Status(
+              Substitute("Could not move to RowGroup end in file $0.", filename()));
+        }
+      }
+    }
+    if (num_row_to_commit != 0) {
+      RETURN_IF_ERROR(CheckPageFiltering());
+      RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
+    }
+    if (row_batch->AtCapacity()) {
+      // Skip reading for the rest of the non-filter column readers now.
+      RETURN_IF_ERROR(SkipRowsForColumns(
+          non_filter_readers_, &num_rows_to_skip, &last_row_id_processed));
+      break;
+    }
+  }
+  row_group_rows_read_ += num_rows_read;
+  COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
+  // Merge Scanner-local counter into HdfsScanNode counter and reset.
+  COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_);
+  coll_items_read_counter_ = 0;
+  return Status::OK();
+}
+
+Status HdfsParquetScanner::SkipRowsForColumns(
+    const vector<ParquetColumnReader*>& column_readers, int64_t* num_rows_to_skip,
+    int64_t* skip_to_row) {
+  if (*num_rows_to_skip > 0) {
+    for (int c = 0; c < column_readers.size(); ++c) {
+      ParquetColumnReader* col_reader = column_readers[c];
+      // Skipping may fail for a corrupted Parquet file due to a mismatch of row
+      // counts among columns.
+      if (UNLIKELY(!col_reader->SkipRows(*num_rows_to_skip, *skip_to_row))) {
+        return Status(Substitute("Error in skipping rows in file $0.", filename()));
+      }
+    }
+    *num_rows_to_skip = 0;
+    *skip_to_row = -1;
+  }
+  return Status::OK();
+}
+
+Status HdfsParquetScanner::FillScratchMicroBatches(
+    const vector<ParquetColumnReader*>& column_readers, RowBatch* row_batch,
+    bool* skip_row_group, const ScratchMicroBatch* micro_batches, int num_micro_batches,
+    int max_num_tuples, int* num_tuples) {
+  if (UNLIKELY(num_micro_batches < 1)) {
+    return Status(Substitute("Number of batches is $0, less than 1", num_micro_batches));
+  }
+
+  // Materialize the top-level slots into the scratch batches column-by-column.
+  int last_num_tuples[num_micro_batches];
+  for (int c = 0; c < column_readers.size(); ++c) {
+    ParquetColumnReader* col_reader = column_readers[c];
+    bool continue_execution = false;
+    int last = -1;
+    for (int r = 0; r < num_micro_batches; r++) {
+      if (r == 0) {
+        if (micro_batches[0].start > 0) {
+          if (UNLIKELY(!col_reader->SkipRows(micro_batches[0].start, -1))) {
+            return Status(Substitute("Couldn't skip rows in file $0.", filename()));
+          }
+        }
+      } else {
+        if (UNLIKELY(!col_reader->SkipRows(micro_batches[r].start - last - 1, -1))) {
+          return Status(Substitute("Couldn't skip rows in file $0.", filename()));
+        }
+      }
+      uint8_t* next_tuple_mem = scratch_batch_->tuple_mem
+          + (scratch_batch_->tuple_byte_size * micro_batches[r].start);
+      if (col_reader->max_rep_level() > 0) {
+        continue_execution = col_reader->ReadValueBatch(&scratch_batch_->aux_mem_pool,
+            micro_batches[r].length, tuple_byte_size_, next_tuple_mem, num_tuples);
+      } else {
+        continue_execution =
+            col_reader->ReadNonRepeatedValueBatch(&scratch_batch_->aux_mem_pool,
+                micro_batches[r].length, tuple_byte_size_, next_tuple_mem, num_tuples);
+      }
+      last = micro_batches[r].end;
+      // Check that all column readers populated the same number of values.
+      bool num_tuples_mismatch = c != 0 && last_num_tuples[r] != *num_tuples;
+      if (UNLIKELY(!continue_execution || num_tuples_mismatch)) {
+        // Skipping this row group. Free up all the resources with this row group.
+        FlushRowGroupResources(row_batch);
+        *num_tuples = 0;
+        DCHECK(scratch_batch_->AtEnd());
+        *skip_row_group = true;
+        if (num_tuples_mismatch && continue_execution) {
+          Status err(Substitute("Corrupt Parquet file '$0': column '$1' "
+              "had $2 remaining values but expected $3",filename(),
+              col_reader->schema_element().name, last_num_tuples[r], *num_tuples));
+          parse_status_.MergeStatus(err);
+        }
+        return Status::OK();
+      }
+      last_num_tuples[r] = *num_tuples;
+    }
+    if (UNLIKELY(last < max_num_tuples - 1)) {
+      if (UNLIKELY(!col_reader->SkipRows(max_num_tuples - 1 - last, -1))) {
+        return Status(Substitute("Couldn't skip rows in file $0.", filename()));
+      }
+    }
+  }
+  return Status::OK();
+}
+
 Status HdfsParquetScanner::CheckPageFiltering() {
   if (candidate_ranges_.empty() || scalar_readers_.empty()) return Status::OK();
 
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index cab7d60..bf243a4 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -24,6 +24,7 @@
 #include "exec/parquet/parquet-common.h"
 #include "exec/parquet/parquet-metadata-utils.h"
 #include "exec/parquet/parquet-page-index.h"
+#include "exec/scratch-tuple-batch.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "util/runtime-profile-counters.h"
 
@@ -427,6 +428,10 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
 
   /// Column reader for each top-level materialized slot in the output tuple.
   std::vector<ParquetColumnReader*> column_readers_;
+  /// Column readers among 'column_readers_' used for filtering.
+  std::vector<ParquetColumnReader*> filter_readers_;
+  /// Column readers among 'column_readers_' not used for filtering.
+  std::vector<ParquetColumnReader*> non_filter_readers_;
 
   /// File metadata thrift object
   parquet::FileMetaData file_metadata_;
@@ -474,6 +479,9 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// Mapping from Parquet column indexes to scalar readers.
   std::unordered_map<int, BaseScalarColumnReader*> scalar_reader_map_;
 
+  /// List of slot ids used by conjuncts and runtime filters.
+  std::vector<SlotId> conjunct_slot_ids_;
+
   /// Memory used to store the tuples used for dictionary filtering. Tuples owned by
   /// perm_pool_.
   std::unordered_map<const TupleDescriptor*, Tuple*> dict_filter_tuple_map_;
@@ -521,6 +529,10 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// 'num_pages_counter_ - num_stats_filtered_pages_counter_' pages.
   RuntimeProfile::Counter* num_pages_counter_;
 
+  /// Number of pages skipped by late materialization as they did not have any
+  /// rows that survived filtering.
+  RuntimeProfile::Counter* num_pages_skipped_by_late_materialization_counter_;
+
   /// Number of scanners that end up doing no reads because their splits don't overlap
   /// with the midpoint of any row-group in the file.
   RuntimeProfile::Counter* num_scanners_with_no_reads_counter_;
@@ -551,6 +563,20 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
 
   ParquetPageIndex page_index_;
 
+  /// Defines the late materialization threshold for column readers. While
+  /// materializing values and putting them into scratch_batch_, we try to avoid
+  /// materializing values that were filtered out already. The threshold defines the
+  /// minimum number of contiguous rows that have to be filtered out in order to skip
+  /// materializing them. For example, if the threshold is 5 and the rows surviving
+  /// the filters are 1-20 and 27-35, then we can skip materializing the 6 rows 21-26
+  /// because the threshold 5 is less than 6. But if the threshold is 10, then rows
+  /// 21-26 will be materialized instead.
+  int32_t late_materialization_threshold_;
+
+  /// In late materialization, we try to materialize only the portion of a batch that
+  /// survives filtering, and call it a micro batch. This member represents a micro
+  /// batch that spans the entire batch of length 'scratch_batch_->capacity'.
+  ScratchMicroBatch complete_micro_batch_;
+
   const char* filename() const { return metadata_range_->file(); }
 
   virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT;
@@ -724,8 +750,14 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// of this query should be terminated immediately.
   /// May set *skip_row_group to indicate that the current row group should be skipped,
   /// e.g., due to a parse error, but execution should continue.
-  Status AssembleRows(const std::vector<ParquetColumnReader*>& column_readers,
-      RowBatch* row_batch, bool* skip_row_group) WARN_UNUSED_RESULT;
+  template <bool USE_PAGE_INDEX>
+  Status AssembleRows(RowBatch* row_batch, bool* skip_row_group) WARN_UNUSED_RESULT;
+
+  /// See 'AssembleRows' for details.
+  /// 'AssembleRows' implements late materialization whereas this function does not.
+  Status AssembleRowsWithoutLateMaterialization(
+      const vector<ParquetColumnReader*>& column_readers, RowBatch* row_batch,
+      bool* skip_row_group) WARN_UNUSED_RESULT;
 
   /// Commit num_rows to the given row batch.
   /// Returns OK if the query is not cancelled and hasn't exceeded any mem limits.
@@ -906,6 +938,30 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// Updates the counter parquet_uncompressed_page_size_counter_ with the given
   /// uncompressed page size. Called by ParquetColumnReader for each page read.
   void UpdateUncompressedPageSizeCounter(int64_t uncompressed_page_size);
+
+  /// Initialize 'conjunct_slot_ids_' with the SlotIds used in the conjuncts.
+  void InitSlotIdsForConjuncts();
+
+  /// Fills the rows covered by 'micro_batches' with data read by 'column_readers'.
+  /// Micro batches are sub-ranges of 0..num_tuples-1 that need to be read.
+  /// The tuple memory to write to is specified by 'scratch_batch_->tuple_mem'.
+  Status FillScratchMicroBatches(const vector<ParquetColumnReader*>& column_readers,
+      RowBatch* row_batch, bool* skip_row_group, const ScratchMicroBatch* micro_batches,
+      int num_micro_batches, int max_num_tuples, int* num_tuples);
+
+  /// Partitions 'column_readers' into filter and non-filter readers. The
+  /// 'filter_readers' read columns involved in either static filters or runtime
+  /// filters. The 'non_filter_readers' are responsible for reading the surviving rows
+  /// from the columns that are not involved in filtering.
+  void DivideFilterAndNonFilterColumnReaders(
+      const vector<ParquetColumnReader*>& column_readers,
+      vector<ParquetColumnReader*>* filter_readers,
+      vector<ParquetColumnReader*>* non_filter_readers) const;
+
+  /// Skips 'num_rows_to_skip' rows for all 'column_readers'. If page filtering is
+  /// enabled, then we skip to row index 'skip_to_row'.
+  Status SkipRowsForColumns(const vector<ParquetColumnReader*>& column_readers,
+      int64_t* num_rows_to_skip, int64_t* skip_to_row);
 };
 
 } // namespace impala
diff --git a/be/src/exec/parquet/parquet-collection-column-reader.cc b/be/src/exec/parquet/parquet-collection-column-reader.cc
index f35800d..f6b4cde 100644
--- a/be/src/exec/parquet/parquet-collection-column-reader.cc
+++ b/be/src/exec/parquet/parquet-collection-column-reader.cc
@@ -157,4 +157,13 @@ void CollectionColumnReader::UpdateDerivedState() {
     pos_current_value_ = 0;
   }
 }
+
+bool CollectionColumnReader::SkipRows(int64_t num_rows, int64_t skip_row_id) {
+  DCHECK(!children_.empty());
+  for (int c = 0; c < children_.size(); ++c) {
+    if (!children_[c]->SkipRows(num_rows, skip_row_id)) return false;
+  }
+  UpdateDerivedState();
+  return true;
+}
 } // namespace impala
diff --git a/be/src/exec/parquet/parquet-collection-column-reader.h b/be/src/exec/parquet/parquet-collection-column-reader.h
index ae23979..c8f7ba2 100644
--- a/be/src/exec/parquet/parquet-collection-column-reader.h
+++ b/be/src/exec/parquet/parquet-collection-column-reader.h
@@ -75,6 +75,25 @@ class CollectionColumnReader : public ParquetColumnReader {
 
   virtual void Close(RowBatch* row_batch) override;
 
+  /// Skips the number of encoded values specified by 'num_rows', without materializing or
+  /// decoding them.
+  /// Returns true on success, false otherwise.
+  virtual bool SkipRows(int64_t num_rows, int64_t skip_row_id) override;
+
+  virtual bool SetRowGroupAtEnd() override {
+    DCHECK(!children_.empty());
+    for (int c = 0; c < children_.size(); ++c) {
+      if (!children_[c]->SetRowGroupAtEnd()) return false;
+    }
+    return true;
+  }
+
+  /// Returns the index of the row that was processed most recently.
+  int64_t LastProcessedRow() const override {
+    DCHECK(!children_.empty());
+    return children_[0]->LastProcessedRow();
+  }
+
  private:
   /// Column readers of fields contained within this collection. There is at least one
   /// child reader per collection reader. Child readers either materialize slots in the
diff --git a/be/src/exec/parquet/parquet-column-chunk-reader.cc b/be/src/exec/parquet/parquet-column-chunk-reader.cc
index aa4bacf..29c2cfb 100644
--- a/be/src/exec/parquet/parquet-column-chunk-reader.cc
+++ b/be/src/exec/parquet/parquet-column-chunk-reader.cc
@@ -203,8 +203,8 @@ Status ParquetColumnChunkReader::ReadDictionaryData(ScopedBuffer* uncompressed_b
   return Status::OK();
 }
 
-Status ParquetColumnChunkReader::ReadNextDataPage(bool* eos, uint8_t** data,
-    int* data_size) {
+Status ParquetColumnChunkReader::ReadNextDataPage(
+    bool* eos, uint8_t** data, int* data_size, bool read_data) {
   // Read the next data page, skipping page types we don't care about. This method should
   // be called after we know that the first page is not a dictionary page. Therefore, if
   // we find a dictionary page, it is an error in the parquet file and we return a non-ok
@@ -227,8 +227,11 @@ Status ParquetColumnChunkReader::ReadNextDataPage(bool* eos, uint8_t** data,
       RETURN_IF_ERROR(SkipPageData());
     }
   }
-
-  return ReadDataPageData(data, data_size);
+  if (read_data) {
+    return ReadDataPageData(data, data_size);
+  } else {
+    return Status::OK();
+  }
 }
 
 Status ParquetColumnChunkReader::ReadDataPageData(uint8_t** data, int* data_size) {
diff --git a/be/src/exec/parquet/parquet-column-chunk-reader.h b/be/src/exec/parquet/parquet-column-chunk-reader.h
index 3eb0693..dcef832 100644
--- a/be/src/exec/parquet/parquet-column-chunk-reader.h
+++ b/be/src/exec/parquet/parquet-column-chunk-reader.h
@@ -113,19 +113,32 @@ class ParquetColumnChunkReader {
       ScopedBuffer* uncompressed_buffer, uint8_t** dict_values,
       int64_t* data_size, int* num_entries);
 
-  /// Reads the next data page to '*data' and '*data_size'.
+  /// Reads the next data page into '*data' and '*data_size' if 'read_data' is true.
+  /// Otherwise reads only the page header, after which the client should call either
+  /// 'ReadDataPageData' or 'SkipPageData'.
   /// Skips other types of pages (except for dictionary) until it finds a data page. If it
   /// finds a dictionary page, returns an error as the dictionary page should be the first
   /// page and this method should only be called if a data page is expected.
   /// If the stream reaches the end before reading a complete page header, '*eos' is set
   /// to true.
-  Status ReadNextDataPage(bool* eos, uint8_t** data, int* data_size);
+  Status ReadNextDataPage(
+      bool* eos, uint8_t** data, int* data_size, bool read_data = true);
 
   /// If the column type is a variable length string, transfers the remaining resources
   /// backing tuples to 'mem_pool' and frees up other resources. Otherwise frees all
   /// resources.
   void ReleaseResourcesOfLastPage(MemPool& mem_pool);
 
+  /// Skips the data part of the page. The header must be already read.
+  Status SkipPageData();
+
+  /// Reads the data part of the next data page. Sets '*data' to point to the buffer and
+  /// '*data_size' to its size.
+  /// If the column type is a variable length string, the buffer is allocated from
+  /// data_page_pool_. Otherwise the returned buffer will be valid only until the next
+  /// function call that advances the buffer.
+  Status ReadDataPageData(uint8_t** data, int* data_size);
+
  private:
   HdfsParquetScanner* parent_;
   std::string schema_name_;
@@ -151,16 +164,6 @@ class ParquetColumnChunkReader {
   Status ReadDictionaryData(ScopedBuffer* uncompressed_buffer, uint8_t** dict_values,
       int64_t* data_size, int* num_entries);
 
-  /// Reads the data part of the next data page. Sets '*data' to point to the buffer and
-  /// '*data_size' to its size.
-  /// If the column type is a variable length string, the buffer is allocated from
-  /// data_page_pool_. Otherwise the returned buffer will be valid only until the next
-  /// function call that advances the buffer.
-  Status ReadDataPageData(uint8_t** data, int* data_size);
-
-  /// Skips the data part of the page. The header must be already read.
-  Status SkipPageData();
-
   /// Allocate memory for the uncompressed contents of a data page of 'size' bytes from
   /// 'data_page_pool_'. 'err_ctx' provides context for error messages. On success,
   /// 'buffer' points to the allocated memory. Otherwise an error status is returned.
diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index 86131ff..3a8ac36 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -1134,6 +1134,65 @@ Status BaseScalarColumnReader::ReadDataPage() {
   return Status::OK();
 }
 
+Status BaseScalarColumnReader::ReadNextDataPageHeader() {
+  // We're about to move to the next data page. The previous data page is
+  // now complete, free up any memory allocated for it. If the data page contained
+  // strings we need to attach it to the returned batch.
+  col_chunk_reader_.ReleaseResourcesOfLastPage(parent_->scratch_batch_->aux_mem_pool);
+
+  DCHECK_EQ(num_buffered_values_, 0);
+  if ((DoesPageFiltering() && candidate_page_idx_ == candidate_data_pages_.size() - 1)
+      || num_values_read_ == metadata_->num_values) {
+    // No more pages to read
+    // TODO: should we check for stream_->eosr()?
+    return Status::OK();
+  } else if (num_values_read_ > metadata_->num_values) {
+    RETURN_IF_ERROR(LogCorruptNumValuesInMetadataError());
+    return Status::OK();
+  }
+
+  bool eos;
+  int data_size;
+  RETURN_IF_ERROR(col_chunk_reader_.ReadNextDataPage(&eos, &data_, &data_size,
+      false /*Read next data page's header only*/));
+  if (eos) return HandleTooEarlyEos();
+  const parquet::PageHeader& current_page_header = col_chunk_reader_.CurrentPageHeader();
+  int num_values = current_page_header.data_page_header.num_values;
+  if (UNLIKELY(num_values < 0)) {
+    return Status(Substitute("Error reading data page in Parquet file '$0'. "
+                             "Invalid number of values in metadata: $1",
+        filename(), num_values));
+  }
+  num_buffered_values_ = num_values;
+  num_values_read_ += num_buffered_values_;
+  if (parent_->candidate_ranges_.empty()) COUNTER_ADD(parent_->num_pages_counter_, 1);
+  return Status::OK();
+}
+
+Status BaseScalarColumnReader::ReadCurrentDataPage() {
+  int data_size;
+  RETURN_IF_ERROR(col_chunk_reader_.ReadDataPageData(&data_, &data_size));
+  data_end_ = data_ + data_size;
+  const parquet::PageHeader& current_page_header = col_chunk_reader_.CurrentPageHeader();
+  /// TODO: Move the level decoder initialisation to ParquetPageReader to abstract away
+  /// the differences between Parquet header V1 and V2.
+  // Initialize the repetition level data
+  RETURN_IF_ERROR(rep_levels_.Init(filename(),
+      &current_page_header.data_page_header.repetition_level_encoding,
+      parent_->perm_pool_.get(), parent_->state_->batch_size(), max_rep_level(), &data_,
+      &data_size));
+  // Initialize the definition level data
+  RETURN_IF_ERROR(def_levels_.Init(filename(),
+      &current_page_header.data_page_header.definition_level_encoding,
+      parent_->perm_pool_.get(), parent_->state_->batch_size(), max_def_level(), &data_,
+      &data_size));
+  // Data can be empty if the column contains all NULLs
+  RETURN_IF_ERROR(InitDataPage(data_, data_size));
+  // Skip rows if needed.
+  RETURN_IF_ERROR(StartPageFiltering());
+  return Status::OK();
+}
+
 template <bool ADVANCE_REP_LEVEL>
 bool BaseScalarColumnReader::NextLevels() {
   if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
@@ -1149,7 +1208,9 @@ bool BaseScalarColumnReader::NextLevels() {
         auto current_range = parent_->candidate_ranges_[current_row_range_];
         int64_t skip_rows = current_range.first - current_row_ - 1;
         DCHECK_GE(skip_rows, 0);
-        if (!SkipTopLevelRows(skip_rows)) return false;
+        int64_t remaining = 0;
+        if (!SkipTopLevelRows(skip_rows, &remaining)) return false;
+        DCHECK_EQ(remaining, 0);
       } else {
         if (!JumpToNextPage()) return parent_->parse_status_.ok();
       }
@@ -1207,30 +1268,43 @@ Status BaseScalarColumnReader::StartPageFiltering() {
   int64_t range_start = candidate_row_ranges[current_row_range_].first;
   if (range_start > current_row_ + 1) {
     int64_t skip_rows = range_start - current_row_ - 1;
-    if (!SkipTopLevelRows(skip_rows)) {
+    int64_t remaining = 0;
+    if (!SkipTopLevelRows(skip_rows, &remaining)) {
       return Status(Substitute("Couldn't skip rows in file $0.", filename()));
     }
+    DCHECK_EQ(remaining, 0);
     DCHECK_EQ(current_row_, range_start - 1);
   }
   return Status::OK();
 }
 
-bool BaseScalarColumnReader::SkipTopLevelRows(int64_t num_rows) {
-  DCHECK_GE(num_buffered_values_, num_rows);
+template <bool MULTI_PAGE>
+bool BaseScalarColumnReader::SkipTopLevelRows(int64_t num_rows, int64_t* remaining) {
   DCHECK_GT(num_rows, 0);
+  DCHECK_GT(num_buffered_values_, 0);
+  if (!MULTI_PAGE) {
+    DCHECK_GE(num_buffered_values_, num_rows);
+  }
   // Fastest path: field is required and not nested.
   // So row count equals value count, and every value is stored in the page data.
   if (max_def_level() == 0 && max_rep_level() == 0) {
-    current_row_ += num_rows;
-    num_buffered_values_ -= num_rows;
-    return SkipEncodedValuesInPage(num_rows);
+    int rows_skipped;
+    if (MULTI_PAGE) {
+      rows_skipped = std::min((int64_t)num_buffered_values_, num_rows);
+    } else {
+      rows_skipped = num_rows;
+    }
+    current_row_ += rows_skipped;
+    num_buffered_values_ -= rows_skipped;
+    *remaining = num_rows - rows_skipped;
+    return SkipEncodedValuesInPage(rows_skipped);
   }
   int64_t num_values_to_skip = 0;
   if (max_rep_level() == 0) {
     // No nesting, but field is not required.
     // Skip as many values in the page data as many non-NULL values encountered.
     int i = 0;
-    while (i < num_rows) {
+    while (i < num_rows && num_buffered_values_ > 0) {
       int repeated_run_length = def_levels_.NextRepeatedRunLength();
       if (repeated_run_length > 0) {
         int read_count = min<int64_t>(num_rows - i, repeated_run_length);
@@ -1249,11 +1323,13 @@ bool BaseScalarColumnReader::SkipTopLevelRows(int64_t num_rows) {
         if (!def_levels_.CacheNextBatch(num_buffered_values_).ok()) return false;
       }
     }
-    current_row_ += num_rows;
+    DCHECK_LE(i, num_rows);
+    current_row_ += i;
+    *remaining = num_rows - i;
   } else {
     // 'rep_level_' being zero denotes the start of a new top-level row.
     // From the 'def_level_' we can determine the number of non-NULL values.
-    while (true) {
+    while (num_buffered_values_ > 0) {
       if (!def_levels_.CacheNextBatchIfEmpty(num_buffered_values_).ok()) return false;
       if (!rep_levels_.CacheNextBatchIfEmpty(num_buffered_values_).ok()) return false;
       if (num_rows == 0 && rep_levels_.PeekLevel() == 0) {
@@ -1270,10 +1346,33 @@ bool BaseScalarColumnReader::SkipTopLevelRows(int64_t num_rows) {
         --num_rows;
       }
     }
+    *remaining = num_rows;
   }
   return SkipEncodedValuesInPage(num_values_to_skip);
 }
 
+bool BaseScalarColumnReader::SetRowGroupAtEnd() {
+  if (RowGroupAtEnd()) {
+    return true;
+  }
+  if (num_buffered_values_ == 0) {
+    NextPage();
+  }
+  if (DoesPageFiltering() && RowsRemainingInCandidateRange() == 0) {
+    if (max_rep_level() == 0 || rep_levels_.PeekLevel() == 0) {
+      if (!IsLastCandidateRange()) AdvanceCandidateRange();
+      if (!PageHasRemainingCandidateRows()) {
+        JumpToNextPage();
+      }
+    }
+  }
+  bool status = RowGroupAtEnd();
+  if (!status) {
+    return false;
+  }
+  return parent_->parse_status_.ok();
+}
+
 int BaseScalarColumnReader::FillPositionsInCandidateRange(int rows_remaining,
     int max_values, uint8_t* RESTRICT tuple_mem, int tuple_size) {
   DCHECK_GT(max_rep_level_, 0);
@@ -1335,7 +1434,8 @@ bool BaseScalarColumnReader::SkipRowsInPage() {
   DCHECK_LT(current_row_, current_range.first);
   int64_t skip_rows = current_range.first - current_row_ - 1;
   DCHECK_GE(skip_rows, 0);
-  return SkipTopLevelRows(skip_rows);
+  int64_t remaining = 0;
+  return SkipTopLevelRows(skip_rows, &remaining);
 }
 
 bool BaseScalarColumnReader::JumpToNextPage() {
@@ -1377,6 +1477,21 @@ bool BaseScalarColumnReader::NextPage() {
   return true;
 }
 
+bool BaseScalarColumnReader::AdvanceNextPageHeader() {
+  num_buffered_values_ = 0;
+  parent_->assemble_rows_timer_.Stop();
+  parent_->parse_status_ = ReadNextDataPageHeader();
+  if (UNLIKELY(!parent_->parse_status_.ok())) return false;
+  if (num_buffered_values_ == 0) {
+    rep_level_ = ParquetLevel::ROW_GROUP_END;
+    def_level_ = ParquetLevel::ROW_GROUP_END;
+    pos_current_value_ = ParquetLevel::INVALID_POS;
+    return false;
+  }
+  parent_->assemble_rows_timer_.Start();
+  return true;
+}
+
 void BaseScalarColumnReader::SetLevelDecodeError(
     const char* level_name, int decoded_level, int max_level) {
   if (decoded_level < 0) {
@@ -1392,6 +1507,119 @@ void BaseScalarColumnReader::SetLevelDecodeError(
   }
 }
 
+/// Wrapper around 'SkipTopLevelRows' to skip across multiple pages.
+/// The function handles 3 scenarios:
+/// 1. Page filtering: when this is enabled, the function can be used
+///    to skip to a particular 'skip_row_id'.
+/// 2. Collection: when this scalar reader is reading elements of a collection.
+/// 3. The rest of the cases.
+/// For page filtering, we keep track of the first and last row indexes of the current
+/// page and keep advancing to the next page until we find the page that contains
+/// 'skip_row_id'. At that point, we can skip to the required row id.
+/// The difference between scenarios 2 and 3 is that in scenario 2 we end up
+/// decompressing all the pages being skipped, whereas in scenario 3 we only
+/// decompress the pages required and avoid unnecessary decompression. This is
+/// possible because in scenario 3 'data_page_header.num_values' corresponds to the
+/// number of rows stored in the page. This is not true in scenario 2 because
+/// multiple consecutive values can belong to the same row.
+template <bool IN_COLLECTION>
+bool BaseScalarColumnReader::SkipRowsInternal(int64_t num_rows, int64_t skip_row_id) {
+  if (DoesPageFiltering() && skip_row_id > 0) {
+    // Check if it is the beginning of the row group and advance to the next page.
+    if (candidate_page_idx_ < 0) {
+      if (UNLIKELY(!NextPage())) {
+        return false;
+      }
+    }
+    int last_row_idx = LastRowIdxInCurrentPage();
+    // Keep advancing until we reach the page containing 'skip_row_id'.
+    while (skip_row_id > last_row_idx) {
+      COUNTER_ADD(parent_->num_pages_skipped_by_late_materialization_counter_, 1);
+      if (UNLIKELY(!JumpToNextPage())) {
+        return false;
+      }
+      last_row_idx = LastRowIdxInCurrentPage();
+    }
+    DCHECK_GE(skip_row_id, FirstRowIdxInCurrentPage());
+    int64_t last_row = LastProcessedRow();
+    int64_t remaining = 0;
+    // Skip to the required row id within the page.
+    if (last_row < skip_row_id) {
+      if (UNLIKELY(!SkipTopLevelRows(skip_row_id - last_row, &remaining))) {
+        return false;
+      }
+    }
+    // Also need to adjust 'candidate_row_ranges' as we skipped to a new row id.
+    auto& candidate_row_ranges = parent_->candidate_ranges_;
+    while (current_row_ > candidate_row_ranges[current_row_range_].last) {
+      DCHECK_LT(current_row_range_, candidate_row_ranges.size());
+      ++current_row_range_;
+    }
+    return true;
+  } else if (IN_COLLECTION) {
+    DCHECK_GT(num_rows, 0);
+    // If all the values of the current page are consumed, move to the next page.
+    if (num_buffered_values_ == 0) {
+      if (!NextPage()) {
+        return false;
+      }
+    }
+    DCHECK_GT(num_buffered_values_, 0);
+    int64_t remaining = 0;
+    // Try to skip 'num_rows' and see if something remains.
+    if (!SkipTopLevelRows<true>(num_rows, &remaining)) {
+      return false;
+    }
+    // Again invoke the same method on remaining rows.
+    if (remaining > 0) {
+      return SkipRowsInternal<IN_COLLECTION>(remaining, skip_row_id);
+    }
+    return true;
+  } else {
+    // If everything in the current page is consumed, skip data pages (multiple skips
+    // if needed) to reach the required page.
+    if (num_buffered_values_ == 0) {
+      if (!AdvanceNextPageHeader()) {
+        return false;
+      }
+      const parquet::PageHeader& current_page_header =
+          col_chunk_reader_.CurrentPageHeader();
+      int32_t current_page_values = current_page_header.data_page_header.num_values;
+      if (UNLIKELY(current_page_values <= 0)) {
+        return false;
+      }
+      // Keep advancing to the next page header while the number of rows to be skipped
+      // exceeds the number of values in the page. Note that we only read headers and
+      // skip pages without decompressing them as we advance.
+      while (num_rows > current_page_values) {
+        COUNTER_ADD(parent_->num_pages_skipped_by_late_materialization_counter_, 1);
+        num_rows -= current_page_values;
+        if (!col_chunk_reader_.SkipPageData().ok() || !AdvanceNextPageHeader()) {
+          return false;
+        }
+        current_page_values =
+            col_chunk_reader_.CurrentPageHeader().data_page_header.num_values;
+      }
+      // Read the data page (including decompressing it if required).
+      Status page_read = ReadCurrentDataPage();
+      if (!page_read.ok()) {
+        return false;
+      }
+    }
+
+    // Skip the remaining rows in the page.
+    DCHECK_GT(num_buffered_values_, 0);
+    int64_t remaining = 0;
+    if (!SkipTopLevelRows<true>(num_rows, &remaining)) {
+      return false;
+    }
+    if (remaining > 0) {
+      return SkipRowsInternal<IN_COLLECTION>(remaining, skip_row_id);
+    }
+    return true;
+  }
+};
+
 /// Returns a column reader for decimal types based on its size and parquet type.
 static ParquetColumnReader* CreateDecimalColumnReader(
     const SchemaNode& node, const SlotDescriptor* slot_desc, HdfsParquetScanner* parent) {
diff --git a/be/src/exec/parquet/parquet-column-readers.h b/be/src/exec/parquet/parquet-column-readers.h
index 8260c35..63ab694 100644
--- a/be/src/exec/parquet/parquet-column-readers.h
+++ b/be/src/exec/parquet/parquet-column-readers.h
@@ -22,8 +22,8 @@
 
 #include "common/atomic.h"
 #include "exec/parquet/hdfs-parquet-scanner.h"
-#include "exec/parquet/parquet-level-decoder.h"
 #include "exec/parquet/parquet-column-chunk-reader.h"
+#include "exec/parquet/parquet-level-decoder.h"
 
 namespace impala {
 
@@ -163,6 +163,24 @@ class ParquetColumnReader {
   /// and frees up other resources. If 'row_batch' is NULL frees all resources instead.
   virtual void Close(RowBatch* row_batch) = 0;
 
+  /// Skips the number of encoded values specified by 'num_rows', without materializing
+  /// or decoding them, across pages. If page filtering is enabled, then it skips
+  /// directly to the row after 'skip_row_id' and ignores 'num_rows'.
+  /// The collection reader invokes 'SkipRows' on all of its 'children_'.
+  /// Returns true on success, false otherwise.
+  virtual bool SkipRows(int64_t num_rows, int64_t skip_row_id) = 0;
+
+  /// Sets the reader to the row group end. 'rep_level_' and 'def_level_' are set to
+  /// ParquetLevel::ROW_GROUP_END and 'pos_current_value_' to ParquetLevel::INVALID_POS.
+  virtual bool SetRowGroupAtEnd() = 0;
+
+  /// Returns the last processed row id if present, otherwise returns -1.
+  virtual int64_t LastProcessedRow() const { return -1; }
+
+  /// Returns 'true' if the reader is filtering pages using the page index.
+  virtual bool DoesPageFiltering() const { return false; }
+
  protected:
   HdfsParquetScanner* parent_;
   const SchemaNode& node_;
@@ -250,7 +268,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
 
   virtual ~BaseScalarColumnReader() { }
 
-  virtual bool IsCollectionReader() const { return false; }
+  virtual bool IsCollectionReader() const override { return false; }
 
   /// Resets the reader for each row group in the file and creates the scan
   /// range for the column, but does not start it. To start scanning,
@@ -272,7 +290,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
     return Status::OK();
   }
 
-  virtual void Close(RowBatch* row_batch);
+  virtual void Close(RowBatch* row_batch) override;
 
   io::ScanRange* scan_range() const { return col_chunk_reader_.scan_range(); }
   int64_t total_len() const { return metadata_->total_compressed_size; }
@@ -285,7 +303,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
 
   /// Reads the next definition and repetition levels for this column. Initializes the
   /// next data page if necessary.
-  virtual bool NextLevels() { return NextLevels<true>(); }
+  virtual bool NextLevels() override { return NextLevels<true>(); }
 
   /// Check the data stream to see if there is a dictionary page. If there is,
   /// use that page to initialize dict_decoder_ and advance the data stream
@@ -407,11 +425,27 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// this function will continue reading the next data page.
   Status ReadDataPage();
 
+  /// Reads just the header of the next data page. If a dictionary page is encountered,
+  /// it will be skipped and this function will continue reading the header of the next
+  /// data page.
+  Status ReadNextDataPageHeader();
+
+  /// Reads only the data page content and not the header.
+  /// This should strictly be called only after 'ReadNextDataPageHeader' or
+  /// 'AdvanceNextPageHeader' has been invoked.
+  Status ReadCurrentDataPage();
+
   /// Try to move the the next page and buffer more values. Return false and
   /// sets rep_level_, def_level_ and pos_current_value_ to -1 if no more pages or an
   /// error encountered.
   bool NextPage();
 
+  /// Reads just the header of the next data page; it is a wrapper over
+  /// 'ReadNextDataPageHeader' with relevant checks. It also updates
+  /// 'num_buffered_values_' based on the header. 'ReadCurrentDataPage' must be invoked
+  /// to actually buffer the values if the page has rows of interest.
+  bool AdvanceNextPageHeader();
+
   /// Implementation for NextLevels().
   template <bool ADVANCE_REP_LEVEL>
   bool NextLevels();
@@ -458,7 +492,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   Status StartPageFiltering();
 
   /// Returns the index of the row that was processed most recently.
-  int64_t LastProcessedRow() const {
+  int64_t LastProcessedRow() const override {
     if (def_level_ == ParquetLevel::ROW_GROUP_END) return current_row_;
     return levels_readahead_ ? current_row_ - 1 : current_row_;
   }
@@ -466,16 +500,37 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// Creates sub-ranges if page filtering is active.
   void CreateSubRanges(std::vector<io::ScanRange::SubRange>* sub_ranges);
 
-  /// Calculates how many encoded values we need to skip in the page data, then
-  /// invokes SkipEncodedValuesInPage(). The number of the encoded values depends on the
-  /// nesting of the data, and also on the number of null values.
+  /// Calculates how many encoded values we need to skip in the page data,
+  /// then invokes SkipEncodedValuesInPage(). The number of the encoded
+  /// values depends on the nesting of the data, and also on the number of null values.
   /// E.g. if 'num_rows' is 10, and every row contains an array of 10 integers, then
   /// we need to skip 100 encoded values in the page data.
   /// And, if 'num_rows' is 10, and every second value is NULL, then we only need to skip
   /// 5 values in the page data since NULL values are not stored there.
   /// The number of primitive values can be calculated from the def and rep levels.
+  /// If the values to be skipped can span multiple pages, then 'MULTI_PAGE' should be
+  /// 'true', otherwise skipping would fail. When 'MULTI_PAGE' is true, this function
+  /// skips the rows in the current page and the number of remaining rows is assigned
+  /// to the output parameter 'remaining'.
   /// Returns true on success, false otherwise.
-  bool SkipTopLevelRows(int64_t num_rows);
+  template <bool MULTI_PAGE = false>
+  bool SkipTopLevelRows(int64_t num_rows, int64_t* remaining);
+
+  /// Wrapper around 'SkipTopLevelRows' to skip across multiple pages.
+  /// It returns false when skipping rows is not possible, e.g., when 'num_rows' is
+  /// more than the number of rows left in the current row group. This can happen with
+  /// a corrupt Parquet file where the number of values differs from the metadata.
+  virtual bool SkipRows(int64_t num_rows, int64_t skip_row_id) override {
+    if (max_rep_level() > 0) {
+      return SkipRowsInternal<true>(num_rows, skip_row_id);
+    } else {
+      return SkipRowsInternal<false>(num_rows, skip_row_id);
+    }
+  }
+
+  /// Sets the reader to the row group end. 'rep_level_' and 'def_level_' are set to
+  /// ParquetLevel::ROW_GROUP_END, and 'pos_current_value_' to
+  /// ParquetLevel::INVALID_LEVEL.
+  virtual bool SetRowGroupAtEnd() override;
 
   /// Skip values in the page data. Returns true on success, false otherwise.
   virtual bool SkipEncodedValuesInPage(int64_t num_values) = 0;
@@ -488,6 +543,26 @@ class BaseScalarColumnReader : public ParquetColumnReader {
         candidate_data_pages_[candidate_page_idx_]].first_row_index;
   }
 
+  /// Returns the last row index of the current page. It is one less than the first row
+  /// index of the next page; for the last page, it is one less than the row group's
+  /// 'num_rows'.
+  int64_t LastRowIdxInCurrentPage() const {
+    DCHECK(!candidate_data_pages_.empty());
+    DCHECK_LE(candidate_page_idx_, candidate_data_pages_.size() - 1);
+    if (candidate_page_idx_ == candidate_data_pages_.size() - 1) {
+      parquet::RowGroup& row_group =
+          parent_->file_metadata_.row_groups[parent_->row_group_idx_];
+      return row_group.num_rows - 1;
+    } else {
+      return offset_index_.page_locations[candidate_data_pages_[candidate_page_idx_ + 1]]
+                 .first_row_index
+          - 1;
+    }
+  }
+
+  /// Wrapper around 'SkipTopLevelRows' to skip across multiple pages.
+  template <bool IN_COLLECTION>
+  bool SkipRowsInternal(int64_t num_rows, int64_t skip_row_id);
+
   /// The number of top-level rows until the end of the current candidate range.
   /// For simple columns it returns 0 if we have processed the last row in the current
   /// range. For nested columns, it returns 0 when we are processing values from the last
@@ -498,7 +573,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   }
 
   /// Returns true if we are filtering pages.
-  bool DoesPageFiltering() const {
+  bool DoesPageFiltering() const override {
     return !candidate_data_pages_.empty();
   }
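
Editor's note: the skipping machinery above splits one large skip into per-page skips.
SkipTopLevelRows<true> consumes as many rows as the current page still holds and reports
the remainder, while SkipRowsInternal keeps advancing page headers (via
AdvanceNextPageHeader, without decompressing page data) until the target row id is
reached, using LastRowIdxInCurrentPage to find page boundaries. Below is a minimal,
standalone sketch of that splitting arithmetic; it is an illustration only, not the
committed scanner code, and SplitSkipAcrossPages is a hypothetical name.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical helper: given the first row index of each remaining page and the total
// number of rows in the row group, split a request to skip 'num_rows' top-level rows
// (starting at 'current_row') into per-page skip counts. A page whose per-page count
// covers the whole page can be skipped without decompressing its data, which is the
// point of late materialization.
std::vector<int64_t> SplitSkipAcrossPages(const std::vector<int64_t>& first_row_index,
    int64_t row_group_num_rows, int64_t current_row, int64_t num_rows) {
  std::vector<int64_t> per_page_skips;
  size_t page = 0;
  // Find the page containing 'current_row'.
  while (page + 1 < first_row_index.size() && first_row_index[page + 1] <= current_row) {
    ++page;
  }
  while (num_rows > 0 && page < first_row_index.size()) {
    // Last row index of this page: one less than the next page's first row index,
    // or one less than the row group's row count for the final page.
    int64_t last_row = (page + 1 < first_row_index.size())
        ? first_row_index[page + 1] - 1
        : row_group_num_rows - 1;
    int64_t in_this_page = std::min(num_rows, last_row - current_row + 1);
    per_page_skips.push_back(in_this_page);
    num_rows -= in_this_page;
    current_row += in_this_page;
    ++page;
  }
  return per_page_skips;
}

int main() {
  // Three pages starting at rows 0, 100 and 200 in a 300-row row group.
  std::vector<int64_t> first_row_index = {0, 100, 200};
  // Skip 150 rows starting from row 30: 70 remain in page 0, 80 fall into page 1.
  for (int64_t n : SplitSkipAcrossPages(first_row_index, 300, 30, 150)) {
    std::cout << n << std::endl;
  }
  return 0;
}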
 
diff --git a/be/src/exec/scratch-tuple-batch-test.cc b/be/src/exec/scratch-tuple-batch-test.cc
new file mode 100644
index 0000000..b80f8d0
--- /dev/null
+++ b/be/src/exec/scratch-tuple-batch-test.cc
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdlib>
+#include <cstdio>
+#include <ctime>
+#include <vector>
+#include <boost/scoped_ptr.hpp>
+
+#include "common/init.h"
+#include "runtime/descriptors.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/tuple-row.h"
+#include "service/fe-support.h"
+#include "service/frontend.h"
+#include "scratch-tuple-batch.h"
+#include "testutil/desc-tbl-builder.h"
+#include "testutil/gtest-util.h"
+
+#include "common/names.h"
+
+using namespace impala;
+
+namespace impala {
+
+scoped_ptr<Frontend> fe;
+
+class ScratchTupleBatchTest : public testing::Test {
+ public:
+  ScratchTupleBatchTest() {}
+
+  static void VerifyMicroBatches(const boost::scoped_array<bool>& selected_rows,
+    ScratchMicroBatch* micro_batches, int num_batches, int gap, int batch_size) {
+    EXPECT_TRUE(num_batches > 0);
+    // All elements up to the first micro batch should be false.
+    for (int idx = 0; idx < micro_batches[0].start; idx++) {
+      EXPECT_FALSE(selected_rows[idx]);
+    }
+    // All elements after the last micro batch should be false.
+    for (int idx = micro_batches[num_batches - 1].end + 1; idx < batch_size; idx++) {
+      EXPECT_FALSE(selected_rows[idx]);
+    }
+    // Verify every batch
+    for (int i = 0; i < num_batches; i++) {
+      const ScratchMicroBatch& batch = micro_batches[i];
+      EXPECT_TRUE(batch.start <= batch.end);
+      EXPECT_TRUE(batch.length == batch.end - batch.start + 1);
+      EXPECT_TRUE(selected_rows[batch.start]);
+      EXPECT_TRUE(selected_rows[batch.end]);
+      int last_true_idx = batch.start;
+      for (int j = batch.start + 1; j < batch.end; j++) {
+        if (selected_rows[j]) {
+          EXPECT_TRUE(j - last_true_idx + 1 <= gap);
+          last_true_idx = j;
+        }
+      }
+    }
+    // Verify any two consecutive batches i and i+1
+    for (int i = 0; i < num_batches - 1; i++) {
+      const ScratchMicroBatch& batch = micro_batches[i];
+      const ScratchMicroBatch& nbatch = micro_batches[i + 1];
+      EXPECT_TRUE(batch.end < nbatch.start);
+      EXPECT_TRUE(nbatch.start - batch.end >= gap);
+      // Any row in between the two batches should not be selected.
+      for (int j = batch.end + 1; j < nbatch.start; j++) {
+        EXPECT_FALSE(selected_rows[j]);
+      }
+    }
+  }
+
+ protected:
+  MemTracker tracker_;
+  ObjectPool pool_;
+  RowDescriptor* desc_;
+
+  virtual void SetUp() {
+    DescriptorTblBuilder builder(fe.get(), &pool_);
+    builder.DeclareTuple() << TYPE_INT;
+    DescriptorTbl* desc_tbl = builder.Build();
+    vector<bool> nullable_tuples(1, false);
+    vector<TTupleId> tuple_id(1, (TTupleId) 0);
+    desc_ = pool_.Add(new RowDescriptor(*desc_tbl, tuple_id, nullable_tuples));
+  }
+};
+
+// This test checks conversion of 'selected_rows' with interleaved
+// 'true' values to 'ScratchMicroBatch'.
+TEST_F(ScratchTupleBatchTest, TestInterleavedMicroBatches) {
+  const int BATCH_SIZE = 1024;
+  scoped_ptr<ScratchTupleBatch> scratch_batch(
+      new ScratchTupleBatch(*desc_, BATCH_SIZE, &tracker_));
+  scratch_batch->num_tuples = BATCH_SIZE;
+  // Interleaving gap
+  vector<int> gaps = {2, 4, 8, 16, 32};
+  for (auto n : gaps) {
+    // Set every nth row as selected.
+    for (int batch_idx = 0; batch_idx < 1024; ++batch_idx) {
+      scratch_batch->selected_rows[batch_idx] = (batch_idx + 1) % n == 0 ? true : false;
+    }
+    ScratchMicroBatch micro_batches[BATCH_SIZE];
+    int num_batches = scratch_batch->GetMicroBatches(10 /*Skip Length*/, micro_batches);
+    ScratchTupleBatchTest::VerifyMicroBatches(
+        scratch_batch->selected_rows, micro_batches, num_batches, 10, BATCH_SIZE);
+  }
+}
+
+// This test checks conversion of 'selected_rows' with clustered
+// 'true' values to 'ScratchMicroBatch'.
+TEST_F(ScratchTupleBatchTest, TestClusteredMicroBatches) {
+  const int BATCH_SIZE = 1024;
+  scoped_ptr<ScratchTupleBatch> scratch_batch(
+      new ScratchTupleBatch(*desc_, BATCH_SIZE, &tracker_));
+  scratch_batch->num_tuples = BATCH_SIZE;
+  // Cluster sizes to try.
+  vector<int> cluster_sizes = {32, 64, 128, 256};
+  for (auto n : cluster_sizes) {
+    int batch_idx = 0;
+    bool selected = false;
+    // Set cluster of 'true' and 'false' values
+    while (batch_idx < 1024) {
+      int last_row = batch_idx + n;
+      while (batch_idx < last_row && batch_idx < 1024) {
+        scratch_batch->selected_rows[batch_idx++] = selected;
+      }
+      selected = !selected;
+    }
+    ScratchMicroBatch micro_batches[BATCH_SIZE];
+    EXPECT_EQ(scratch_batch->GetMicroBatches(
+        10 /*Skip Length*/, micro_batches), 1024/(n * 2));
+    ScratchTupleBatchTest::VerifyMicroBatches(
+        scratch_batch->selected_rows, micro_batches, 1024/(n * 2), 10, BATCH_SIZE);
+  }
+}
+}
+
+TEST_F(ScratchTupleBatchTest, TestRandomGeneratedMicroBatches) {
+  const int BATCH_SIZE = 1024;
+  scoped_ptr<ScratchTupleBatch> scratch_batch(
+      new ScratchTupleBatch(*desc_, BATCH_SIZE, &tracker_));
+  scratch_batch->num_tuples = BATCH_SIZE;
+  // gaps to try
+  vector<int> gaps = {5, 16, 29, 37, 1025};
+  vector<float> selected_ratios = {0.5, 0.75, 0.1, 1.0, 0.44};
+  for (size_t g = 0; g < gaps.size(); g++) {
+    int n = gaps[g];
+    // Set random locations as selected.
+    srand(time(NULL));
+    bool atleast_one_true = false;
+    for (int batch_idx = 0; batch_idx < BATCH_SIZE; ++batch_idx) {
+      scratch_batch->selected_rows[batch_idx] =
+          (rand() % BATCH_SIZE) < (BATCH_SIZE * selected_ratios[g]);
+      if (scratch_batch->selected_rows[batch_idx]) {
+        atleast_one_true = true;
+      }
+    }
+    // Ensure at least one value is true when invoking 'GetMicroBatches'.
+    if (!atleast_one_true) {
+      // Set one of the values randomly as true.
+      scratch_batch->selected_rows[rand() % BATCH_SIZE] = true;
+    }
+    ScratchMicroBatch micro_batches[BATCH_SIZE];
+    int num_batches = scratch_batch->GetMicroBatches(n, micro_batches);
+    ScratchTupleBatchTest::VerifyMicroBatches(
+        scratch_batch->selected_rows, micro_batches, num_batches, n, BATCH_SIZE);
+  }
+}
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  InitFeSupport();
+  fe.reset(new Frontend());
+  return RUN_ALL_TESTS();
+}
diff --git a/be/src/exec/scratch-tuple-batch.h b/be/src/exec/scratch-tuple-batch.h
index 1b79be1..31edd2b 100644
--- a/be/src/exec/scratch-tuple-batch.h
+++ b/be/src/exec/scratch-tuple-batch.h
@@ -18,12 +18,20 @@
 #ifndef IMPALA_EXEC_PARQUET_SCRATCH_TUPLE_BATCH_H
 #define IMPALA_EXEC_PARQUET_SCRATCH_TUPLE_BATCH_H
 
+#include <boost/scoped_array.hpp>
 #include "runtime/descriptors.h"
 #include "runtime/row-batch.h"
 #include "runtime/tuple-row.h"
 
 namespace impala {
 
+/// Helper struct that represents a micro batch within 'ScratchTupleBatch'.
+struct ScratchMicroBatch {
+  int start;
+  int end;
+  int length;
+};
+
 /// Helper struct that holds a batch of tuples allocated from a mem pool, as well
 /// as state associated with iterating over its tuples and transferring
 /// them to an output batch in TransferScratchTuples().
@@ -60,12 +68,18 @@ struct ScratchTupleBatch {
   // materialising them.
   const int MIN_SELECTIVITY_TO_COMPACT = 16;
 
+  // Stores a bool array of size 'capacity' to mark the rows filtered out by
+  // 'ProcessScratchBatchCodegenOrInterpret'. If the i-th tuple survives,
+  // 'selected_rows[i]' is true; otherwise false.
+  boost::scoped_array<bool> selected_rows;
+
   ScratchTupleBatch(
       const RowDescriptor& row_desc, int batch_size, MemTracker* mem_tracker)
     : capacity(batch_size),
       tuple_byte_size(row_desc.GetRowSize()),
       tuple_mem_pool(mem_tracker),
-      aux_mem_pool(mem_tracker) {
+      aux_mem_pool(mem_tracker),
+      selected_rows(new bool[batch_size]) {
     DCHECK_EQ(row_desc.tuple_descriptors().size(), 1);
   }
 
@@ -152,6 +166,49 @@ struct ScratchTupleBatch {
     return true;
   }
 
+  /// Creates micro batches that need to be scanned.
+  /// Bits set in 'selected_rows' are the rows that need to be scanned; consecutive
+  /// set bits form a micro batch. Micro batches that differ by less than 'skip_length'
+  /// are merged together. E.g., for micro batches 1-8, 11-20, 35-100 derived from
+  /// 'selected_rows' and a 'skip_length' of 10, the first two micro batches would be
+  /// merged into 1-20 as they differ by 3 (11 - 8), which is less than 10
+  /// ('skip_length'). Precondition: at least one micro batch is present, i.e., at
+  /// least one of the 'selected_rows' is true.
+  int GetMicroBatches(int skip_length, ScratchMicroBatch* batches) {
+    int batch_idx = 0;
+    int start = -1;
+    int last = -1;
+    DCHECK_GT(num_tuples, 0);
+    for (size_t i = 0; i < num_tuples; ++i) {
+      if (selected_rows[i]) {
+        if (start == -1) {
+          // start the first ever micro batch.
+          start = i;
+          last = i;
+        } else if (i - last < skip_length) {
+          // continue the old micro batch as 'last' is within 'skip_length' of last
+          // micro batch.
+          last = i;
+        } else {
+          // start a new micro batch as 'last' is outside 'skip_length'.
+          batches[batch_idx] = {start, last, last - start + 1};
+          batch_idx++;
+          start = i;
+          last = i;
+        }
+      }
+    }
+    // Ensure at least one of 'selected_rows' is true.
+    DCHECK(start != -1) << "At least one of the 'scratch_batch_->selected_rows' "
+                        << "should be true";
+    // Add the last micro batch which was being built.
+    // For instance, consider a batch of size 10 with all true values:
+    // TTTTTTTTTT, or even FFFFFTTTTT. In both cases the statement below is needed.
+    batches[batch_idx] = {start, last, last - start + 1};
+    batch_idx++;
+    return batch_idx;
+  }
+
   Tuple* GetTuple(int tuple_idx) const {
     return reinterpret_cast<Tuple*>(tuple_mem + tuple_idx * tuple_byte_size);
   }
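
Editor's note: GetMicroBatches() above is what turns the per-row filter outcome into
skippable ranges. The standalone sketch below re-implements the same merging rule over a
plain std::vector<bool> so it can be compiled and run outside Impala; BuildMicroBatches
and Range are editor-chosen names, not Impala symbols. Unlike the member function above,
which DCHECKs that at least one row is selected, the sketch simply returns an empty
vector for an all-false input.

#include <iostream>
#include <vector>

struct Range { int start; int end; int length; };

// Merge consecutive selected rows into ranges; ranges closer than 'skip_length'
// are coalesced, mirroring the GetMicroBatches() logic shown above.
std::vector<Range> BuildMicroBatches(const std::vector<bool>& selected, int skip_length) {
  std::vector<Range> batches;
  int start = -1, last = -1;
  for (int i = 0; i < static_cast<int>(selected.size()); ++i) {
    if (!selected[i]) continue;
    if (start == -1) {
      start = last = i;                 // first selected row seen
    } else if (i - last < skip_length) {
      last = i;                         // close enough: extend the current range
    } else {
      batches.push_back({start, last, last - start + 1});
      start = last = i;                 // too far away: start a new range
    }
  }
  if (start != -1) batches.push_back({start, last, last - start + 1});
  return batches;
}

int main() {
  // Rows 1-8, 11-20 and 35-40 selected; with skip_length 10 the first two merge.
  std::vector<bool> selected(64, false);
  for (int i = 1; i <= 8; ++i) selected[i] = true;
  for (int i = 11; i <= 20; ++i) selected[i] = true;
  for (int i = 35; i <= 40; ++i) selected[i] = true;
  for (const Range& r : BuildMicroBatches(selected, 10)) {
    std::cout << r.start << "-" << r.end << " (" << r.length << ")" << std::endl;
  }
  return 0;
}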
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index f7eadce..b403e16 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1118,13 +1118,25 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_enable_kudu_transaction(IsTrue(value));
         break;
       }
-      case TImpalaQueryOptions::MINMAX_FILTER_PARTITION_COLUMNS:
+      case TImpalaQueryOptions::MINMAX_FILTER_PARTITION_COLUMNS: {
         query_options->__set_minmax_filter_partition_columns(IsTrue(value));
         break;
+      }
       case TImpalaQueryOptions::ENABLE_ASYNC_DDL_EXECUTION: {
         query_options->__set_enable_async_ddl_execution(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::PARQUET_LATE_MATERIALIZATION_THRESHOLD: {
+        StringParser::ParseResult result;
+        const int32_t threshold =
+            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
+        if (value == nullptr || result != StringParser::PARSE_SUCCESS || threshold < -1) {
+          return Status(Substitute("Invalid parquet late materialization threshold: "
+              "'$0'. Only integer values -1 and above are allowed.", value));
+        }
+        query_options->__set_parquet_late_materialization_threshold(threshold);
+        break;
+      }
       case TImpalaQueryOptions::ENABLE_ASYNC_LOAD_DATA_EXECUTION: {
         query_options->__set_enable_async_load_data_execution(IsTrue(value));
         break;
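
Editor's note: the new option accepts any integer greater than or equal to -1, where -1
disables late materialization (the Thrift default added later in this commit is 20). A
standalone sketch of equivalent validation using strtol instead of Impala's StringParser
follows; ParseLateMaterializationThreshold is a hypothetical name.

#include <cerrno>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <string>

// Parse and validate a parquet_late_materialization_threshold value the way the
// query-option handler above does: reject non-integers and anything below -1.
// Returns true and sets 'threshold' on success.
bool ParseLateMaterializationThreshold(const std::string& value, int32_t* threshold) {
  errno = 0;
  char* end = nullptr;
  long parsed = std::strtol(value.c_str(), &end, 10);
  if (errno != 0 || end == value.c_str() || *end != '\0') return false;  // not an integer
  if (parsed < -1 || parsed > INT32_MAX) return false;                   // out of range
  *threshold = static_cast<int32_t>(parsed);
  return true;
}

int main() {
  int32_t threshold = 0;
  for (const char* v : {"20", "-1", "-2", "abc"}) {
    std::cout << v << " -> "
              << (ParseLateMaterializationThreshold(v, &threshold) ? "ok" : "invalid")
              << std::endl;
  }
  return 0;
}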
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 7e9b270..7b32427 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::ENABLE_ASYNC_LOAD_DATA_EXECUTION+ 1);\
+      TImpalaQueryOptions::PARQUET_LATE_MATERIALIZATION_THRESHOLD+ 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -264,7 +264,9 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(enable_async_ddl_execution, ENABLE_ASYNC_DDL_EXECUTION,\
       TQueryOptionLevel::ADVANCED) \
   QUERY_OPT_FN(enable_async_load_data_execution, ENABLE_ASYNC_LOAD_DATA_EXECUTION,\
-      TQueryOptionLevel::ADVANCED)
+      TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(parquet_late_materialization_threshold,\
+      PARQUET_LATE_MATERIALIZATION_THRESHOLD, TQueryOptionLevel::ADVANCED)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h
index 68e94f5..d627e86 100644
--- a/be/src/util/tuple-row-compare.h
+++ b/be/src/util/tuple-row-compare.h
@@ -232,7 +232,7 @@ class TupleRowLexicalComparator : public TupleRowComparator {
   int CompareInterpreted(const TupleRow* lhs, const TupleRow* rhs) const override;
 };
 
-/// Compares two TupleRows based on a set of exprs. The fisrt 'num_lexical_keys' exprs
+/// Compares two TupleRows based on a set of exprs. The first 'num_lexical_keys' exprs
 /// are compared lexically, while the remaining exprs are compared in Z-order.
 class TupleRowZOrderComparator : public TupleRowComparator {
  public:
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 334ef19..25ebfd7 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -702,6 +702,10 @@ enum TImpalaQueryOptions {
 
   // Indicates whether to run load data requests in async mode.
   ENABLE_ASYNC_LOAD_DATA_EXECUTION = 137
+
+  // Minimum number of consecutive filtered-out rows required to avoid materialization
+  // of columns in Parquet. Set it to -1 to turn off the late materialization feature.
+  PARQUET_LATE_MATERIALIZATION_THRESHOLD = 138;
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index 130c9db..80f3927 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -555,6 +555,10 @@ struct TQueryOptions {
 
   // Allow load data exec request to run in a separate thread
   138: optional bool enable_async_load_data_execution = true;
+
+  // Minimum number of consecutive filtered-out rows required to avoid materialization
+  // of columns in Parquet. Set it to -1 to turn off the late materialization feature.
+  139: optional i32 parquet_late_materialization_threshold = 20;
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/testdata/workloads/functional-query/queries/QueryTest/out-of-range-timestamp-local-tz-conversion.test b/testdata/workloads/functional-query/queries/QueryTest/out-of-range-timestamp-local-tz-conversion.test
index 5e0c29b..b044a5e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/out-of-range-timestamp-local-tz-conversion.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/out-of-range-timestamp-local-tz-conversion.test
@@ -42,7 +42,7 @@ INT, TIMESTAMP
 4101,2010-02-15 02:21:06.300000000
 4102,NULL
 ---- ERRORS
-Parquet file '$NAMENODE/test-warehouse/$DATABASE.db/out_of_range_timestamp2_hive_211/out_of_range_timestamp2_hive_211.parquet' column 'timestamp_col' contains an out of range timestamp. The valid date range is 1400-01-01..9999-12-31. (1 of 2433 similar)
+Parquet file '$NAMENODE/test-warehouse/$DATABASE.db/out_of_range_timestamp2_hive_211/out_of_range_timestamp2_hive_211.parquet' column 'timestamp_col' contains an out of range timestamp. The valid date range is 1400-01-01..9999-12-31. (1 of 4 similar)
 ====
 ---- QUERY
 set timezone="PST8PDT";
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-late-materialization.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-late-materialization.test
new file mode 100644
index 0000000..f2d45c4
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-late-materialization.test
@@ -0,0 +1,45 @@
+# This tests pages skipped by parquet late materialization.
+====
+---- QUERY
+# Test for late materialization on page indexes
+select * from tpch_parquet.lineitem where l_orderkey=3209632;
+---- RUNTIME_PROFILE
+aggregation(SUM, NumPagesSkippedByLateMaterialization)> 0
+====
+---- QUERY
+# Test for late materialization on non-page index
+select * from tpch_parquet.lineitem
+    where l_comment like '%unusual courts. blithely final theodolit%';
+---- RUNTIME_PROFILE
+aggregation(SUM, NumPagesSkippedByLateMaterialization)> 0
+====
+---- QUERY
+# Test late materialization on runtime filters.
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_WAIT_TIME_MS=5000;
+select * from tpch_parquet.lineitem l
+    join tpch_parquet.orders o on l.l_orderkey = o.o_orderkey
+    where o_orderdate='1992-06-22' and o_totalprice = 153827.26;
+---- RUNTIME_PROFILE
+row_regex: .*1 of 1 Runtime Filter Published.*
+aggregation(SUM, NumPagesSkippedByLateMaterialization)> 0
+====
+---- QUERY
+# Test late materialization on min/max runtime filters.
+SET RUNTIME_FILTER_WAIT_TIME_MS=5000;
+SET MINMAX_FILTERING_LEVEL=ROW;
+SET ENABLED_RUNTIME_FILTER_TYPES=MIN_MAX;
+SET MINMAX_FILTER_THRESHOLD=0.5;
+select * from tpch_parquet.lineitem l
+    join tpch_parquet.orders o on l.l_orderkey = o.o_orderkey
+    where o_orderdate='1996-12-01' and o_totalprice >= 250000;
+---- RUNTIME_PROFILE
+row_regex:.* RF00.\[min_max\] -. .\.l_orderkey.*
+aggregation(SUM, NumPagesSkippedByLateMaterialization)> 0
+====
+---- QUERY
+# Test that late materialization on nested columns is disabled.
+select * from tpch_nested_parquet.customer where c_mktsegment = 'COMEDY';
+---- RUNTIME_PROFILE
+aggregation(SUM, NumPagesSkippedByLateMaterialization): 0
+====
diff --git a/tests/query_test/test_parquet_late_materialization.py b/tests/query_test/test_parquet_late_materialization.py
new file mode 100644
index 0000000..a1f64d9
--- /dev/null
+++ b/tests/query_test/test_parquet_late_materialization.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+
+class TestParquetLateMaterialization(ImpalaTestSuite):
+  """
+  This suite tests late materialization optimization for parquet.
+  """
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestParquetLateMaterialization, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(
+      lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  def test_parquet_late_materialization(self, vector):
+    self.run_test_case('QueryTest/parquet-late-materialization', vector)