You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2016/08/20 03:10:03 UTC
[2/2] incubator-impala git commit: IMPALA-3662: Don't double allocate
tuples buffer in parquet scanner
IMPALA-3662: Don't double allocate tuples buffer in parquet scanner
HdfsScanner::StartNewRowBatch() is called once per row batch
by the parquet scanner to allocate a new row batch and tuple
buffer. Similarly, a scratch batch is created for each row
batch in HdfsParquetScanner::AssembleRows() which also contains
the tuple buffer. In reality, only the tuple buffer in the
scratch batch is used. So, the tuple buffer allocated by
HdfsScanner::StartNewRowBatch() is unused memory for the
parquet scanner.
This change fixes the problem above by implementing
HdfsParquetScanner::StartNewParquetRowBatch(), which creates
a new row batch without allocating the tuple buffer.
With this patch, the memory consumption when
materializing very wide tuples is reduced by half.
Change-Id: I826061a2be10fd0528ca4dd1e97146e3cb983370
Reviewed-on: http://gerrit.cloudera.org:8080/4064
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/1522da35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/1522da35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/1522da35
Branch: refs/heads/master
Commit: 1522da3510a36635e3fc694b26211554fcd2793a
Parents: 02608f8
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Aug 18 22:31:34 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Aug 20 03:03:10 2016 +0000
----------------------------------------------------------------------
be/src/exec/hdfs-parquet-scanner.cc | 10 ++++++++--
be/src/exec/hdfs-parquet-scanner.h | 5 +++++
be/src/exec/hdfs-scanner.h | 2 +-
3 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1522da35/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index ee5f4d9..2aaf8de 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -301,13 +301,13 @@ Status HdfsParquetScanner::ProcessSplit() {
DCHECK(add_batches_to_queue_);
bool scanner_eos = false;
do {
- RETURN_IF_ERROR(StartNewRowBatch());
+ StartNewParquetRowBatch();
RETURN_IF_ERROR(GetNextInternal(batch_, &scanner_eos));
scan_node_->AddMaterializedRowBatch(batch_);
} while (!scanner_eos && !scan_node_->ReachedLimit());
// Transfer the remaining resources to this new batch in Close().
- RETURN_IF_ERROR(StartNewRowBatch());
+ StartNewParquetRowBatch();
return Status::OK();
}
@@ -529,6 +529,12 @@ Status HdfsParquetScanner::AssembleRows(
return Status::OK();
}
+void HdfsParquetScanner::StartNewParquetRowBatch() {
+ DCHECK(add_batches_to_queue_);
+ batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(),
+ scan_node_->mem_tracker());
+}
+
Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) {
DCHECK(dst_batch != NULL);
dst_batch->CommitRows(num_rows);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1522da35/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 1df09a4..dfd7785 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -446,6 +446,11 @@ class HdfsParquetScanner : public HdfsScanner {
Status AssembleRows(const std::vector<ParquetColumnReader*>& column_readers,
RowBatch* row_batch, bool* skip_row_group);
+ /// Set 'batch_' to a new row batch. Unlike the similarly named function in
+ /// HdfsScanner, this function will not allocate the tuple buffer. Only valid
+ /// to call if 'add_batches_to_queue_' is true.
+ void StartNewParquetRowBatch();
+
/// Commit num_rows to the given row batch.
/// Returns OK if the query is not cancelled and hasn't exceeded any mem limits.
/// Scanner can call this with 0 rows to flush any pending resources (attached pools
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1522da35/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 53711ab..7ddf0a5 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -282,7 +282,7 @@ class HdfsScanner {
Status InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition,
THdfsFileFormat::type type, const std::string& scanner_name);
- /// Set batch_ to a new row batch and update tuple_mem_ accordingly.
+ /// Set 'batch_' to a new row batch and update 'tuple_mem_' accordingly.
/// Only valid to call if 'add_batches_to_queue_' is true.
Status StartNewRowBatch();