You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/07/21 07:30:45 UTC

incubator-impala git commit: IMPALA-3854: Fix use-after-free in HdfsTextScanner::Close()

Repository: incubator-impala
Updated Branches:
  refs/heads/master 48725753e -> 65806e200


IMPALA-3854: Fix use-after-free in HdfsTextScanner::Close()

A recent commit changed the ownership of the Stream object to
its owning ScannerContext. After that change, a Stream object
will be destroyed in ScannerContext::ReleaseCompletedResources()
if the parameter 'done' is true. That usually happens in the
Close() function of a scanner. However, for the text scanner,
the Stream object can be destroyed after handling compressed data
before Close() is called. In that case, the cached handle to the
Stream object is invalid when it's referenced in Close() to
access the compression codec of the Stream object.

This change fixes the above problem by not deleting the stream
objects in ScannerContext::ReleaseCompletedResources(). Instead
a new function ScannerContext::ClearStreams() is added for that
purpose and it's invoked in HdfsScanner::Close() to release all
the stream objects. This avoids other use-after-free problems
in the code.

Change-Id: Ia88f6285563ff669ae215af22ed2d45e5398adae
Reviewed-on: http://gerrit.cloudera.org:8080/3630
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/65806e20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/65806e20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/65806e20

Branch: refs/heads/master
Commit: 65806e200e8c45ab465c9b0f6678bd2670688617
Parents: 4872575
Author: Michael Ho <kw...@cloudera.com>
Authored: Tue Jul 12 20:19:58 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Thu Jul 21 00:30:39 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc |  1 +
 be/src/exec/hdfs-scan-node.cc       |  6 +++---
 be/src/exec/hdfs-scanner.cc         |  4 ++--
 be/src/exec/hdfs-text-scanner.cc    |  6 +++---
 be/src/exec/scanner-context.cc      |  9 ++++++++-
 be/src/exec/scanner-context.h       | 21 ++++++++++++---------
 6 files changed, 29 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/65806e20/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 1ebb650..6855fef 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -309,6 +309,7 @@ Status HdfsParquetScanner::ProcessSplit() {
     // streams could either be just the footer stream or streams for the previous row
     // group.
     context_->ReleaseCompletedResources(batch_, /* done */ true);
+    context_->ClearStreams();
     // Commit the rows to flush the row batch from the previous row group
     CommitRows(0);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/65806e20/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 19a071f..9a89ac3 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -410,6 +410,8 @@ Status HdfsScanNode::CreateAndPrepareScanner(HdfsPartitionDescriptor* partition,
   if (status.ok()) {
     status = scanner->get()->Prepare(context);
     if (!status.ok()) scanner->get()->Close();
+  } else {
+    context->ClearStreams();
   }
   return status;
 }
@@ -1234,9 +1236,7 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
     // Parquet doesn't read the range end to end so the current offset isn't useful.
     // TODO: make sure the parquet reader is outputting as much diagnostic
     // information as possible.
-    // The error status may not necessarily be related to this scanner thread so this
-    // thread may have run to completion and closed all its streams already.
-    if (partition->file_format() != THdfsFileFormat::PARQUET && context.HasStream()) {
+    if (partition->file_format() != THdfsFileFormat::PARQUET) {
       ScannerContext::Stream* stream = context.GetStream();
       ss << " Processed " << stream->total_bytes_returned() << " bytes.";
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/65806e20/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 4f63a73..ea101c0 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -129,6 +129,8 @@ void HdfsScanner::Close() {
     Expr::Close(iter->second, state_);
   }
   obj_pool_.Clear();
+  stream_ = NULL;
+  context_->ClearStreams();
 }
 
 Status HdfsScanner::InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition,
@@ -209,8 +211,6 @@ Status HdfsScanner::CommitRows(int num_rows) {
 
 void HdfsScanner::AddFinalRowBatch() {
   DCHECK(batch_ != NULL);
-  // Cannot DCHECK(stream_ != NULL) as parquet scanner sets it to NULL in ProcessSplit().
-  stream_ = NULL;
   context_->ReleaseCompletedResources(batch_, /* done */ true);
   scan_node_->AddMaterializedRowBatch(batch_);
   batch_ = NULL;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/65806e20/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 6d45880..5d80f06 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -180,7 +180,6 @@ void HdfsTextScanner::Close() {
     decompressor_->Close();
     decompressor_.reset(NULL);
   }
-  THdfsCompression::type compression = stream_->file_desc()->file_compression;
   if (batch_ != NULL) {
     AttachPool(data_buffer_pool_.get(), false);
     AttachPool(boundary_pool_.get(), false);
@@ -192,7 +191,8 @@ void HdfsTextScanner::Close() {
   DCHECK_EQ(context_->num_completed_io_buffers(), 0);
   // Must happen after AddFinalRowBatch() is called.
   if (!only_parsing_header_) {
-    scan_node_->RangeComplete(THdfsFileFormat::TEXT, compression);
+    scan_node_->RangeComplete(THdfsFileFormat::TEXT,
+        stream_->file_desc()->file_compression);
   }
   HdfsScanner::Close();
 }
@@ -575,7 +575,7 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) {
       reinterpret_cast<uint8_t*>(byte_buffer_ptr_), &decompressed_len,
       &decompressed_buffer));
 
-  // Inform stream_ that the buffer with the compressed text can be released.
+  // Inform 'stream_' that the buffer with the compressed text can be released.
   context_->ReleaseCompletedResources(NULL, true);
 
   VLOG_FILE << "Decompressed " << byte_buffer_read_size_ << " to " << decompressed_len;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/65806e20/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 4935f4c..ace8b66 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -48,11 +48,18 @@ ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNode* scan_node,
   AddStream(scan_range);
 }
 
+ScannerContext::~ScannerContext() {
+  DCHECK(streams_.empty());
+}
+
 void ScannerContext::ReleaseCompletedResources(RowBatch* batch, bool done) {
   for (int i = 0; i < streams_.size(); ++i) {
     streams_[i]->ReleaseCompletedResources(batch, done);
   }
-  if (done) streams_.clear();
+}
+
+void ScannerContext::ClearStreams() {
+  streams_.clear();
 }
 
 ScannerContext::Stream::Stream(ScannerContext* parent)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/65806e20/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index b90d512..02b73e5 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -60,6 +60,9 @@ class ScannerContext {
   ScannerContext(RuntimeState*, HdfsScanNode*, HdfsPartitionDescriptor*,
       DiskIoMgr::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs);
 
+  /// Destructor verifies that all stream objects have been released.
+  ~ScannerContext();
+
   /// Encapsulates a stream (continuous byte range) that can be read.  A context
   /// can contain one or more streams.  For non-columnar files, there is only
   /// one stream; for columnar, there is one stream per column.
@@ -257,10 +260,6 @@ class ScannerContext {
     Status ReportInvalidInt();
   };
 
-  bool HasStream() {
-    return !streams_.empty();
-  }
-
   Stream* GetStream(int idx = 0) {
     DCHECK_GE(idx, 0);
     DCHECK_LT(idx, streams_.size());
@@ -276,15 +275,19 @@ class ScannerContext {
   /// is set. In that case, contains_tuple_data_ should be false.
   //
   /// If 'done' is true, this is the final call for the current streams and any pending
-  /// resources in each stream are also passed to the row batch, and the streams are
-  /// cleared from this context.
+  /// resources in each stream are also passed to the row batch. Callers which want to
+  /// clear the streams from the context should also call ClearStreams().
   //
   /// This must be called with 'done' set when the scanner is complete and no longer needs
-  /// any resources (e.g. tuple memory, io buffers) returned from the current
-  /// streams. After calling with 'done' set, this should be called again if new streams
-  /// are created via AddStream().
+  /// any resources (e.g. tuple memory, io buffers) returned from the current streams.
+  /// After calling with 'done' set, this should be called again if new streams are
+  /// created via AddStream().
   void ReleaseCompletedResources(RowBatch* batch, bool done);
 
+  /// Releases all the Stream objects in the vector 'streams_' and reduces the vector's
+  /// size to 0.
+  void ClearStreams();
+
   /// Add a stream to this ScannerContext for 'range'. Returns the added stream.
   /// The stream is created in the runtime state's object pool
   Stream* AddStream(DiskIoMgr::ScanRange* range);