Posted to commits@impala.apache.org by ta...@apache.org on 2018/05/03 15:28:33 UTC

[14/15] impala git commit: IMPALA-4835: switch I/O buffers to buffer pool

http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 2e3ab20..28138bd 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -324,42 +324,29 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
       const SlotDescriptor* slot_desc)
     : ParquetColumnReader(parent, node, slot_desc),
-      data_(NULL),
-      data_end_(NULL),
-      def_levels_(true),
-      rep_levels_(false),
-      page_encoding_(parquet::Encoding::PLAIN_DICTIONARY),
-      num_buffered_values_(0),
-      num_values_read_(0),
-      metadata_(NULL),
-      stream_(NULL),
       data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
     DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
   }
 
   virtual ~BaseScalarColumnReader() { }
 
-  /// This is called once for each row group in the file.
-  Status Reset(const parquet::ColumnMetaData* metadata, ScannerContext::Stream* stream) {
-    DCHECK(stream != NULL);
-    DCHECK(metadata != NULL);
-
-    num_buffered_values_ = 0;
-    data_ = NULL;
-    data_end_ = NULL;
-    stream_ = stream;
-    metadata_ = metadata;
-    num_values_read_ = 0;
-    def_level_ = HdfsParquetScanner::INVALID_LEVEL;
-    // See ColumnReader constructor.
-    rep_level_ = max_rep_level() == 0 ? 0 : HdfsParquetScanner::INVALID_LEVEL;
-    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
-
-    if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
-      RETURN_IF_ERROR(Codec::CreateDecompressor(
-          NULL, false, ConvertParquetToImpalaCodec(metadata_->codec), &decompressor_));
-    }
-    ClearDictionaryDecoder();
+  /// Resets the reader for each row group in the file and creates the scan
+  /// range for the column, but does not start it. To start scanning,
+  /// set_io_reservation() must be called to assign reservation to this
+  /// column, followed by StartScan().
+  Status Reset(const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk,
+    int row_group_idx);
+
+  /// Starts the column scan range. The reader must be Reset() and have a
+  /// reservation assigned via set_io_reservation(). This must be called
+  /// before any of the column data can be read (including dictionary and
+  /// data pages). Returns an error status if there was an error starting the
+  /// scan or allocating buffers for it.
+  Status StartScan();
+
+  /// Helper to start scans for multiple columns at once.
+  static Status StartScans(const std::vector<BaseScalarColumnReader*> readers) {
+    for (BaseScalarColumnReader* reader : readers) RETURN_IF_ERROR(reader->StartScan());
     return Status::OK();
   }
 
@@ -374,22 +361,27 @@ class BaseScalarColumnReader : public ParquetColumnReader {
     if (dict_decoder != nullptr) dict_decoder->Close();
   }
 
+  io::ScanRange* scan_range() const { return scan_range_; }
   int64_t total_len() const { return metadata_->total_compressed_size; }
   int col_idx() const { return node_.col_idx; }
   THdfsCompression::type codec() const {
     if (metadata_ == NULL) return THdfsCompression::NONE;
     return ConvertParquetToImpalaCodec(metadata_->codec);
   }
+  void set_io_reservation(int bytes) { io_reservation_ = bytes; }
 
   /// Reads the next definition and repetition levels for this column. Initializes the
   /// next data page if necessary.
   virtual bool NextLevels() { return NextLevels<true>(); }
 
-  // Check the data stream to see if there is a dictionary page. If there is,
-  // use that page to initialize dict_decoder_ and advance the data stream
-  // past the dictionary page.
+  /// Check the data stream to see if there is a dictionary page. If there is,
+  /// use that page to initialize dict_decoder_ and advance the data stream
+  /// past the dictionary page.
   Status InitDictionary();
 
+  /// Convenience function to initialize multiple dictionaries.
+  static Status InitDictionaries(const std::vector<BaseScalarColumnReader*> readers);
+
   // Returns the dictionary or NULL if the dictionary doesn't exist
   virtual DictDecoderBase* GetDictionaryDecoder() { return nullptr; }
 
@@ -415,33 +407,45 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   // fit in as few cache lines as possible.
 
   /// Pointer to start of next value in data page
-  uint8_t* data_;
+  uint8_t* data_ = nullptr;
 
   /// End of the data page.
-  const uint8_t* data_end_;
+  const uint8_t* data_end_ = nullptr;
 
   /// Decoder for definition levels.
-  ParquetLevelDecoder def_levels_;
+  ParquetLevelDecoder def_levels_{true};
 
   /// Decoder for repetition levels.
-  ParquetLevelDecoder rep_levels_;
+  ParquetLevelDecoder rep_levels_{false};
 
   /// Page encoding for values of the current data page. Cached here for perf. Set in
   /// InitDataPage().
-  parquet::Encoding::type page_encoding_;
+  parquet::Encoding::type page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
 
   /// Num values remaining in the current data page
-  int num_buffered_values_;
+  int num_buffered_values_ = 0;
 
   // Less frequently used members that are not accessed in inner loop should go below
   // here so they do not occupy precious cache line space.
 
   /// The number of values seen so far. Updated per data page.
-  int64_t num_values_read_;
+  int64_t num_values_read_ = 0;
+
+  /// Metadata for the column for the current row group.
+  const parquet::ColumnMetaData* metadata_ = nullptr;
 
-  const parquet::ColumnMetaData* metadata_;
   boost::scoped_ptr<Codec> decompressor_;
-  ScannerContext::Stream* stream_;
+
+  /// The scan range for the column's data. Initialized for each row group by Reset().
+  io::ScanRange* scan_range_ = nullptr;
+
+  /// Stream used to read data from 'scan_range_'. Initialized by StartScan().
+  ScannerContext::Stream* stream_ = nullptr;
+
+  /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. Must be set
+  /// with set_io_reservation() before 'stream_' is initialized. Reset for each row group
+  /// by Reset().
+  int64_t io_reservation_ = 0;
 
   /// Pool to allocate storage for data pages from - either decompression buffers for
   /// compressed data pages or copies of the data page with var-len data to attach to

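[Editor's note, not part of the patch] The comments above describe a three-step per-row-group protocol for the column readers: Reset() builds the column's scan range without starting it, set_io_reservation() assigns it buffer reservation, StartScan() starts it, and dictionaries are initialized before any data pages are read. The following is a minimal sketch of that call order using only the signatures shown in this header; the enclosing function and the names 'file_desc', 'row_group', 'row_group_idx', 'readers' and 'io_reservation_per_column' are illustrative assumptions, not code from the commit.

Status ScanRowGroupSketch(const HdfsFileDesc& file_desc,
    const parquet::RowGroup& row_group, int row_group_idx,
    const std::vector<BaseScalarColumnReader*>& readers,
    int io_reservation_per_column) {
  for (BaseScalarColumnReader* reader : readers) {
    // Reset() creates (but does not start) the scan range for this column's chunk.
    RETURN_IF_ERROR(reader->Reset(
        file_desc, row_group.columns[reader->col_idx()], row_group_idx));
    // Give the column its share of the scanner's buffer reservation before starting it.
    reader->set_io_reservation(io_reservation_per_column);
  }
  // Start all column scan ranges, then read dictionary pages before any data pages.
  RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(readers));
  RETURN_IF_ERROR(BaseScalarColumnReader::InitDictionaries(readers));
  // ... per-value reads via NextLevels()/ReadValue() proceed as before ...
  return Status::OK();
}
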
http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index abdde07..c669e65 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -41,14 +41,15 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
 const int64_t ScannerContext::Stream::OUTPUT_BUFFER_BYTES_LEFT_INIT;
 
 ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
-    HdfsPartitionDescriptor* partition_desc, ScanRange* scan_range,
-    const vector<FilterContext>& filter_ctxs, MemPool* expr_results_pool)
+    BufferPool::ClientHandle* bp_client, HdfsPartitionDescriptor* partition_desc,
+    const vector<FilterContext>& filter_ctxs,
+    MemPool* expr_results_pool)
   : state_(state),
     scan_node_(scan_node),
+    bp_client_(bp_client),
     partition_desc_(partition_desc),
     filter_ctxs_(filter_ctxs),
     expr_results_pool_(expr_results_pool) {
-  AddStream(scan_range);
 }
 
 ScannerContext::~ScannerContext() {
@@ -66,19 +67,20 @@ void ScannerContext::ClearStreams() {
 }
 
 ScannerContext::Stream::Stream(ScannerContext* parent, ScanRange* scan_range,
-    const HdfsFileDesc* file_desc)
+    int64_t reservation, const HdfsFileDesc* file_desc)
   : parent_(parent),
     scan_range_(scan_range),
     file_desc_(file_desc),
+    reservation_(reservation),
     file_len_(file_desc->file_length),
     next_read_past_size_bytes_(INIT_READ_PAST_SIZE_BYTES),
     boundary_pool_(new MemPool(parent->scan_node_->mem_tracker())),
     boundary_buffer_(new StringBuffer(boundary_pool_.get())) {
 }
 
-ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range) {
-  streams_.emplace_back(new Stream(
-      this, range, scan_node_->GetFileDesc(partition_desc_->id(), range->file())));
+ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range, int64_t reservation) {
+  streams_.emplace_back(new Stream(this, range, reservation,
+      scan_node_->GetFileDesc(partition_desc_->id(), range->file())));
   return streams_.back().get();
 }
 
@@ -101,6 +103,7 @@ void ScannerContext::Stream::ReleaseCompletedResources(bool done) {
 
 Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
   DCHECK_EQ(0, io_buffer_bytes_left_);
+  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
   if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
   if (io_buffer_ != nullptr) ReturnIoBuffer();
 
@@ -121,7 +124,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
 
     int64_t read_past_buffer_size = 0;
-    int64_t max_buffer_size = parent_->state_->io_mgr()->max_read_buffer_size();
+    int64_t max_buffer_size = io_mgr->max_buffer_size();
     if (!read_past_size_cb_.empty()) read_past_buffer_size = read_past_size_cb_(offset);
     if (read_past_buffer_size <= 0) {
       // Either no callback was set or the callback did not return an estimate. Use
@@ -133,6 +136,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     read_past_buffer_size = ::max(read_past_buffer_size, read_past_size);
     read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining);
     read_past_buffer_size = ::min(read_past_buffer_size, max_buffer_size);
+    read_past_buffer_size = ::min(read_past_buffer_size, reservation_);
     // We're reading past the scan range. Be careful not to read past the end of file.
     DCHECK_GE(read_past_buffer_size, 0);
     if (read_past_buffer_size == 0) {
@@ -143,8 +147,23 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     ScanRange* range = parent_->scan_node_->AllocateScanRange(
         scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
         scan_range_->disk_id(), false, BufferOpts::Uncached());
-    RETURN_IF_ERROR(parent_->state_->io_mgr()->Read(
-        parent_->scan_node_->reader_context(), range, &io_buffer_));
+    bool needs_buffers;
+    RETURN_IF_ERROR(io_mgr->StartScanRange(
+        parent_->scan_node_->reader_context(), range, &needs_buffers));
+    if (needs_buffers) {
+      // Allocate fresh buffers. The buffers for 'scan_range_' should be released now
+      // since we hit EOS.
+      if (reservation_ < io_mgr->min_buffer_size()) {
+        return Status(Substitute("Could not read past end of scan range in file '$0'. "
+            "Reservation provided $1 was < the minimum I/O buffer size",
+            reservation_, io_mgr->min_buffer_size()));
+      }
+      RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
+          parent_->scan_node_->reader_context(), parent_->bp_client_, range,
+          reservation_));
+    }
+    RETURN_IF_ERROR(range->GetNext(&io_buffer_));
+    DCHECK(io_buffer_->eosr());
   }
 
   DCHECK(io_buffer_ != nullptr);
@@ -324,7 +343,8 @@ Status ScannerContext::Stream::CopyIoToBoundary(int64_t num_bytes) {
 
 void ScannerContext::Stream::ReturnIoBuffer() {
   DCHECK(io_buffer_ != nullptr);
-  ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffer_));
+  ScanRange* range = io_buffer_->scan_range();
+  range->ReturnBuffer(move(io_buffer_));
   io_buffer_pos_ = nullptr;
   io_buffer_bytes_left_ = 0;
 }

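[Editor's note, not part of the patch] The new read-past-end path above illustrates the general pattern for a directly started scan range under the buffer-pool-based IoMgr: StartScanRange() reports via 'needs_buffers' whether the caller must supply buffers, AllocateBuffersForRange() charges them to a BufferPool client, and buffers are then pulled from and returned to the range itself rather than the DiskIoMgr. A self-contained sketch of that flow, assuming 'io_mgr', 'reader', 'bp_client', 'range' and 'reservation' are provided by the caller:

Status ReadWholeRangeSketch(DiskIoMgr* io_mgr, RequestContext* reader,
    BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t reservation) {
  bool needs_buffers;
  RETURN_IF_ERROR(io_mgr->StartScanRange(reader, range, &needs_buffers));
  if (needs_buffers) {
    // The range was not given buffers up front, so allocate them from the caller's
    // buffer pool reservation.
    RETURN_IF_ERROR(
        io_mgr->AllocateBuffersForRange(reader, bp_client, range, reservation));
  }
  while (true) {
    std::unique_ptr<BufferDescriptor> buffer;
    RETURN_IF_ERROR(range->GetNext(&buffer));
    bool eosr = buffer->eosr();
    // ... consume buffer->buffer() / buffer->len() here ...
    range->ReturnBuffer(std::move(buffer));
    if (eosr) break;
  }
  return Status::OK();
}
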
http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index a131d3f..6292486 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -27,6 +27,7 @@
 #include "common/compiler-util.h"
 #include "common/status.h"
 #include "exec/filter-context.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/io/request-ranges.h"
 
 namespace impala {
@@ -84,10 +85,12 @@ class TupleRow;
 class ScannerContext {
  public:
   /// Create a scanner context with the parent scan_node (where materialized row batches
-  /// get pushed to) and the scan range to process.
-  /// This context starts with 1 stream.
-  ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
-      io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs,
+  /// get pushed to) and the scan range to process. Buffers are allocated using
+  /// 'bp_client'.
+  ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
+      BufferPool::ClientHandle* bp_client,
+      HdfsPartitionDescriptor* partition_desc,
+      const std::vector<FilterContext>& filter_ctxs,
       MemPool* expr_results_pool);
   /// Destructor verifies that all stream objects have been released.
   ~ScannerContext();
@@ -150,6 +153,7 @@ class ScannerContext {
     const char* filename() { return scan_range_->file(); }
     const io::ScanRange* scan_range() { return scan_range_; }
     const HdfsFileDesc* file_desc() { return file_desc_; }
+    int64_t reservation() const { return reservation_; }
 
     /// Returns the buffer's current offset in the file.
     int64_t file_offset() const { return scan_range_->offset() + total_bytes_returned_; }
@@ -211,9 +215,15 @@ class ScannerContext {
 
    private:
     friend class ScannerContext;
-    ScannerContext* parent_;
-    io::ScanRange* scan_range_;
-    const HdfsFileDesc* file_desc_;
+    ScannerContext* const parent_;
+    io::ScanRange* const scan_range_;
+    const HdfsFileDesc* const file_desc_;
+
+    /// Reservation given to this stream for allocating I/O buffers. The reservation is
+    /// shared with 'scan_range_', so the context must be careful not to use this until
+    /// all of 'scan_range_'s buffers have been freed. Must be >= the minimum IoMgr
+    /// buffer size to allow reading past the end of 'scan_range_'.
+    const int64_t reservation_;
 
     /// Total number of bytes returned from GetBytes()
     int64_t total_bytes_returned_ = 0;
@@ -272,7 +282,8 @@ class ScannerContext {
     /// output_buffer_bytes_left_ will be set to something else.
     static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0;
 
-    Stream(ScannerContext* parent, io::ScanRange* scan_range,
+    /// Private constructor. See AddStream() for public API.
+    Stream(ScannerContext* parent, io::ScanRange* scan_range, int64_t reservation,
         const HdfsFileDesc* file_desc);
 
     /// GetBytes helper to handle the slow path.
@@ -355,24 +366,37 @@ class ScannerContext {
   /// size to 0.
   void ClearStreams();
 
-  /// Add a stream to this ScannerContext for 'range'. The stream is owned by this
-  /// context.
-  Stream* AddStream(io::ScanRange* range);
+  /// Add a stream to this ScannerContext for 'range'. 'range' must already have any
+  /// buffers that it needs allocated. 'reservation' is the amount of reservation that
+  /// is given to this stream for allocating I/O buffers. The reservation is shared with
+  /// 'range', so the context must be careful not to use this until all of 'range's
+  /// buffers have been freed. Must be >= the minimum IoMgr buffer size to allow reading
+  /// past the end of 'range'.
+  ///
+  /// Returns the added stream. The returned stream is owned by this context.
+  Stream* AddStream(io::ScanRange* range, int64_t reservation);
 
   /// Returns true if RuntimeState::is_cancelled() is true, or if scan node is not
   /// multi-threaded and is done (finished, cancelled or reached it's limit).
   /// In all other cases returns false.
   bool cancelled() const;
 
-  HdfsPartitionDescriptor* partition_descriptor() { return partition_desc_; }
+  BufferPool::ClientHandle* bp_client() const { return bp_client_; }
+  HdfsPartitionDescriptor* partition_descriptor() const { return partition_desc_; }
   const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; }
   MemPool* expr_results_pool() const { return expr_results_pool_; }
  private:
   friend class Stream;
 
-  RuntimeState* state_;
-  HdfsScanNodeBase* scan_node_;
-  HdfsPartitionDescriptor* partition_desc_;
+  RuntimeState* const state_;
+  HdfsScanNodeBase* const scan_node_;
+
+  /// Buffer pool client used to allocate I/O buffers. This is accessed by multiple
+  /// threads in the multi-threaded scan node, so those threads must take care to only
+  /// call thread-safe BufferPool methods with this client.
+  BufferPool::ClientHandle* const bp_client_;
+
+  HdfsPartitionDescriptor* const partition_desc_;
 
   /// Vector of streams. Non-columnar formats will always have one stream per context.
   std::vector<std::unique_ptr<Stream>> streams_;

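[Editor's note, not part of the patch] With this header change the context no longer creates a stream at construction; each stream is added explicitly with the reservation that was used for its range's buffers. A brief sketch of the expected wiring, where 'state', 'scan_node', 'bp_client', 'partition_desc', 'filter_ctxs', 'expr_results_pool', 'range' and 'reservation' are assumed to exist in the enclosing scan-node code:

  ScannerContext context(state, scan_node, bp_client, partition_desc, filter_ctxs,
      expr_results_pool);
  // 'range' must already have had its buffers allocated (see AllocateBuffersForRange()
  // in the scanner-context.cc sketch above); 'reservation' is the byte count given to it.
  ScannerContext::Stream* stream = context.AddStream(range, reservation);
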
http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 285aacb..d14da63 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -37,6 +37,7 @@
 
 namespace impala {
 
+class MemTracker;
 class ReservationTracker;
 class RuntimeProfile;
 class SystemAllocator;

http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index c46c5ea..e441402 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -522,15 +522,15 @@ TEST_F(ReservationTrackerTest, TransferReservation) {
 TEST_F(ReservationTrackerTest, ReservationUtil) {
   const int64_t MEG = 1024 * 1024;
   const int64_t GIG = 1024 * 1024 * 1024;
-  EXPECT_EQ(75 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING);
+  EXPECT_EQ(32 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING);
 
   EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(0));
   EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(-1));
-  EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(75 * MEG));
+  EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(32 * MEG));
   EXPECT_EQ(8 * GIG, ReservationUtil::GetReservationLimitFromMemLimit(10 * GIG));
 
-  EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0));
-  EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1));
+  EXPECT_EQ(32 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0));
+  EXPECT_EQ(32 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1));
   EXPECT_EQ(500 * MEG, ReservationUtil::GetMinMemLimitFromReservation(400 * MEG));
   EXPECT_EQ(5 * GIG, ReservationUtil::GetMinMemLimitFromReservation(4 * GIG));
 

http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/runtime/bufferpool/reservation-util.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-util.cc b/be/src/runtime/bufferpool/reservation-util.cc
index 85718ab..a27ab9d 100644
--- a/be/src/runtime/bufferpool/reservation-util.cc
+++ b/be/src/runtime/bufferpool/reservation-util.cc
@@ -24,7 +24,7 @@ namespace impala {
 // Most operators that accumulate memory use reservations, so the majority of memory
 // should be allocated to buffer reservations, as a heuristic.
 const double ReservationUtil::RESERVATION_MEM_FRACTION = 0.8;
-const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 75 * 1024 * 1024;
+const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 32 * 1024 * 1024;
 
 int64_t ReservationUtil::GetReservationLimitFromMemLimit(int64_t mem_limit) {
   int64_t max_reservation = std::min<int64_t>(

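[Editor's note, not part of the patch] The 32 MB floor above is the slice of a memory limit that is kept out of buffer reservations; the function whose first lines appear above combines it with RESERVATION_MEM_FRACTION. A small self-contained check, assuming the limit is computed as min(0.8 * mem_limit, mem_limit - RESERVATION_MEM_MIN_REMAINING) clamped at zero (which is what the updated expectations in reservation-tracker-test.cc imply), reproduces those test values:

#include <algorithm>
#include <cassert>
#include <cstdint>

// Assumed formula, consistent with the updated reservation-tracker-test.cc expectations.
int64_t ReservationLimitSketch(int64_t mem_limit) {
  const double kFraction = 0.8;                    // RESERVATION_MEM_FRACTION
  const int64_t kMinRemaining = 32 * 1024 * 1024;  // new RESERVATION_MEM_MIN_REMAINING
  int64_t max_reservation = std::min<int64_t>(
      kFraction * mem_limit, mem_limit - kMinRemaining);
  return std::max<int64_t>(0, max_reservation);
}

int main() {
  const int64_t MEG = 1024 * 1024, GIG = 1024 * MEG;
  assert(ReservationLimitSketch(0) == 0);
  assert(ReservationLimitSketch(32 * MEG) == 0);        // nothing left above the floor
  assert(ReservationLimitSketch(10 * GIG) == 8 * GIG);  // capped by the 0.8 fraction
  return 0;
}
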
http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 47bb0bd..1a38bc7 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -36,9 +36,9 @@
 #include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
 #include "runtime/data-stream-mgr.h"
-#include "runtime/io/disk-io-mgr.h"
 #include "runtime/hbase-table-factory.h"
 #include "runtime/hdfs-fs-cache.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/lib-cache.h"
 #include "runtime/mem-tracker.h"
@@ -366,10 +366,7 @@ Status ExecEnv::Init() {
   LOG(INFO) << "Buffer pool limit: "
             << PrettyPrinter::Print(buffer_pool_limit, TUnit::BYTES);
 
-  RETURN_IF_ERROR(disk_io_mgr_->Init(mem_tracker_.get()));
-
-  mem_tracker_->AddGcFunction(
-      [this](int64_t bytes_to_free) { disk_io_mgr_->GcIoBuffers(bytes_to_free); });
+  RETURN_IF_ERROR(disk_io_mgr_->Init());
 
   // Start services in order to ensure that dependencies between them are met
   if (enable_webserver_) {

http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/runtime/io/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-internal.h b/be/src/runtime/io/disk-io-mgr-internal.h
index e6962ea..292530f 100644
--- a/be/src/runtime/io/disk-io-mgr-internal.h
+++ b/be/src/runtime/io/disk-io-mgr-internal.h
@@ -34,9 +34,25 @@
 #include "util/filesystem-util.h"
 #include "util/hdfs-util.h"
 #include "util/impalad-metrics.h"
+#include "util/runtime-profile-counters.h"
 
 /// This file contains internal structures shared between submodules of the IoMgr. Users
 /// of the IoMgr do not need to include this file.
+
+// Macros to work around counters sometimes not being provided.
+// TODO: fix things so that counters are always non-NULL.
+#define COUNTER_ADD_IF_NOT_NULL(c, v) \
+  do { \
+    ::impala::RuntimeProfile::Counter* __ctr__ = (c); \
+    if (__ctr__ != nullptr) __ctr__->Add(v); \
+ } while (false);
+
+#define COUNTER_BITOR_IF_NOT_NULL(c, v) \
+  do { \
+    ::impala::RuntimeProfile::Counter* __ctr__ = (c); \
+    if (__ctr__ != nullptr) __ctr__->BitOr(v); \
+ } while (false);
+
 namespace impala {
 namespace io {
 

http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/runtime/io/disk-io-mgr-stress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress-test.cc b/be/src/runtime/io/disk-io-mgr-stress-test.cc
index 45b36ed..2ec1d09 100644
--- a/be/src/runtime/io/disk-io-mgr-stress-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress-test.cc
@@ -15,8 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <gflags/gflags.h>
+
+#include "common/init.h"
 #include "runtime/io/disk-io-mgr-stress.h"
-#include "util/cpu-info.h"
+#include "common/init.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
 #include "util/string-parser.h"
 
 #include "common/names.h"
@@ -28,34 +33,32 @@ using namespace impala::io;
 // can be passed to control how long to run this test (0 for forever).
 
 // TODO: make these configurable once we decide how to run BE tests with args
-const int DEFAULT_DURATION_SEC = 1;
+constexpr int DEFAULT_DURATION_SEC = 1;
 const int NUM_DISKS = 5;
 const int NUM_THREADS_PER_DISK = 5;
 const int NUM_CLIENTS = 10;
 const bool TEST_CANCELLATION = true;
+const int64_t BUFFER_POOL_CAPACITY = 1024L * 1024L * 1024L * 4L;
+
+DEFINE_int64(duration_sec, DEFAULT_DURATION_SEC,
+    "Disk I/O Manager stress test duration in seconds. 0 means run indefinitely.");
 
 int main(int argc, char** argv) {
-  google::InitGoogleLogging(argv[0]);
-  CpuInfo::Init();
-  OsInfo::Init();
-  impala::InitThreading();
-  int duration_sec = DEFAULT_DURATION_SEC;
-
-  if (argc == 2) {
-    StringParser::ParseResult status;
-    duration_sec = StringParser::StringToInt<int>(argv[1], strlen(argv[1]), &status);
-    if (status != StringParser::PARSE_SUCCESS) {
-      printf("Invalid arg: %s\n", argv[1]);
-      return 1;
-    }
-  }
-  if (duration_sec != 0) {
-    printf("Running stress test for %d seconds.\n", duration_sec);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
+
+  if (FLAGS_duration_sec != 0) {
+    printf("Running stress test for %ld seconds.\n", FLAGS_duration_sec);
   } else {
     printf("Running stress test indefinitely.\n");
   }
-  DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION);
-  test.Run(duration_sec);
 
+  TestEnv test_env;
+  // Tests try to allocate arbitrarily small buffers. Ensure Buffer Pool allows it.
+  test_env.SetBufferPoolArgs(DiskIoMgrStress::MIN_READ_BUFFER_SIZE, BUFFER_POOL_CAPACITY);
+  Status status = test_env.Init();
+  CHECK(status.ok()) << status.GetDetail();
+  DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION);
+  test.Run(FLAGS_duration_sec);
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/runtime/io/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index 8815357..3fd33de 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -19,6 +19,8 @@
 
 #include "runtime/io/disk-io-mgr-stress.h"
 
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/exec-env.h"
 #include "runtime/io/request-context.h"
 #include "util/time.h"
 
@@ -27,18 +29,20 @@
 using namespace impala;
 using namespace impala::io;
 
-static const float ABORT_CHANCE = .10f;
-static const int MIN_READ_LEN = 1;
-static const int MAX_READ_LEN = 20;
+constexpr float DiskIoMgrStress::ABORT_CHANCE;
+const int DiskIoMgrStress::MIN_READ_LEN;
+const int DiskIoMgrStress::MAX_READ_LEN;
 
-static const int MIN_FILE_LEN = 10;
-static const int MAX_FILE_LEN = 1024;
+const int DiskIoMgrStress::MIN_FILE_LEN;
+const int DiskIoMgrStress::MAX_FILE_LEN;
 
 // Make sure this is between MIN/MAX FILE_LEN to test more cases
-static const int MIN_READ_BUFFER_SIZE = 64;
-static const int MAX_READ_BUFFER_SIZE = 128;
+const int DiskIoMgrStress::MIN_READ_BUFFER_SIZE;
+const int DiskIoMgrStress::MAX_READ_BUFFER_SIZE;
 
-static const int CANCEL_READER_PERIOD_MS = 20;  // in ms
+const int DiskIoMgrStress::MAX_BUFFER_BYTES_PER_SCAN_RANGE;
+
+const int DiskIoMgrStress::CANCEL_READER_PERIOD_MS;
 
 static void CreateTempFile(const char* filename, const char* data) {
   FILE* file = fopen(filename, "w");
@@ -47,7 +51,7 @@ static void CreateTempFile(const char* filename, const char* data) {
   fclose(file);
 }
 
-string GenerateRandomData() {
+string DiskIoMgrStress::GenerateRandomData() {
   int rand_len = rand() % (MAX_FILE_LEN - MIN_FILE_LEN) + MIN_FILE_LEN;
   stringstream ss;
   for (int i = 0; i < rand_len; ++i) {
@@ -59,6 +63,8 @@ string GenerateRandomData() {
 
 struct DiskIoMgrStress::Client {
   boost::mutex lock;
+  /// Pool for objects that is cleared when the client is (re-)initialized in NewClient().
+  ObjectPool obj_pool;
   unique_ptr<RequestContext> reader;
   int file_idx;
   vector<ScanRange*> scan_ranges;
@@ -77,7 +83,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
 
   io_mgr_.reset(new DiskIoMgr(num_disks, num_threads_per_disk, num_threads_per_disk,
       MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE));
-  Status status = io_mgr_->Init(&mem_tracker_);
+  Status status = io_mgr_->Init();
   CHECK(status.ok());
 
   // Initialize some data files.  It doesn't really matter how many there are.
@@ -92,6 +98,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
 
   clients_ = new Client[num_clients_];
   client_mem_trackers_.resize(num_clients_);
+  buffer_pool_clients_.reset(new BufferPool::ClientHandle[num_clients_]);
   for (int i = 0; i < num_clients_; ++i) {
     NewClient(i);
   }
@@ -110,9 +117,16 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 
     while (!eos) {
       ScanRange* range;
-      Status status = io_mgr_->GetNextRange(client->reader.get(), &range);
+      bool needs_buffers;
+      Status status =
+          io_mgr_->GetNextUnstartedRange(client->reader.get(), &range, &needs_buffers);
       CHECK(status.ok() || status.IsCancelled());
       if (range == NULL) break;
+      if (needs_buffers) {
+        status = io_mgr_->AllocateBuffersForRange(client->reader.get(),
+            &buffer_pool_clients_[client_id], range, MAX_BUFFER_BYTES_PER_SCAN_RANGE);
+        CHECK(status.ok()) << status.GetDetail();
+      }
 
       while (true) {
         unique_ptr<BufferDescriptor> buffer;
@@ -137,7 +151,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 
         // Copy the bytes from this read into the result buffer.
         memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len());
-        io_mgr_->ReturnBuffer(move(buffer));
+        range->ReturnBuffer(move(buffer));
         bytes_read += len;
 
         CHECK_GE(bytes_read, 0);
@@ -159,6 +173,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
     // Unregister the old client and get a new one
     unique_lock<mutex> lock(client->lock);
     io_mgr_->UnregisterContext(client->reader.get());
+    client->reader.reset();
     NewClient(client_id);
   }
 
@@ -170,11 +185,9 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 // Cancel a random reader
 void DiskIoMgrStress::CancelRandomReader() {
   if (!includes_cancellation_) return;
-
-  int rand_client = rand() % num_clients_;
-
-  unique_lock<mutex> lock(clients_[rand_client].lock);
-  io_mgr_->CancelContext(clients_[rand_client].reader.get());
+  Client* rand_client = &clients_[rand() % num_clients_];
+  unique_lock<mutex> lock(rand_client->lock);
+  rand_client->reader->Cancel();
 }
 
 void DiskIoMgrStress::Run(int sec) {
@@ -199,10 +212,18 @@ void DiskIoMgrStress::Run(int sec) {
 
   for (int i = 0; i < num_clients_; ++i) {
     unique_lock<mutex> lock(clients_[i].lock);
-    if (clients_[i].reader != NULL) io_mgr_->CancelContext(clients_[i].reader.get());
+    if (clients_[i].reader != NULL) clients_[i].reader->Cancel();
   }
-
   readers_.join_all();
+
+  for (int i = 0; i < num_clients_; ++i) {
+    if (clients_[i].reader != nullptr) {
+      io_mgr_->UnregisterContext(clients_[i].reader.get());
+    }
+    ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]);
+    client_mem_trackers_[i]->Close();
+  }
+  mem_tracker_.Close();
 }
 
 // Initialize a client to read one of the files at random.  The scan ranges are
@@ -223,25 +244,41 @@ void DiskIoMgrStress::NewClient(int i) {
     }
   }
 
-  for (int i = 0; i < client.scan_ranges.size(); ++i) {
-    delete client.scan_ranges[i];
-  }
+  // Clean up leftover state from the previous client (if any).
   client.scan_ranges.clear();
+  ExecEnv* exec_env = ExecEnv::GetInstance();
+  exec_env->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]);
+  if (client_mem_trackers_[i] != nullptr) client_mem_trackers_[i]->Close();
+  client.obj_pool.Clear();
 
   int assigned_len = 0;
   while (assigned_len < file_len) {
     int range_len = rand() % (MAX_READ_LEN - MIN_READ_LEN) + MIN_READ_LEN;
     range_len = min(range_len, file_len - assigned_len);
 
-    ScanRange* range = new ScanRange();
+    ScanRange* range = client.obj_pool.Add(new ScanRange);
     range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
         0, false, BufferOpts::Uncached());
     client.scan_ranges.push_back(range);
     assigned_len += range_len;
   }
 
-  client_mem_trackers_[i].reset(new MemTracker(-1, "", &mem_tracker_));
-  client.reader = io_mgr_->RegisterContext(client_mem_trackers_[i].get());
-  Status status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);
+  string client_name = Substitute("Client $0", i);
+  client_mem_trackers_[i].reset(new MemTracker(-1, client_name, &mem_tracker_));
+  Status status = exec_env->buffer_pool()->RegisterClient(client_name, nullptr,
+      exec_env->buffer_reservation(), client_mem_trackers_[i].get(),
+      numeric_limits<int64_t>::max(), RuntimeProfile::Create(&client.obj_pool, client_name),
+      &buffer_pool_clients_[i]);
+  CHECK(status.ok());
+  // Reserve enough memory for 3 buffers per range, which should be enough to guarantee
+  // progress.
+  CHECK(buffer_pool_clients_[i].IncreaseReservationToFit(
+      MAX_BUFFER_BYTES_PER_SCAN_RANGE * client.scan_ranges.size()))
+      << buffer_pool_clients_[i].DebugString() << "\n"
+      << exec_env->buffer_pool()->DebugString() << "\n"
+      << exec_env->buffer_reservation()->DebugString();
+
+  client.reader = io_mgr_->RegisterContext();
+  status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);
   CHECK(status.ok());
 }

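[Editor's note, not part of the patch] The stress test now exercises the queued-range flow, which differs from the directly started flow sketched after scanner-context.cc: ranges are registered up front with AddScanRanges(), handed out by GetNextUnstartedRange(), and only then given buffers. A sketch of that loop, assuming 'io_mgr', 'reader', 'bp_client' and 'ranges' are set up as in NewClient() above:

Status DrainQueuedRangesSketch(DiskIoMgr* io_mgr, RequestContext* reader,
    BufferPool::ClientHandle* bp_client, const std::vector<ScanRange*>& ranges) {
  RETURN_IF_ERROR(io_mgr->AddScanRanges(reader, ranges));
  while (true) {
    ScanRange* range;
    bool needs_buffers;
    RETURN_IF_ERROR(io_mgr->GetNextUnstartedRange(reader, &range, &needs_buffers));
    if (range == nullptr) break;  // this reader has no more ranges
    if (needs_buffers) {
      RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
          reader, bp_client, range, DiskIoMgrStress::MAX_BUFFER_BYTES_PER_SCAN_RANGE));
    }
    // ... drain the range with range->GetNext() / range->ReturnBuffer(), as in the
    // sketch after scanner-context.cc ...
  }
  return Status::OK();
}
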
http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/runtime/io/disk-io-mgr-stress.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.h b/be/src/runtime/io/disk-io-mgr-stress.h
index b872694..574b58c 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.h
+++ b/be/src/runtime/io/disk-io-mgr-stress.h
@@ -22,8 +22,11 @@
 #include <memory>
 #include <vector>
 #include <boost/scoped_ptr.hpp>
+#include <boost/thread/condition_variable.hpp>
 #include <boost/thread/thread.hpp>
 
+#include "common/object-pool.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/thread-resource-mgr.h"
@@ -43,15 +46,29 @@ class DiskIoMgrStress {
   /// Run the test for 'sec'.  If 0, run forever
   void Run(int sec);
 
+  static constexpr float ABORT_CHANCE = .10f;
+  static const int MIN_READ_LEN = 1;
+  static const int MAX_READ_LEN = 20;
+
+  static const int MIN_FILE_LEN = 10;
+  static const int MAX_FILE_LEN = 1024;
+
+  // Make sure this is between MIN/MAX FILE_LEN to test more cases
+  static const int MIN_READ_BUFFER_SIZE = 64;
+  static const int MAX_READ_BUFFER_SIZE = 128;
+
+  // Maximum bytes to allocate per scan range.
+  static const int MAX_BUFFER_BYTES_PER_SCAN_RANGE = MAX_READ_BUFFER_SIZE * 3;
+
+  static const int CANCEL_READER_PERIOD_MS = 20;
  private:
   struct Client;
 
   struct File {
     std::string filename;
-    std::string data;  // the data in the file, used to validate
+    std::string data; // the data in the file, used to validate
   };
 
-
   /// Files used for testing.  These are created at startup and recycled
   /// during the test
   std::vector<File> files_;
@@ -72,6 +89,9 @@ class DiskIoMgrStress {
   /// Client MemTrackers, one per client.
   std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_;
 
+  /// Buffer pool clients, one per client.
+  std::unique_ptr<BufferPool::ClientHandle[]> buffer_pool_clients_;
+
   /// If true, tests cancelling readers
   bool includes_cancellation_;
 
@@ -88,6 +108,8 @@ class DiskIoMgrStress {
 
   /// Possibly cancels a random reader.
   void CancelRandomReader();
+
+  static std::string GenerateRandomData();
 };
 }
 }