You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/05/25 03:18:12 UTC

[1/2] incubator-impala git commit: IMPALA-5324: Fix version check in EvalDictionaryFilters

Repository: incubator-impala
Updated Branches:
  refs/heads/master eb54287fb -> b4343895d


IMPALA-5324: Fix version check in EvalDictionaryFilters

Due to a bootstrapping issue with the dictionary
filtering code change, the parquet version check used in
EvalDictionaryFilters was checking for < 2.10. However,
the impala 2.9 parquet contains the appropriate encoding,
so this changes the version check to be < 2.9.

Change-Id: Icc216332171038f74ff1d2ce3066da8167095361
Reviewed-on: http://gerrit.cloudera.org:8080/6969
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a173f70a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a173f70a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a173f70a

Branch: refs/heads/master
Commit: a173f70a0ccacff2de978d171d6bb42349917471
Parents: eb54287
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Tue May 23 16:00:55 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu May 25 01:37:25 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a173f70a/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 1d5d807..4106943 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -834,11 +834,10 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
     bool* row_group_eliminated) {
   *row_group_eliminated = false;
 
-  // TODO: Bootstrapping problem: existing 2.9 files don't have the encoding
-  // stats or encodings set properly, but after this goes in, they will.
-  // Change to 2.9 later.
+  // Legacy impala files (< 2.9) require special handling, because they do not encode
+  // information about whether the column is 100% dictionary encoded.
   bool is_legacy_impala = false;
-  if (file_version_.application == "impala" && file_version_.VersionLt(2,10,0)) {
+  if (file_version_.application == "impala" && file_version_.VersionLt(2,9,0)) {
     is_legacy_impala = true;
   }
 


[2/2] incubator-impala git commit: IMPALA-4923: reduce memory transfer for selective scans

Posted by ta...@apache.org.
IMPALA-4923: reduce memory transfer for selective scans

Most of the code changes are to restructure things so that the
scratch batch's tuple buffer is stored in a separate MemPool
from auxiliary memory such as decompression buffers. This part
of the change does not change the behaviour of the scanner in
itself, but allows us to recycle the tuple buffer without holding
onto unused auxiliary memory.

The optimisation is implemented in TryCompact(): if enough rows
were filtered out during the copy from the scratch batch to the
output batch, the fixed-length portions of the surviving rows
(if any) are copied to a new, smaller, buffer, and the original,
larger, buffer is reused for the next scratch batch.

Previously the large buffer was always attached to the output batch,
so a large buffer was transferred between threads for every scratch
batch processed. In combination with the decompression buffer change
in IMPALA-5304, this means that in many cases selective scans don't
produce nearly as many empty or near-empty batches and do not attach
nearly as much memory to each batch.

Performance:
Even on an 8 core machine I see some speedup on selective scans.
Profiling with "perf top" also showed that time in TCMalloc
was reduced - it went from several % of CPU time to a minimal
amount.

Running TPC-H on the same machine showed a ~5% overall improvement
and no regressions. E.g. Q6 got 20-25% faster.

I hope to do some additional cluster benchmarking on systems
with more cores to verify that the severe performance problems
there are fixed, but in the meantime it seems like we have enough
evidence that it will at least improve things.

Testing:
Add a couple of selective scans that exercise the new code paths.

Change-Id: I3773dc63c498e295a2c1386a15c5e69205e747ea
Reviewed-on: http://gerrit.cloudera.org:8080/6949
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b4343895
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b4343895
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b4343895

Branch: refs/heads/master
Commit: b4343895d8906d66e5777edab080aacc0eeb3717
Parents: a173f70
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Sat May 20 07:41:13 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu May 25 02:55:36 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc             |  45 +++---
 be/src/exec/parquet-column-readers.cc           |   2 +-
 be/src/exec/parquet-scratch-tuple-batch.h       | 137 ++++++++++++++++---
 be/src/runtime/row-batch.cc                     |  21 +--
 be/src/runtime/row-batch.h                      |   9 +-
 .../queries/QueryTest/parquet.test              |  58 ++++++++
 6 files changed, 212 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4343895/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 4106943..0119efd 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -279,10 +279,9 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
     dictionary_pool_.get()->FreeAll();
     context_->ReleaseCompletedResources(nullptr, true);
     for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
-    // The scratch batch may still contain tuple data (or tuple ptrs if the legacy joins
-    // or aggs are enabled). We can get into this case if Open() fails or if the query is
-    // cancelled.
-    scratch_batch_->mem_pool()->FreeAll();
+    // The scratch batch may still contain tuple data. We can get into this case if
+    // Open() fails or if the query is cancelled.
+    scratch_batch_->ReleaseResources(nullptr);
   }
   if (level_cache_pool_ != nullptr) {
     level_cache_pool_->FreeAll();
@@ -292,7 +291,7 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
   // Verify all resources (if any) have been transferred.
   DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
   DCHECK_EQ(dictionary_pool_->total_allocated_bytes(), 0);
-  DCHECK_EQ(scratch_batch_->mem_pool()->total_allocated_bytes(), 0);
+  DCHECK_EQ(scratch_batch_->total_allocated_bytes(), 0);
   DCHECK_EQ(context_->num_completed_io_buffers(), 0);
 
   // Collect compression types for reporting completed ranges.
@@ -704,9 +703,9 @@ Status HdfsParquetScanner::NextRowGroup() {
 void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
   DCHECK(row_batch != NULL);
   row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
-  row_batch->tuple_data_pool()->AcquireData(scratch_batch_->mem_pool(), false);
+  scratch_batch_->ReleaseResources(row_batch->tuple_data_pool());
   context_->ReleaseCompletedResources(row_batch, true);
-  for (ParquetColumnReader* col_reader: column_readers_) {
+  for (ParquetColumnReader* col_reader : column_readers_) {
     col_reader->Close(row_batch);
   }
 }
@@ -931,7 +930,7 @@ Status HdfsParquetScanner::AssembleRows(
   while (!column_readers[0]->RowGroupAtEnd()) {
     // Start a new scratch batch.
     RETURN_IF_ERROR(scratch_batch_->Reset(state_));
-    int scratch_capacity = scratch_batch_->capacity();
+    int scratch_capacity = scratch_batch_->capacity;
 
     // Initialize tuple memory.
     for (int i = 0; i < scratch_capacity; ++i) {
@@ -946,11 +945,11 @@ Status HdfsParquetScanner::AssembleRows(
       ParquetColumnReader* col_reader = column_readers[c];
       if (col_reader->max_rep_level() > 0) {
         continue_execution = col_reader->ReadValueBatch(
-            scratch_batch_->mem_pool(), scratch_capacity, tuple_byte_size_,
-            scratch_batch_->tuple_mem, &scratch_batch_->num_tuples);
+            &scratch_batch_->aux_mem_pool, scratch_capacity,
+            tuple_byte_size_, scratch_batch_->tuple_mem, &scratch_batch_->num_tuples);
       } else {
         continue_execution = col_reader->ReadNonRepeatedValueBatch(
-            scratch_batch_->mem_pool(), scratch_capacity, tuple_byte_size_,
+            &scratch_batch_->aux_mem_pool, scratch_capacity, tuple_byte_size_,
             scratch_batch_->tuple_mem, &scratch_batch_->num_tuples);
       }
       // Check that all column readers populated the same number of values.
@@ -1014,7 +1013,6 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
   DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity());
   DCHECK_EQ(scan_node_->tuple_idx(), 0);
   DCHECK_EQ(dst_batch->row_desc().tuple_descriptors().size(), 1);
-
   if (scratch_batch_->tuple_byte_size == 0) {
     Tuple** output_row =
         reinterpret_cast<Tuple**>(dst_batch->GetRow(dst_batch->num_rows()));
@@ -1027,27 +1025,20 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
         scratch_batch_->num_tuples - scratch_batch_->tuple_idx);
     memset(output_row, 0, num_tuples * sizeof(Tuple*));
     scratch_batch_->tuple_idx += num_tuples;
-    // If compressed Parquet was read then there may be data left to free from the
-    // scratch batch (originating from the decompressed data pool).
-    if (scratch_batch_->AtEnd()) scratch_batch_->mem_pool()->FreeAll();
+    // No data is required to back the empty tuples, so we should not attach any data to
+    // these batches.
+    DCHECK_EQ(0, scratch_batch_->total_allocated_bytes());
     return num_tuples;
   }
 
-  int num_row_to_commit;
+  int num_rows_to_commit;
   if (codegend_process_scratch_batch_fn_ != NULL) {
-    num_row_to_commit = codegend_process_scratch_batch_fn_(this, dst_batch);
+    num_rows_to_commit = codegend_process_scratch_batch_fn_(this, dst_batch);
   } else {
-    num_row_to_commit = ProcessScratchBatch(dst_batch);
-  }
-
-  // TODO: Consider compacting the output row batch to better handle cases where
-  // there are few surviving tuples per scratch batch. In such cases, we could
-  // quickly accumulate memory in the output batch, hit the memory capacity limit,
-  // and return an output batch with relatively few rows.
-  if (scratch_batch_->AtEnd()) {
-    dst_batch->tuple_data_pool()->AcquireData(scratch_batch_->mem_pool(), false);
+    num_rows_to_commit = ProcessScratchBatch(dst_batch);
   }
-  return num_row_to_commit;
+  scratch_batch_->FinalizeTupleTransfer(dst_batch, num_rows_to_commit);
+  return num_rows_to_commit;
 }
 
 Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4343895/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index f1bbf61..3465632 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -955,7 +955,7 @@ Status BaseScalarColumnReader::ReadDataPage() {
   // now complete, free up any memory allocated for it. If the data page contained
   // strings we need to attach it to the returned batch.
   if (CurrentPageContainsTupleData()) {
-    parent_->scratch_batch_->mem_pool()->AcquireData(
+    parent_->scratch_batch_->aux_mem_pool.AcquireData(
         decompressed_data_pool_.get(), false);
   } else {
     decompressed_data_pool_->FreeAll();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4343895/be/src/exec/parquet-scratch-tuple-batch.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-scratch-tuple-batch.h b/be/src/exec/parquet-scratch-tuple-batch.h
index 4b83589..1b79be1 100644
--- a/be/src/exec/parquet-scratch-tuple-batch.h
+++ b/be/src/exec/parquet-scratch-tuple-batch.h
@@ -20,6 +20,7 @@
 
 #include "runtime/descriptors.h"
 #include "runtime/row-batch.h"
+#include "runtime/tuple-row.h"
 
 namespace impala {
 
@@ -27,49 +28,141 @@ namespace impala {
 /// as state associated with iterating over its tuples and transferring
 /// them to an output batch in TransferScratchTuples().
 struct ScratchTupleBatch {
-  // Memory backing the batch of tuples. Allocated from batch's tuple data pool.
-  uint8_t* tuple_mem;
+  // Memory for the fixed-length parts of the batch of tuples. Allocated from
+  // 'tuple_mem_pool'. Set to NULL when transferred to an output batch.
+  uint8_t* tuple_mem = nullptr;
+  // Number of tuples that can be stored in 'tuple_mem'.
+  int capacity;
   // Keeps track of the current tuple index.
-  int tuple_idx;
+  int tuple_idx = 0;
   // Number of valid tuples in tuple_mem.
-  int num_tuples;
-  // Cached for convenient access.
+  int num_tuples = 0;
+  // Number of tuples transferred to output batches (i.e. not filtered by predicates).
+  // num_tuples_transferred > 0 before a call to FinalizeTupleTransfer() implies that
+  // tuples from the current scratch batch were transferred to a previous output batch.
+  int num_tuples_transferred = 0;
+  // Bytes of fixed-length data per tuple.
   const int tuple_byte_size;
 
-  // Helper batch for safely allocating tuple_mem from its tuple data pool using
-  // ResizeAndAllocateTupleBuffer().
-  RowBatch batch;
+  // Pool used to allocate 'tuple_mem' and nothing else.
+  MemPool tuple_mem_pool;
+
+  // Pool used to accumulate other memory that may be referenced by var-len slots in this
+  // batch, e.g. decompression buffers, allocations for var-len strings and allocations
+  // for nested arrays. This memory may be referenced by previous batches or the current
+  // batch, but not by future batches. E.g. a decompression buffer can be safely attached
+  // only once all values referencing that buffer have been materialized into the batch.
+  MemPool aux_mem_pool;
+
+  // Tuples transferred to an output row batch are compacted if
+  // (# tuples materialized / # tuples returned) exceeds this number. Chosen so that the
+  // cost of copying the tuples should be very small in relation to the original cost of
+  // materialising them.
+  const int MIN_SELECTIVITY_TO_COMPACT = 16;
 
   ScratchTupleBatch(
       const RowDescriptor& row_desc, int batch_size, MemTracker* mem_tracker)
-    : tuple_mem(NULL),
-      tuple_idx(0),
-      num_tuples(0),
+    : capacity(batch_size),
       tuple_byte_size(row_desc.GetRowSize()),
-      batch(row_desc, batch_size, mem_tracker) {
+      tuple_mem_pool(mem_tracker),
+      aux_mem_pool(mem_tracker) {
     DCHECK_EQ(row_desc.tuple_descriptors().size(), 1);
   }
 
   Status Reset(RuntimeState* state) {
     tuple_idx = 0;
     num_tuples = 0;
-    // Buffer size is not needed.
-    int64_t buffer_size;
-    RETURN_IF_ERROR(batch.ResizeAndAllocateTupleBuffer(state, &buffer_size, &tuple_mem));
+    num_tuples_transferred = 0;
+    if (tuple_mem == nullptr) {
+      int64_t dummy;
+      RETURN_IF_ERROR(RowBatch::ResizeAndAllocateTupleBuffer(
+          state, &tuple_mem_pool, tuple_byte_size, &capacity, &dummy, &tuple_mem));
+    }
     return Status::OK();
   }
 
-  inline Tuple* GetTuple(int tuple_idx) const {
+  /// Release all memory in the MemPools. If 'dst_pool' is non-NULL, transfers it to
+  /// 'dst_pool'. Otherwise frees the memory.
+  void ReleaseResources(MemPool* dst_pool) {
+    if (dst_pool == nullptr) {
+      tuple_mem_pool.FreeAll();
+      aux_mem_pool.FreeAll();
+    } else {
+      dst_pool->AcquireData(&tuple_mem_pool, false);
+      dst_pool->AcquireData(&aux_mem_pool, false);
+    }
+    tuple_mem = nullptr;
+  }
+
+  /// Finalize transfer of 'num_to_commit' tuples to 'dst_batch' and transfer memory to
+  /// 'dst_batch' if at the end of 'scratch_batch'. The tuples must not yet be
+  /// committed to 'dst_batch'. Only needs to be called when materialising non-empty
+  /// tuples.
+  void FinalizeTupleTransfer(RowBatch* dst_batch, int num_to_commit) {
+    DCHECK_GE(num_to_commit, 0);
+    DCHECK_LE(dst_batch->num_rows() + num_to_commit, dst_batch->capacity());
+    DCHECK_LE(num_tuples_transferred + num_to_commit, num_tuples);
+    DCHECK(tuple_mem != nullptr);
+    num_tuples_transferred += num_to_commit;
+    if (!AtEnd()) return;
+    // We're at the end of the scratch batch. Transfer memory that may be referenced by
+    // transferred tuples or that we can't reuse to 'dst_batch'.
+
+    // Future tuples won't reference data in 'aux_mem_pool' - always transfer so that
+    // we don't accumulate unneeded memory in the scratch batch.
+    dst_batch->tuple_data_pool()->AcquireData(&aux_mem_pool, false);
+
+    // Try to avoid the transfer of 'tuple_mem' for selective scans by compacting the
+    // output batch. This avoids excessive allocation and transfer of memory, which
+    // can lead to performance problems like IMPALA-4923.
+    // Compaction is unsafe if the scratch batch was split across multiple output batches
+    // because the batch we returned earlier may hold a reference into 'tuple_mem'.
+    if (num_tuples_transferred > num_to_commit
+        || num_tuples_transferred * MIN_SELECTIVITY_TO_COMPACT > num_tuples
+        || !TryCompact(dst_batch, num_to_commit)) {
+      // Didn't compact - rows in 'dst_batch' reference 'tuple_mem'.
+      dst_batch->tuple_data_pool()->AcquireData(&tuple_mem_pool, false);
+      tuple_mem = nullptr;
+    }
+  }
+
+  /// Try to compact 'num_uncommitted_tuples' uncommitted tuples that were added to
+  /// the end of 'dst_batch' by copying them to memory allocated from
+  /// dst_batch->tuple_data_pool(). Returns true on success or false if the memory
+  /// could not be allocated.
+  bool TryCompact(RowBatch* dst_batch, int num_uncommitted_tuples) {
+    DCHECK_LE(dst_batch->num_rows() + num_uncommitted_tuples, dst_batch->capacity());
+    // Copy rows that reference 'tuple_mem' into a new small buffer. This code handles
+    // the case where num_uncommitted_tuples == 0, since TryAllocate() returns a non-null
+    // pointer.
+    int64_t dst_bytes = num_uncommitted_tuples * static_cast<int64_t>(tuple_byte_size);
+    uint8_t* dst_buffer = dst_batch->tuple_data_pool()->TryAllocate(dst_bytes);
+    if (dst_buffer == nullptr) return false;
+    const int end_row = dst_batch->num_rows() + num_uncommitted_tuples;
+    for (int i = dst_batch->num_rows(); i < end_row; ++i) {
+      TupleRow* row = dst_batch->GetRow(i);
+      Tuple* uncompacted_tuple = row->GetTuple(0);
+      DCHECK_GE(reinterpret_cast<uint8_t*>(uncompacted_tuple), tuple_mem);
+      DCHECK_LT(reinterpret_cast<uint8_t*>(uncompacted_tuple),
+          tuple_mem + tuple_byte_size * capacity);
+      row->SetTuple(0, reinterpret_cast<Tuple*>(dst_buffer));
+      memcpy(dst_buffer, uncompacted_tuple, tuple_byte_size);
+      dst_buffer += tuple_byte_size;
+    }
+    return true;
+  }
+
+  Tuple* GetTuple(int tuple_idx) const {
     return reinterpret_cast<Tuple*>(tuple_mem + tuple_idx * tuple_byte_size);
   }
 
-  inline MemPool* mem_pool() { return batch.tuple_data_pool(); }
-  inline int capacity() const { return batch.capacity(); }
-  inline uint8_t* CurrTuple() const { return tuple_mem + tuple_idx * tuple_byte_size; }
-  inline uint8_t* TupleEnd() const { return tuple_mem + num_tuples * tuple_byte_size; }
-  inline bool AtEnd() const { return tuple_idx == num_tuples; }
+  uint8_t* CurrTuple() const { return tuple_mem + tuple_idx * tuple_byte_size; }
+  uint8_t* TupleEnd() const { return tuple_mem + num_tuples * tuple_byte_size; }
+  bool AtEnd() const { return tuple_idx == num_tuples; }
+  int64_t total_allocated_bytes() const {
+    return tuple_mem_pool.total_allocated_bytes() + aux_mem_pool.total_allocated_bytes();
+  }
 };
-
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4343895/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 8c4ab54..7f200dc 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -449,18 +449,23 @@ int64_t RowBatch::TotalByteSize(DedupMap* distinct_tuples) {
   return result;
 }
 
-Status RowBatch::ResizeAndAllocateTupleBuffer(RuntimeState* state,
-    int64_t* tuple_buffer_size, uint8_t** buffer) {
-  const int row_size = row_desc_.GetRowSize();
+Status RowBatch::ResizeAndAllocateTupleBuffer(
+    RuntimeState* state, int64_t* buffer_size, uint8_t** buffer) {
+  return ResizeAndAllocateTupleBuffer(
+      state, &tuple_data_pool_, row_desc_.GetRowSize(), &capacity_, buffer_size, buffer);
+}
+
+Status RowBatch::ResizeAndAllocateTupleBuffer(RuntimeState* state, MemPool* pool,
+    int row_size, int* capacity, int64_t* buffer_size, uint8_t** buffer) {
   // Avoid divide-by-zero. Don't need to modify capacity for empty rows anyway.
   if (row_size != 0) {
-    capacity_ = max(1, min(capacity_, FIXED_LEN_BUFFER_LIMIT / row_size));
+    *capacity = max(1, min(*capacity, FIXED_LEN_BUFFER_LIMIT / row_size));
   }
-  *tuple_buffer_size = static_cast<int64_t>(row_size) * capacity_;
-  *buffer = tuple_data_pool_.TryAllocate(*tuple_buffer_size);
+  *buffer_size = static_cast<int64_t>(row_size) * *capacity;
+  *buffer = pool->TryAllocate(*buffer_size);
   if (*buffer == NULL) {
-    return mem_tracker_->MemLimitExceeded(state, "Failed to allocate tuple buffer",
-        *tuple_buffer_size);
+    return pool->mem_tracker()->MemLimitExceeded(
+        state, "Failed to allocate tuple buffer", *buffer_size);
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4343895/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index cc41adb..e4b7191 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -338,8 +338,13 @@ class RowBatch {
   /// Returns Status::MEM_LIMIT_EXCEEDED and sets 'buffer' to NULL if a memory limit would
   /// have been exceeded. 'state' is used to log the error.
   /// On success, sets 'buffer_size' to the size in bytes and 'buffer' to the buffer.
-  Status ResizeAndAllocateTupleBuffer(RuntimeState* state, int64_t* buffer_size,
-       uint8_t** buffer);
+  Status ResizeAndAllocateTupleBuffer(
+      RuntimeState* state, int64_t* buffer_size, uint8_t** buffer);
+
+  /// Same as above except allocates buffer for 'capacity' rows with fixed-length portions
+  /// of 'row_size' bytes each from 'pool', instead of using RowBatch's member variables.
+  static Status ResizeAndAllocateTupleBuffer(RuntimeState* state, MemPool* pool,
+      int row_size, int* capacity, int64_t* buffer_size, uint8_t** buffer);
 
   /// Helper function to log the batch's rows if VLOG_ROW is enabled. 'context' is a
   /// string to prepend to the log message.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4343895/testdata/workloads/functional-query/queries/QueryTest/parquet.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet.test b/testdata/workloads/functional-query/queries/QueryTest/parquet.test
index a449162..0d76ab1 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet.test
@@ -140,3 +140,61 @@ bigint
 15046
 15047
 ====
+---- QUERY
+# Test batch compaction logic with selective scan returning a variety of column types.
+# On average there should be a couple of rows per batch of 1024.
+select * from alltypesagg where id % 500 = 0
+---- TYPES
+int,boolean,tinyint,smallint,int,bigint,float,double,string,string,timestamp,int,int,int
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+0,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/01/10','0',2010-01-01 00:00:00,2010,1,1
+0,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/01/10','0',2010-01-01 00:00:00,2010,1,NULL
+500,true,NULL,NULL,500,5000,550,5050,'01/01/10','500',2010-01-01 08:40:47.500000000,2010,1,1
+500,true,NULL,NULL,500,5000,550,5050,'01/01/10','500',2010-01-01 08:40:47.500000000,2010,1,NULL
+1000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/02/10','0',2010-01-02 00:00:00,2010,1,2
+1000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/02/10','0',2010-01-02 00:00:00,2010,1,NULL
+1500,true,NULL,NULL,500,5000,550,5050,'01/02/10','500',2010-01-02 08:40:47.500000000,2010,1,2
+1500,true,NULL,NULL,500,5000,550,5050,'01/02/10','500',2010-01-02 08:40:47.500000000,2010,1,NULL
+2000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/03/10','0',2010-01-03 00:00:00,2010,1,3
+2000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/03/10','0',2010-01-03 00:00:00,2010,1,NULL
+2500,true,NULL,NULL,500,5000,550,5050,'01/03/10','500',2010-01-03 08:40:47.500000000,2010,1,3
+2500,true,NULL,NULL,500,5000,550,5050,'01/03/10','500',2010-01-03 08:40:47.500000000,2010,1,NULL
+3000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/04/10','0',2010-01-04 00:00:00,2010,1,4
+3000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/04/10','0',2010-01-04 00:00:00,2010,1,NULL
+3500,true,NULL,NULL,500,5000,550,5050,'01/04/10','500',2010-01-04 08:40:47.500000000,2010,1,4
+3500,true,NULL,NULL,500,5000,550,5050,'01/04/10','500',2010-01-04 08:40:47.500000000,2010,1,NULL
+4000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/05/10','0',2010-01-05 00:00:00,2010,1,5
+4000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/05/10','0',2010-01-05 00:00:00,2010,1,NULL
+4500,true,NULL,NULL,500,5000,550,5050,'01/05/10','500',2010-01-05 08:40:47.500000000,2010,1,5
+4500,true,NULL,NULL,500,5000,550,5050,'01/05/10','500',2010-01-05 08:40:47.500000000,2010,1,NULL
+5000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/06/10','0',2010-01-06 00:00:00,2010,1,6
+5000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/06/10','0',2010-01-06 00:00:00,2010,1,NULL
+5500,true,NULL,NULL,500,5000,550,5050,'01/06/10','500',2010-01-06 08:40:47.500000000,2010,1,6
+5500,true,NULL,NULL,500,5000,550,5050,'01/06/10','500',2010-01-06 08:40:47.500000000,2010,1,NULL
+6000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/07/10','0',2010-01-07 00:00:00,2010,1,7
+6000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/07/10','0',2010-01-07 00:00:00,2010,1,NULL
+6500,true,NULL,NULL,500,5000,550,5050,'01/07/10','500',2010-01-07 08:40:47.500000000,2010,1,7
+6500,true,NULL,NULL,500,5000,550,5050,'01/07/10','500',2010-01-07 08:40:47.500000000,2010,1,NULL
+7000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/08/10','0',2010-01-08 00:00:00,2010,1,8
+7000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/08/10','0',2010-01-08 00:00:00,2010,1,NULL
+7500,true,NULL,NULL,500,5000,550,5050,'01/08/10','500',2010-01-08 08:40:47.500000000,2010,1,8
+7500,true,NULL,NULL,500,5000,550,5050,'01/08/10','500',2010-01-08 08:40:47.500000000,2010,1,NULL
+8000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/09/10','0',2010-01-09 00:00:00,2010,1,9
+8000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/09/10','0',2010-01-09 00:00:00,2010,1,NULL
+8500,true,NULL,NULL,500,5000,550,5050,'01/09/10','500',2010-01-09 08:40:47.500000000,2010,1,9
+8500,true,NULL,NULL,500,5000,550,5050,'01/09/10','500',2010-01-09 08:40:47.500000000,2010,1,NULL
+9000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/10/10','0',2010-01-10 00:00:00,2010,1,10
+9000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/10/10','0',2010-01-10 00:00:00,2010,1,NULL
+9500,true,NULL,NULL,500,5000,550,5050,'01/10/10','500',2010-01-10 08:40:47.500000000,2010,1,10
+9500,true,NULL,NULL,500,5000,550,5050,'01/10/10','500',2010-01-10 08:40:47.500000000,2010,1,NULL
+====
+---- QUERY
+# Test batch compaction logic with selective scan returning a variety of column types.
+# Most batches should be empty
+select * from alltypesagg where id = 5000
+---- TYPES
+int,boolean,tinyint,smallint,int,bigint,float,double,string,string,timestamp,int,int,int
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+5000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/06/10','0',2010-01-06 00:00:00,2010,1,6
+5000,true,NULL,NULL,NULL,NULL,NULL,NULL,'01/06/10','0',2010-01-06 00:00:00,2010,1,NULL
+====