Posted to commits@impala.apache.org by ta...@apache.org on 2017/10/23 15:44:10 UTC

[3/3] incubator-impala git commit: IMPALA-5307: part 1: don't transfer disk I/O buffers out of parquet

IMPALA-5307: part 1: don't transfer disk I/O buffers out of parquet

This change only affects uncompressed plain-encoded Parquet, where
RowBatches may directly reference strings stored in the I/O
buffers. The fix is to copy the data pages when needed, then use
the same logic that we use for decompressed data pages.
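
A minimal sketch of the new page-read path (helper names like
ReadPageBytes(), DecompressIntoPool(), page_size and
MemLimitExceededStatus() are hypothetical stand-ins; the real logic
is in BaseScalarColumnReader::ReadDataPage() in the diff below):

  Status ReadDataPageSketch() {
    // 'data' initially points into a disk I/O buffer owned by the stream.
    uint8_t* data = ReadPageBytes();  // hypothetical helper
    if (decompressor_ != nullptr) {
      // Compressed pages are decompressed into a scanner-owned pool, so
      // returned batches never reference the I/O buffer.
      data = DecompressIntoPool(data);  // hypothetical helper
    } else if (PageContainsTupleData(page_encoding_)) {
      // Uncompressed var-len string data: copy the page out of the I/O
      // buffer into the same pool, then treat it like a decompressed page.
      uint8_t* copy = data_page_pool_->TryAllocate(page_size);
      if (copy == nullptr) return MemLimitExceededStatus();  // hypothetical
      memcpy(copy, data, page_size);
      data = copy;
    }
    // Either way, the underlying I/O buffer can now be released.
    return Status::OK();
  }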

This copy inevitably adds some CPU overhead, but I believe this is
acceptable because:
* We generally recommend using compression, and optimize for that
  case.
* Copying memory is cheaper than decompressing data.
* Scans of uncompressed data are very likely to be I/O bound.

This allows several major simplifications:
* The resource management for compressed and uncompressed
  scans is much more similar.
* We don't need to attach disk I/O buffers to RowBatches.
* We don't need to deal with attaching I/O buffers in
  ScannerContext.
* Column readers can release each I/O buffer *before* advancing to
  the next one, making it easier to reason about resource
  consumption. E.g. each Parquet column only needs one I/O buffer at
  a time to make progress.
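
As an illustration of the last point, each column reader's page loop
now releases completed buffers up front (a sketch only; the real call
site is inside BaseScalarColumnReader::ReadDataPage() in the diff
below, and HasMorePages()/ReadNextPage() are hypothetical helpers):

  while (HasMorePages()) {
    // Returned tuples no longer reference earlier pages, so completed
    // I/O and boundary buffers can be freed before reading the next page.
    stream_->ReleaseCompletedResources(nullptr, /* done */ false);
    RETURN_IF_ERROR(ReadNextPage());
  }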

Future changes will apply the same approach to Avro, SequenceFile
and Text. Once all scanners are converted,
ScannerContext::contains_tuple_data_ will always be false and we can
remove some dead code.

Testing
=======
Ran core ASAN and exhaustive debug builds.

Perf
====
No difference in most cases when scanning uncompressed Parquet.
There is a significant regression (roughly a 50% increase in
runtime) in targeted perf tests that scan non-dictionary-encoded
strings (see benchmark output below). Even with the regression,
performance is comparable to scanning Snappy-compressed data.

I also did a TPC-H run but ran into some issues with the report
generator. I manually compared times and there were no regressions.

+--------------------+-----------------------+---------+------------+------------+----------------+
| Workload           | File Format           | Avg (s) | Delta(Avg) | GeoMean(s) | Delta(GeoMean) |
+--------------------+-----------------------+---------+------------+------------+----------------+
| TARGETED-PERF(_61) | parquet / none / none | 23.02   | +0.60%     | 4.23       | +5.97%         |
+--------------------+-----------------------+---------+------------+------------+----------------+

+--------------------+--------------------+-----------------------+--------+-------------+------------+------------+----------------+-------------+-------+
| Workload           | Query              | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Num Clients | Iters |
+--------------------+--------------------+-----------------------+--------+-------------+------------+------------+----------------+-------------+-------+
| TARGETED-PERF(_61) | PERF_STRING-Q2     | parquet / none / none | 3.00   | 1.98        | R +52.10%  |   0.97%    |   1.25%        | 1           | 5     |
| TARGETED-PERF(_61) | PERF_STRING-Q1     | parquet / none / none | 2.86   | 1.92        | R +49.11%  |   0.34%    |   2.34%        | 1           | 5     |
| TARGETED-PERF(_61) | PERF_STRING-Q3     | parquet / none / none | 3.16   | 2.15        | R +47.04%  |   1.03%    |   0.72%        | 1           | 5     |
| TARGETED-PERF(_61) | PERF_STRING-Q4     | parquet / none / none | 3.16   | 2.17        | R +45.60%  |   0.14%    |   1.11%        | 1           | 5     |
| TARGETED-PERF(_61) | PERF_STRING-Q5     | parquet / none / none | 3.51   | 2.55        | R +37.88%  |   0.83%    |   0.49%        | 1           | 5     |
| TARGETED-PERF(_61) | PERF_AGG-Q5        | parquet / none / none | 0.79   | 0.61        | R +30.86%  |   1.54%    |   4.10%        | 1           | 5     |
| TARGETED-PERF(_61) | primitive_top-n_al | parquet / none / none | 39.45  | 35.07       |   +12.51%  |   0.29%    |   0.29%        | 1           | 5     |
| TARGETED-PERF(_61) | PERF_STRING-Q7     | parquet / none / none | 6.78   | 6.10        |   +11.13%  |   0.99%    |   0.74%        | 1           | 5     |
| TARGETED-PERF(_61) | PERF_STRING-Q6     | parquet / none / none | 8.83   | 8.14        |   +8.52%   |   0.15%    |   0.32%        | 1           | 5     |
...

Change-Id: I767c1e2dabde7d5bd7a4d5c1ec6d14801b8260d2
Reviewed-on: http://gerrit.cloudera.org:8080/8085
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b4c24ad2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b4c24ad2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b4c24ad2

Branch: refs/heads/master
Commit: b4c24ad2ad66cdb9ecffb754b8d133e476c99b3e
Parents: 41f0c6a
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed May 10 14:56:03 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Oct 21 10:24:51 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc   | 26 ++++--------------
 be/src/exec/parquet-column-readers.cc | 44 ++++++++++++++++++++++--------
 be/src/exec/parquet-column-readers.h  | 33 ++++++++++++++--------
 be/src/exec/scanner-context.h         | 16 +++++------
 4 files changed, 66 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4c24ad2/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 455c315..b301ea0 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -228,7 +228,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 
   // Release I/O buffers immediately to make sure they are cleaned up
   // in case we return a non-OK status anywhere below.
-  context_->ReleaseCompletedResources(NULL, true);
+  context_->ReleaseCompletedResources(nullptr, true);
   RETURN_IF_ERROR(footer_status);
 
   // Parse the file schema into an internal representation for schema resolution.
@@ -728,10 +728,10 @@ Status HdfsParquetScanner::NextRowGroup() {
 }
 
 void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
-  DCHECK(row_batch != NULL);
+  DCHECK(row_batch != nullptr);
   row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
   scratch_batch_->ReleaseResources(row_batch->tuple_data_pool());
-  context_->ReleaseCompletedResources(row_batch, true);
+  context_->ReleaseCompletedResources(nullptr, true);
   for (ParquetColumnReader* col_reader : column_readers_) {
     col_reader->Close(row_batch);
   }
@@ -1018,15 +1018,6 @@ Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) {
   DCHECK(dst_batch != NULL);
   dst_batch->CommitRows(num_rows);
 
-  // We need to pass the row batch to the scan node if there is too much memory attached,
-  // which can happen if the query is very selective. We need to release memory even
-  // if no rows passed predicates. We should only do this when all rows have been copied
-  // from the scratch batch, since those rows may reference completed I/O buffers in
-  // 'context_'.
-  if (scratch_batch_->AtEnd()
-      && (dst_batch->AtCapacity() || context_->num_completed_io_buffers() > 0)) {
-    context_->ReleaseCompletedResources(dst_batch, /* done */ false);
-  }
   if (context_->cancelled()) return Status::CANCELLED;
   // TODO: It's a really bad idea to propagate UDF error via the global RuntimeState.
   // Store UDF error in thread local storage or make UDF return status so it can merge
@@ -1682,15 +1673,8 @@ Status HdfsParquetScanner::InitColumns(
     DCHECK(stream != NULL);
 
     RETURN_IF_ERROR(scalar_reader->Reset(&col_chunk.meta_data, stream));
-
-    const SlotDescriptor* slot_desc = scalar_reader->slot_desc();
-    if (slot_desc == NULL || !slot_desc->type().IsStringType() ||
-        col_chunk.meta_data.codec != parquet::CompressionCodec::UNCOMPRESSED) {
-      // Non-string types are always compact.  Compressed columns don't reference data in
-      // the io buffers after tuple materialization.  In both cases, we can set compact to
-      // true and recycle buffers more promptly.
-      stream->set_contains_tuple_data(false);
-    }
+    // Parquet column readers never return tuple data with pointers into I/O buffers.
+    stream->set_contains_tuple_data(false);
   }
   DCHECK_EQ(col_ranges.size(), num_scalar_readers);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4c24ad2/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index ae0c0ca..ad12916 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -974,12 +974,14 @@ Status BaseScalarColumnReader::ReadDataPage() {
   // We're about to move to the next data page.  The previous data page is
   // now complete, free up any memory allocated for it. If the data page contained
   // strings we need to attach it to the returned batch.
-  if (CurrentPageContainsTupleData()) {
-    parent_->scratch_batch_->aux_mem_pool.AcquireData(
-        decompressed_data_pool_.get(), false);
+  if (PageContainsTupleData(page_encoding_)) {
+    parent_->scratch_batch_->aux_mem_pool.AcquireData(data_page_pool_.get(), false);
   } else {
-    decompressed_data_pool_->FreeAll();
+    data_page_pool_->FreeAll();
   }
+  // We don't hold any pointers to earlier pages in the stream - we can safely free
+  // any accumulated I/O or boundary buffers.
+  stream_->ReleaseCompletedResources(nullptr, false);
 
   // Read the next data page, skipping page types we don't care about.
   // We break out of this loop on the non-error case (a data page was found or we read all
@@ -1043,14 +1045,9 @@ Status BaseScalarColumnReader::ReadDataPage() {
     int uncompressed_size = current_page_header_.uncompressed_page_size;
     if (decompressor_.get() != NULL) {
       SCOPED_TIMER(parent_->decompress_timer_);
-      uint8_t* decompressed_buffer =
-          decompressed_data_pool_->TryAllocate(uncompressed_size);
-      if (UNLIKELY(decompressed_buffer == NULL)) {
-        string details = Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "ReadDataPage",
-            uncompressed_size, "decompressed data");
-        return decompressed_data_pool_->mem_tracker()->MemLimitExceeded(
-            parent_->state_, details, uncompressed_size);
-      }
+      uint8_t* decompressed_buffer;
+      RETURN_IF_ERROR(AllocateUncompressedDataPage(
+            uncompressed_size, "decompressed data", &decompressed_buffer));
       RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
           current_page_header_.compressed_page_size, data_, &uncompressed_size,
           &decompressed_buffer));
@@ -1071,6 +1068,17 @@ Status BaseScalarColumnReader::ReadDataPage() {
             "Expected $1 bytes but got $2", filename(),
             current_page_header_.compressed_page_size, uncompressed_size));
       }
+      if (PageContainsTupleData(current_page_header_.data_page_header.encoding)) {
+        // In this case returned batches will have pointers into the data page itself.
+        // We don't transfer disk I/O buffers out of the scanner so we need to copy
+        // the page data so that it can be attached to output batches.
+        uint8_t* copy_buffer;
+        RETURN_IF_ERROR(AllocateUncompressedDataPage(
+              uncompressed_size, "uncompressed variable-length data", &copy_buffer));
+        memcpy(copy_buffer, data_, uncompressed_size);
+        data_ = copy_buffer;
+        data_end_ = data_ + uncompressed_size;
+      }
     }
 
     // Initialize the repetition level data
@@ -1094,6 +1102,18 @@ Status BaseScalarColumnReader::ReadDataPage() {
   return Status::OK();
 }
 
+Status BaseScalarColumnReader::AllocateUncompressedDataPage(int64_t size,
+    const char* err_ctx, uint8_t** buffer) {
+  *buffer = data_page_pool_->TryAllocate(size);
+  if (*buffer == nullptr) {
+    string details =
+        Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "ReadDataPage", size, err_ctx);
+    return data_page_pool_->mem_tracker()->MemLimitExceeded(
+        parent_->state_, details, size);
+  }
+  return Status::OK();
+}
+
 template<bool ADVANCE_REP_LEVEL>
 bool BaseScalarColumnReader::NextLevels() {
   if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4c24ad2/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 493997a..dea84a8 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -315,7 +315,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
       num_values_read_(0),
       metadata_(NULL),
       stream_(NULL),
-      decompressed_data_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
+      data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
     DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
   }
 
@@ -346,10 +346,10 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   }
 
   virtual void Close(RowBatch* row_batch) {
-    if (row_batch != nullptr && CurrentPageContainsTupleData()) {
-      row_batch->tuple_data_pool()->AcquireData(decompressed_data_pool_.get(), false);
+    if (row_batch != nullptr && PageContainsTupleData(page_encoding_)) {
+      row_batch->tuple_data_pool()->AcquireData(data_page_pool_.get(), false);
     } else {
-      decompressed_data_pool_->FreeAll();
+      data_page_pool_->FreeAll();
     }
     if (decompressor_ != nullptr) decompressor_->Close();
   }
@@ -406,7 +406,8 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// Decoder for repetition levels.
   ParquetLevelDecoder rep_levels_;
 
-  /// Page encoding for values. Cached here for perf.
+  /// Page encoding for values of the current data page. Cached here for perf. Set in
+  /// InitDataPage().
   parquet::Encoding::type page_encoding_;
 
   /// Num values remaining in the current data page
@@ -422,8 +423,10 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   boost::scoped_ptr<Codec> decompressor_;
   ScannerContext::Stream* stream_;
 
-  /// Pool to allocate decompression buffers from.
-  boost::scoped_ptr<MemPool> decompressed_data_pool_;
+  /// Pool to allocate storage for data pages from - either decompression buffers for
+  /// compressed data pages or copies of the data page with var-len data to attach to
+  /// batches.
+  boost::scoped_ptr<MemPool> data_page_pool_;
 
   /// Header for current data page.
   parquet::PageHeader current_page_header_;
@@ -467,13 +470,19 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// 'size' bytes remaining.
   virtual Status InitDataPage(uint8_t* data, int size) = 0;
 
-  /// Returns true if the current data page may contain strings referenced by returned
-  /// batches. Cases where this is not true are:
+  /// Allocate memory for the uncompressed contents of a data page of 'size' bytes from
+  /// 'data_page_pool_'. 'err_ctx' provides context for error messages. On success, 'buffer'
+  /// points to the allocated memory. Otherwise an error status is returned.
+  Status AllocateUncompressedDataPage(
+      int64_t size, const char* err_ctx, uint8_t** buffer);
+
+  /// Returns true if a data page for this column with the specified 'encoding' may
+  /// contain strings referenced by returned batches. Cases where this is not true are:
   /// * Dictionary-compressed pages, where any string data lives in 'dictionary_pool_'.
   /// * Fixed-length slots, where there is no string data.
-  bool CurrentPageContainsTupleData() {
-    return page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY
-        && slot_desc_ != nullptr && slot_desc_->type().IsStringType();
+  bool PageContainsTupleData(parquet::Encoding::type page_encoding) {
+    return page_encoding != parquet::Encoding::PLAIN_DICTIONARY
+        && slot_desc_ != nullptr && slot_desc_->type().IsVarLenStringType();
   }
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b4c24ad2/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index c13d26f..3fe14aa 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -175,6 +175,14 @@ class ScannerContext {
     /// Skip this text object.
     bool SkipText(Status*);
 
+    /// If 'batch' is not NULL and 'contains_tuple_data_' is true, attaches all completed
+    /// io buffers and the boundary mem pool to 'batch'. If 'done' is set, all in-flight
+    /// resources are also attached or released.
+    /// If 'batch' is NULL then 'done' must be true or 'contains_tuple_data_' false. Such
+    /// a call will release all completed resources. If 'done' is true all in-flight
+    /// resources are also freed.
+    void ReleaseCompletedResources(RowBatch* batch, bool done);
+
    private:
     friend class ScannerContext;
     ScannerContext* parent_;
@@ -257,14 +265,6 @@ class ScannerContext {
     /// never set to NULL, even if it contains 0 bytes.
     Status GetNextBuffer(int64_t read_past_size = 0);
 
-    /// If 'batch' is not NULL and 'contains_tuple_data_' is true, attaches all completed
-    /// io buffers and the boundary mem pool to 'batch'. If 'done' is set, all in-flight
-    /// resources are also attached or released.
-    /// If 'batch' is NULL then 'done' must be true or 'contains_tuple_data_' false. Such
-    /// a call will release all completed resources. If 'done' is true all in-flight
-    /// resources are also freed.
-    void ReleaseCompletedResources(RowBatch* batch, bool done);
-
     /// Validates that the output buffer pointers point to the correct buffer.
     bool ValidateBufferPointers() const;