You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/07/14 19:05:06 UTC

[09/18] incubator-impala git commit: IMPALA-2885: ScannerContext::Stream objects should be owned by ScannerContext

IMPALA-2885: ScannerContext::Stream objects should be owned by ScannerContext

Each scanner context has at least one stream, which corresponds to
a scan range. For the Parquet scanner, there can be multiple streams.
These Stream objects are stored in the RuntimeState's object pool,
which keeps them alive until that pool is freed, even though they
have the same life span as the scanner threads.
This change makes ScannerContext the owner of the Stream objects
so they are freed when the ScannerContext is destroyed.

Change-Id: Ic5440d414ecc0ca19676c553275aeb85231d6045
Reviewed-on: http://gerrit.cloudera.org:8080/3590
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/59cdec21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/59cdec21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/59cdec21

Branch: refs/heads/master
Commit: 59cdec21bf2a890e6f7fe856a8e1c616e8bebec3
Parents: f4fbd79
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Jul 6 19:20:34 2016 -0700
Committer: Taras Bobrovytsky <ta...@apache.org>
Committed: Thu Jul 14 19:04:44 2016 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-scanner.cc      | 2 ++
 be/src/exec/hdfs-text-scanner.cc | 5 +++--
 be/src/exec/scanner-context.cc   | 6 +++---
 be/src/exec/scanner-context.h    | 6 +++---
 4 files changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59cdec21/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 275956c..5abd346 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -209,6 +209,8 @@ Status HdfsScanner::CommitRows(int num_rows) {
 
 void HdfsScanner::AddFinalRowBatch() {
   DCHECK(batch_ != NULL);
+  // Cannot DCHECK(stream_ != NULL) as parquet scanner sets it to NULL in ProcessSplit().
+  stream_ = NULL;
   context_->ReleaseCompletedResources(batch_, /* done */ true);
   scan_node_->AddMaterializedRowBatch(batch_);
   batch_ = NULL;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59cdec21/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 6cc308d..dcd3081 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -180,6 +180,7 @@ void HdfsTextScanner::Close() {
     decompressor_->Close();
     decompressor_.reset(NULL);
   }
+  THdfsCompression::type compression = stream_->file_desc()->file_compression;
   if (batch_ != NULL) {
     AttachPool(data_buffer_pool_.get(), false);
     AttachPool(boundary_pool_.get(), false);
@@ -189,9 +190,9 @@ void HdfsTextScanner::Close() {
   DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(boundary_pool_.get()->total_allocated_bytes(), 0);
   DCHECK_EQ(context_->num_completed_io_buffers(), 0);
+  // Must happen after AddFinalRowBatch() is called.
   if (!only_parsing_header_) {
-    scan_node_->RangeComplete(
-        THdfsFileFormat::TEXT, stream_->file_desc()->file_compression);
+    scan_node_->RangeComplete(THdfsFileFormat::TEXT, compression);
   }
   HdfsScanner::Close();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59cdec21/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index f7f65be..4935f4c 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -63,7 +63,7 @@ ScannerContext::Stream::Stream(ScannerContext* parent)
 }
 
 ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) {
-  Stream* stream = state_->obj_pool()->Add(new Stream(this));
+  std::unique_ptr<Stream> stream(new Stream(this));
   stream->scan_range_ = range;
   stream->file_desc_ = scan_node_->GetFileDesc(stream->filename());
   stream->file_len_ = stream->file_desc_->file_length;
@@ -76,8 +76,8 @@ ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) {
   stream->output_buffer_bytes_left_ =
       const_cast<int64_t*>(&OUTPUT_BUFFER_BYTES_LEFT_INIT);
   stream->contains_tuple_data_ = scan_node_->tuple_desc()->ContainsStringData();
-  streams_.push_back(stream);
-  return stream;
+  streams_.push_back(std::move(stream));
+  return streams_.back().get();
 }
 
 void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool done) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/59cdec21/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 2ab35e6..b90d512 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -264,7 +264,7 @@ class ScannerContext {
   Stream* GetStream(int idx = 0) {
     DCHECK_GE(idx, 0);
     DCHECK_LT(idx, streams_.size());
-    return streams_[idx];
+    return streams_[idx].get();
   }
 
   /// If a non-NULL 'batch' is passed, attaches completed io buffers and boundary mem pools
@@ -304,8 +304,8 @@ class ScannerContext {
 
   HdfsPartitionDescriptor* partition_desc_;
 
-  /// Vector of streams.  Non-columnar formats will always have one stream per context.
-  std::vector<Stream*> streams_;
+  /// Vector of streams. Non-columnar formats will always have one stream per context.
+  std::vector<std::unique_ptr<Stream>> streams_;
 
   /// Always equal to the sum of completed_io_buffers_.size() across all streams.
   int num_completed_io_buffers_;