You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2016/08/20 03:10:03 UTC

[2/2] incubator-impala git commit: IMPALA-3662: Don't double allocate tuples buffer in parquet scanner

IMPALA-3662: Don't double allocate tuples buffer in parquet scanner

HdfsScanner::StartNewRowBatch() is called once per row batch
by the parquet scanner to allocate a new row batch and tuple
buffer. Similarly, a scratch batch is created for each row
batch in HdfsParquetScanner::AssembleRows() which also contains
the tuple buffer. In reality, only the tuple buffer in the
scratch batch is used. So, the tuple buffer allocated by
HdfsScanner::StartNewRowBatch() is unused memory for the
parquet scanner.

This change fixes the problem above by implementing
HdfsParquetScanner::StartNewParquetRowBatch() which creates
a new row batch without allocating the tuple buffer.
With this patch, the memory consumption when
materializing very wide tuples is reduced by half.

Change-Id: I826061a2be10fd0528ca4dd1e97146e3cb983370
Reviewed-on: http://gerrit.cloudera.org:8080/4064
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/1522da35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/1522da35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/1522da35

Branch: refs/heads/master
Commit: 1522da3510a36635e3fc694b26211554fcd2793a
Parents: 02608f8
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Aug 18 22:31:34 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Aug 20 03:03:10 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 10 ++++++++--
 be/src/exec/hdfs-parquet-scanner.h  |  5 +++++
 be/src/exec/hdfs-scanner.h          |  2 +-
 3 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1522da35/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index ee5f4d9..2aaf8de 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -301,13 +301,13 @@ Status HdfsParquetScanner::ProcessSplit() {
   DCHECK(add_batches_to_queue_);
   bool scanner_eos = false;
   do {
-    RETURN_IF_ERROR(StartNewRowBatch());
+    StartNewParquetRowBatch();
     RETURN_IF_ERROR(GetNextInternal(batch_, &scanner_eos));
     scan_node_->AddMaterializedRowBatch(batch_);
   } while (!scanner_eos && !scan_node_->ReachedLimit());
 
   // Transfer the remaining resources to this new batch in Close().
-  RETURN_IF_ERROR(StartNewRowBatch());
+  StartNewParquetRowBatch();
   return Status::OK();
 }
 
@@ -529,6 +529,12 @@ Status HdfsParquetScanner::AssembleRows(
   return Status::OK();
 }
 
+void HdfsParquetScanner::StartNewParquetRowBatch() {
+  DCHECK(add_batches_to_queue_);
+  batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
+      scan_node_->mem_tracker());
+}
+
 Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) {
   DCHECK(dst_batch != NULL);
   dst_batch->CommitRows(num_rows);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1522da35/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 1df09a4..dfd7785 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -446,6 +446,11 @@ class HdfsParquetScanner : public HdfsScanner {
   Status AssembleRows(const std::vector<ParquetColumnReader*>& column_readers,
       RowBatch* row_batch, bool* skip_row_group);
 
+  /// Set 'batch_' to a new row batch. Unlike the similarly named function in
+  /// HdfsScanner, this function will not allocate the tuple buffer. Only valid
+  /// to call if 'add_batches_to_queue_' is true.
+  void StartNewParquetRowBatch();
+
   /// Commit num_rows to the given row batch.
   /// Returns OK if the query is not cancelled and hasn't exceeded any mem limits.
   /// Scanner can call this with 0 rows to flush any pending resources (attached pools

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1522da35/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 53711ab..7ddf0a5 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -282,7 +282,7 @@ class HdfsScanner {
   Status InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition,
       THdfsFileFormat::type type, const std::string& scanner_name);
 
-  /// Set batch_ to a new row batch and update tuple_mem_ accordingly.
+  /// Set 'batch_' to a new row batch and update 'tuple_mem_' accordingly.
   /// Only valid to call if 'add_batches_to_queue_' is true.
   Status StartNewRowBatch();