You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this copy.
Posted to commits@impala.apache.org by lv...@apache.org on 2018/03/03 22:33:36 UTC

[8/9] impala git commit: Revert IMPALA-4835 and dependent changes

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index c669e65..abdde07 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -41,15 +41,14 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
 const int64_t ScannerContext::Stream::OUTPUT_BUFFER_BYTES_LEFT_INIT;
 
 ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
-    BufferPool::ClientHandle* bp_client, HdfsPartitionDescriptor* partition_desc,
-    const vector<FilterContext>& filter_ctxs,
-    MemPool* expr_results_pool)
+    HdfsPartitionDescriptor* partition_desc, ScanRange* scan_range,
+    const vector<FilterContext>& filter_ctxs, MemPool* expr_results_pool)
   : state_(state),
     scan_node_(scan_node),
-    bp_client_(bp_client),
     partition_desc_(partition_desc),
     filter_ctxs_(filter_ctxs),
     expr_results_pool_(expr_results_pool) {
+  AddStream(scan_range);
 }
 
 ScannerContext::~ScannerContext() {
@@ -67,20 +66,19 @@ void ScannerContext::ClearStreams() {
 }
 
 ScannerContext::Stream::Stream(ScannerContext* parent, ScanRange* scan_range,
-    int64_t reservation, const HdfsFileDesc* file_desc)
+    const HdfsFileDesc* file_desc)
   : parent_(parent),
     scan_range_(scan_range),
     file_desc_(file_desc),
-    reservation_(reservation),
     file_len_(file_desc->file_length),
     next_read_past_size_bytes_(INIT_READ_PAST_SIZE_BYTES),
     boundary_pool_(new MemPool(parent->scan_node_->mem_tracker())),
     boundary_buffer_(new StringBuffer(boundary_pool_.get())) {
 }
 
-ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range, int64_t reservation) {
-  streams_.emplace_back(new Stream(this, range, reservation,
-      scan_node_->GetFileDesc(partition_desc_->id(), range->file())));
+ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range) {
+  streams_.emplace_back(new Stream(
+      this, range, scan_node_->GetFileDesc(partition_desc_->id(), range->file())));
   return streams_.back().get();
 }
 
@@ -103,7 +101,6 @@ void ScannerContext::Stream::ReleaseCompletedResources(bool done) {
 
 Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
   DCHECK_EQ(0, io_buffer_bytes_left_);
-  DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
   if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
   if (io_buffer_ != nullptr) ReturnIoBuffer();
 
@@ -124,7 +121,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
 
     int64_t read_past_buffer_size = 0;
-    int64_t max_buffer_size = io_mgr->max_buffer_size();
+    int64_t max_buffer_size = parent_->state_->io_mgr()->max_read_buffer_size();
     if (!read_past_size_cb_.empty()) read_past_buffer_size = read_past_size_cb_(offset);
     if (read_past_buffer_size <= 0) {
       // Either no callback was set or the callback did not return an estimate. Use
@@ -136,7 +133,6 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     read_past_buffer_size = ::max(read_past_buffer_size, read_past_size);
     read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining);
     read_past_buffer_size = ::min(read_past_buffer_size, max_buffer_size);
-    read_past_buffer_size = ::min(read_past_buffer_size, reservation_);
     // We're reading past the scan range. Be careful not to read past the end of file.
     DCHECK_GE(read_past_buffer_size, 0);
     if (read_past_buffer_size == 0) {
@@ -147,23 +143,8 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     ScanRange* range = parent_->scan_node_->AllocateScanRange(
         scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
         scan_range_->disk_id(), false, BufferOpts::Uncached());
-    bool needs_buffers;
-    RETURN_IF_ERROR(io_mgr->StartScanRange(
-        parent_->scan_node_->reader_context(), range, &needs_buffers));
-    if (needs_buffers) {
-      // Allocate fresh buffers. The buffers for 'scan_range_' should be released now
-      // since we hit EOS.
-      if (reservation_ < io_mgr->min_buffer_size()) {
-        return Status(Substitute("Could not read past end of scan range in file '$0'. "
-            "Reservation provided $1 was < the minimum I/O buffer size",
-            reservation_, io_mgr->min_buffer_size()));
-      }
-      RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
-          parent_->scan_node_->reader_context(), parent_->bp_client_, range,
-          reservation_));
-    }
-    RETURN_IF_ERROR(range->GetNext(&io_buffer_));
-    DCHECK(io_buffer_->eosr());
+    RETURN_IF_ERROR(parent_->state_->io_mgr()->Read(
+        parent_->scan_node_->reader_context(), range, &io_buffer_));
   }
 
   DCHECK(io_buffer_ != nullptr);
@@ -343,8 +324,7 @@ Status ScannerContext::Stream::CopyIoToBoundary(int64_t num_bytes) {
 
 void ScannerContext::Stream::ReturnIoBuffer() {
   DCHECK(io_buffer_ != nullptr);
-  ScanRange* range = io_buffer_->scan_range();
-  range->ReturnBuffer(move(io_buffer_));
+  ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffer_));
   io_buffer_pos_ = nullptr;
   io_buffer_bytes_left_ = 0;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 6292486..a131d3f 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -27,7 +27,6 @@
 #include "common/compiler-util.h"
 #include "common/status.h"
 #include "exec/filter-context.h"
-#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/io/request-ranges.h"
 
 namespace impala {
@@ -85,12 +84,10 @@ class TupleRow;
 class ScannerContext {
  public:
   /// Create a scanner context with the parent scan_node (where materialized row batches
-  /// get pushed to) and the scan range to process. Buffers are allocated using
-  /// 'bp_client'.
-  ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
-      BufferPool::ClientHandle* bp_client,
-      HdfsPartitionDescriptor* partition_desc,
-      const std::vector<FilterContext>& filter_ctxs,
+  /// get pushed to) and the scan range to process.
+  /// This context starts with 1 stream.
+  ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
+      io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs,
       MemPool* expr_results_pool);
   /// Destructor verifies that all stream objects have been released.
   ~ScannerContext();
@@ -153,7 +150,6 @@ class ScannerContext {
     const char* filename() { return scan_range_->file(); }
     const io::ScanRange* scan_range() { return scan_range_; }
     const HdfsFileDesc* file_desc() { return file_desc_; }
-    int64_t reservation() const { return reservation_; }
 
     /// Returns the buffer's current offset in the file.
     int64_t file_offset() const { return scan_range_->offset() + total_bytes_returned_; }
@@ -215,15 +211,9 @@ class ScannerContext {
 
    private:
     friend class ScannerContext;
-    ScannerContext* const parent_;
-    io::ScanRange* const scan_range_;
-    const HdfsFileDesc* const file_desc_;
-
-    /// Reservation given to this stream for allocating I/O buffers. The reservation is
-    /// shared with 'scan_range_', so the context must be careful not to use this until
-    /// all of 'scan_ranges_'s buffers have been freed. Must be >= the minimum IoMgr
-    /// buffer size to allow reading past the end of 'scan_range_'.
-    const int64_t reservation_;
+    ScannerContext* parent_;
+    io::ScanRange* scan_range_;
+    const HdfsFileDesc* file_desc_;
 
     /// Total number of bytes returned from GetBytes()
     int64_t total_bytes_returned_ = 0;
@@ -282,8 +272,7 @@ class ScannerContext {
     /// output_buffer_bytes_left_ will be set to something else.
     static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0;
 
-    /// Private constructor. See AddStream() for public API.
-    Stream(ScannerContext* parent, io::ScanRange* scan_range, int64_t reservation,
+    Stream(ScannerContext* parent, io::ScanRange* scan_range,
         const HdfsFileDesc* file_desc);
 
     /// GetBytes helper to handle the slow path.
@@ -366,37 +355,24 @@ class ScannerContext {
   /// size to 0.
   void ClearStreams();
 
-  /// Add a stream to this ScannerContext for 'range'. 'range' must already have any
-  /// buffers that it needs allocated. 'reservation' is the amount of reservation that
-  /// is given to this stream for allocating I/O buffers. The reservation is shared with
-  /// 'range', so the context must be careful not to use this until all of 'range's
-  /// buffers have been freed. Must be >= the minimum IoMgr buffer size o allow reading
-  /// past the end of 'range'.
-  ///
-  /// Returns the added stream. The returned stream is owned by this context.
-  Stream* AddStream(io::ScanRange* range, int64_t reservation);
+  /// Add a stream to this ScannerContext for 'range'. The stream is owned by this
+  /// context.
+  Stream* AddStream(io::ScanRange* range);
 
   /// Returns true if RuntimeState::is_cancelled() is true, or if scan node is not
   /// multi-threaded and is done (finished, cancelled or reached it's limit).
   /// In all other cases returns false.
   bool cancelled() const;
 
-  BufferPool::ClientHandle* bp_client() const { return bp_client_; }
-  HdfsPartitionDescriptor* partition_descriptor() const { return partition_desc_; }
+  HdfsPartitionDescriptor* partition_descriptor() { return partition_desc_; }
   const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; }
   MemPool* expr_results_pool() const { return expr_results_pool_; }
  private:
   friend class Stream;
 
-  RuntimeState* const state_;
-  HdfsScanNodeBase* const scan_node_;
-
-  /// Buffer pool client used to allocate I/O buffers. This is accessed by multiple
-  /// threads in the multi-threaded scan node, so those threads must take care to only
-  /// call thread-safe BufferPool methods with this client.
-  BufferPool::ClientHandle* const bp_client_;
-
-  HdfsPartitionDescriptor* const partition_desc_;
+  RuntimeState* state_;
+  HdfsScanNodeBase* scan_node_;
+  HdfsPartitionDescriptor* partition_desc_;
 
   /// Vector of streams. Non-columnar formats will always have one stream per context.
   std::vector<std::unique_ptr<Stream>> streams_;

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index d14da63..285aacb 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -37,7 +37,6 @@
 
 namespace impala {
 
-class MemTracker;
 class ReservationTracker;
 class RuntimeProfile;
 class SystemAllocator;

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index e441402..c46c5ea 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -522,15 +522,15 @@ TEST_F(ReservationTrackerTest, TransferReservation) {
 TEST_F(ReservationTrackerTest, ReservationUtil) {
   const int64_t MEG = 1024 * 1024;
   const int64_t GIG = 1024 * 1024 * 1024;
-  EXPECT_EQ(32 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING);
+  EXPECT_EQ(75 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING);
 
   EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(0));
   EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(-1));
-  EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(32 * MEG));
+  EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(75 * MEG));
   EXPECT_EQ(8 * GIG, ReservationUtil::GetReservationLimitFromMemLimit(10 * GIG));
 
-  EXPECT_EQ(32 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0));
-  EXPECT_EQ(32 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1));
+  EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0));
+  EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1));
   EXPECT_EQ(500 * MEG, ReservationUtil::GetMinMemLimitFromReservation(400 * MEG));
   EXPECT_EQ(5 * GIG, ReservationUtil::GetMinMemLimitFromReservation(4 * GIG));
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/bufferpool/reservation-util.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-util.cc b/be/src/runtime/bufferpool/reservation-util.cc
index a27ab9d..85718ab 100644
--- a/be/src/runtime/bufferpool/reservation-util.cc
+++ b/be/src/runtime/bufferpool/reservation-util.cc
@@ -24,7 +24,7 @@ namespace impala {
 // Most operators that accumulate memory use reservations, so the majority of memory
 // should be allocated to buffer reservations, as a heuristic.
 const double ReservationUtil::RESERVATION_MEM_FRACTION = 0.8;
-const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 32 * 1024 * 1024;
+const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 75 * 1024 * 1024;
 
 int64_t ReservationUtil::GetReservationLimitFromMemLimit(int64_t mem_limit) {
   int64_t max_reservation = std::min<int64_t>(

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 2f19786..51eece5 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -36,9 +36,9 @@
 #include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
 #include "runtime/data-stream-mgr.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/hbase-table-factory.h"
 #include "runtime/hdfs-fs-cache.h"
-#include "runtime/io/disk-io-mgr.h"
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/lib-cache.h"
 #include "runtime/mem-tracker.h"
@@ -367,7 +367,10 @@ Status ExecEnv::Init() {
   LOG(INFO) << "Buffer pool limit: "
             << PrettyPrinter::Print(buffer_pool_limit, TUnit::BYTES);
 
-  RETURN_IF_ERROR(disk_io_mgr_->Init());
+  RETURN_IF_ERROR(disk_io_mgr_->Init(mem_tracker_.get()));
+
+  mem_tracker_->AddGcFunction(
+      [this](int64_t bytes_to_free) { disk_io_mgr_->GcIoBuffers(bytes_to_free); });
 
   // Start services in order to ensure that dependencies between them are met
   if (enable_webserver_) {

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-internal.h b/be/src/runtime/io/disk-io-mgr-internal.h
index 2d32487..3fc3895 100644
--- a/be/src/runtime/io/disk-io-mgr-internal.h
+++ b/be/src/runtime/io/disk-io-mgr-internal.h
@@ -35,25 +35,9 @@
 #include "util/filesystem-util.h"
 #include "util/hdfs-util.h"
 #include "util/impalad-metrics.h"
-#include "util/runtime-profile-counters.h"
 
 /// This file contains internal structures shared between submodules of the IoMgr. Users
 /// of the IoMgr do not need to include this file.
-
-// Macros to work around counters sometimes not being provided.
-// TODO: fix things so that counters are always non-NULL.
-#define COUNTER_ADD_IF_NOT_NULL(c, v) \
-  do { \
-    ::impala::RuntimeProfile::Counter* __ctr__ = (c); \
-    if (__ctr__ != nullptr) __ctr__->Add(v); \
- } while (false);
-
-#define COUNTER_BITOR_IF_NOT_NULL(c, v) \
-  do { \
-    ::impala::RuntimeProfile::Counter* __ctr__ = (c); \
-    if (__ctr__ != nullptr) __ctr__->BitOr(v); \
- } while (false);
-
 namespace impala {
 namespace io {
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/disk-io-mgr-stress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress-test.cc b/be/src/runtime/io/disk-io-mgr-stress-test.cc
index 2ec1d09..45b36ed 100644
--- a/be/src/runtime/io/disk-io-mgr-stress-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress-test.cc
@@ -15,13 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <gflags/gflags.h>
-
-#include "common/init.h"
 #include "runtime/io/disk-io-mgr-stress.h"
-#include "common/init.h"
-#include "runtime/test-env.h"
-#include "service/fe-support.h"
+#include "util/cpu-info.h"
 #include "util/string-parser.h"
 
 #include "common/names.h"
@@ -33,32 +28,34 @@ using namespace impala::io;
 // can be passed to control how long to run this test (0 for forever).
 
 // TODO: make these configurable once we decide how to run BE tests with args
-constexpr int DEFAULT_DURATION_SEC = 1;
+const int DEFAULT_DURATION_SEC = 1;
 const int NUM_DISKS = 5;
 const int NUM_THREADS_PER_DISK = 5;
 const int NUM_CLIENTS = 10;
 const bool TEST_CANCELLATION = true;
-const int64_t BUFFER_POOL_CAPACITY = 1024L * 1024L * 1024L * 4L;
-
-DEFINE_int64(duration_sec, DEFAULT_DURATION_SEC,
-    "Disk I/O Manager stress test duration in seconds. 0 means run indefinitely.");
 
 int main(int argc, char** argv) {
-  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
-  impala::InitFeSupport();
-
-  if (FLAGS_duration_sec != 0) {
-    printf("Running stress test for %ld seconds.\n", FLAGS_duration_sec);
+  google::InitGoogleLogging(argv[0]);
+  CpuInfo::Init();
+  OsInfo::Init();
+  impala::InitThreading();
+  int duration_sec = DEFAULT_DURATION_SEC;
+
+  if (argc == 2) {
+    StringParser::ParseResult status;
+    duration_sec = StringParser::StringToInt<int>(argv[1], strlen(argv[1]), &status);
+    if (status != StringParser::PARSE_SUCCESS) {
+      printf("Invalid arg: %s\n", argv[1]);
+      return 1;
+    }
+  }
+  if (duration_sec != 0) {
+    printf("Running stress test for %d seconds.\n", duration_sec);
   } else {
     printf("Running stress test indefinitely.\n");
   }
-
-  TestEnv test_env;
-  // Tests try to allocate arbitrarily small buffers. Ensure Buffer Pool allows it.
-  test_env.SetBufferPoolArgs(DiskIoMgrStress::MIN_READ_BUFFER_SIZE, BUFFER_POOL_CAPACITY);
-  Status status = test_env.Init();
-  CHECK(status.ok()) << status.GetDetail();
   DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION);
-  test.Run(FLAGS_duration_sec);
+  test.Run(duration_sec);
+
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index 3fd33de..8815357 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -19,8 +19,6 @@
 
 #include "runtime/io/disk-io-mgr-stress.h"
 
-#include "runtime/bufferpool/reservation-tracker.h"
-#include "runtime/exec-env.h"
 #include "runtime/io/request-context.h"
 #include "util/time.h"
 
@@ -29,20 +27,18 @@
 using namespace impala;
 using namespace impala::io;
 
-constexpr float DiskIoMgrStress::ABORT_CHANCE;
-const int DiskIoMgrStress::MIN_READ_LEN;
-const int DiskIoMgrStress::MAX_READ_LEN;
+static const float ABORT_CHANCE = .10f;
+static const int MIN_READ_LEN = 1;
+static const int MAX_READ_LEN = 20;
 
-const int DiskIoMgrStress::MIN_FILE_LEN;
-const int DiskIoMgrStress::MAX_FILE_LEN;
+static const int MIN_FILE_LEN = 10;
+static const int MAX_FILE_LEN = 1024;
 
 // Make sure this is between MIN/MAX FILE_LEN to test more cases
-const int DiskIoMgrStress::MIN_READ_BUFFER_SIZE;
-const int DiskIoMgrStress::MAX_READ_BUFFER_SIZE;
+static const int MIN_READ_BUFFER_SIZE = 64;
+static const int MAX_READ_BUFFER_SIZE = 128;
 
-const int DiskIoMgrStress::MAX_BUFFER_BYTES_PER_SCAN_RANGE;
-
-const int DiskIoMgrStress::CANCEL_READER_PERIOD_MS;
+static const int CANCEL_READER_PERIOD_MS = 20;  // in ms
 
 static void CreateTempFile(const char* filename, const char* data) {
   FILE* file = fopen(filename, "w");
@@ -51,7 +47,7 @@ static void CreateTempFile(const char* filename, const char* data) {
   fclose(file);
 }
 
-string DiskIoMgrStress::GenerateRandomData() {
+string GenerateRandomData() {
   int rand_len = rand() % (MAX_FILE_LEN - MIN_FILE_LEN) + MIN_FILE_LEN;
   stringstream ss;
   for (int i = 0; i < rand_len; ++i) {
@@ -63,8 +59,6 @@ string DiskIoMgrStress::GenerateRandomData() {
 
 struct DiskIoMgrStress::Client {
   boost::mutex lock;
-  /// Pool for objects that is cleared when the client is (re-)initialized in NewClient().
-  ObjectPool obj_pool;
   unique_ptr<RequestContext> reader;
   int file_idx;
   vector<ScanRange*> scan_ranges;
@@ -83,7 +77,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
 
   io_mgr_.reset(new DiskIoMgr(num_disks, num_threads_per_disk, num_threads_per_disk,
       MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE));
-  Status status = io_mgr_->Init();
+  Status status = io_mgr_->Init(&mem_tracker_);
   CHECK(status.ok());
 
   // Initialize some data files.  It doesn't really matter how many there are.
@@ -98,7 +92,6 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
 
   clients_ = new Client[num_clients_];
   client_mem_trackers_.resize(num_clients_);
-  buffer_pool_clients_.reset(new BufferPool::ClientHandle[num_clients_]);
   for (int i = 0; i < num_clients_; ++i) {
     NewClient(i);
   }
@@ -117,16 +110,9 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 
     while (!eos) {
       ScanRange* range;
-      bool needs_buffers;
-      Status status =
-          io_mgr_->GetNextUnstartedRange(client->reader.get(), &range, &needs_buffers);
+      Status status = io_mgr_->GetNextRange(client->reader.get(), &range);
       CHECK(status.ok() || status.IsCancelled());
       if (range == NULL) break;
-      if (needs_buffers) {
-        status = io_mgr_->AllocateBuffersForRange(client->reader.get(),
-            &buffer_pool_clients_[client_id], range, MAX_BUFFER_BYTES_PER_SCAN_RANGE);
-        CHECK(status.ok()) << status.GetDetail();
-      }
 
       while (true) {
         unique_ptr<BufferDescriptor> buffer;
@@ -151,7 +137,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 
         // Copy the bytes from this read into the result buffer.
         memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len());
-        range->ReturnBuffer(move(buffer));
+        io_mgr_->ReturnBuffer(move(buffer));
         bytes_read += len;
 
         CHECK_GE(bytes_read, 0);
@@ -173,7 +159,6 @@ void DiskIoMgrStress::ClientThread(int client_id) {
     // Unregister the old client and get a new one
     unique_lock<mutex> lock(client->lock);
     io_mgr_->UnregisterContext(client->reader.get());
-    client->reader.reset();
     NewClient(client_id);
   }
 
@@ -185,9 +170,11 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 // Cancel a random reader
 void DiskIoMgrStress::CancelRandomReader() {
   if (!includes_cancellation_) return;
-  Client* rand_client = &clients_[rand() % num_clients_];
-  unique_lock<mutex> lock(rand_client->lock);
-  rand_client->reader->Cancel();
+
+  int rand_client = rand() % num_clients_;
+
+  unique_lock<mutex> lock(clients_[rand_client].lock);
+  io_mgr_->CancelContext(clients_[rand_client].reader.get());
 }
 
 void DiskIoMgrStress::Run(int sec) {
@@ -212,18 +199,10 @@ void DiskIoMgrStress::Run(int sec) {
 
   for (int i = 0; i < num_clients_; ++i) {
     unique_lock<mutex> lock(clients_[i].lock);
-    if (clients_[i].reader != NULL) clients_[i].reader->Cancel();
+    if (clients_[i].reader != NULL) io_mgr_->CancelContext(clients_[i].reader.get());
   }
-  readers_.join_all();
 
-  for (int i = 0; i < num_clients_; ++i) {
-    if (clients_[i].reader != nullptr) {
-      io_mgr_->UnregisterContext(clients_[i].reader.get());
-    }
-    ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]);
-    client_mem_trackers_[i]->Close();
-  }
-  mem_tracker_.Close();
+  readers_.join_all();
 }
 
 // Initialize a client to read one of the files at random.  The scan ranges are
@@ -244,41 +223,25 @@ void DiskIoMgrStress::NewClient(int i) {
     }
   }
 
-  // Clean up leftover state from the previous client (if any).
+  for (int i = 0; i < client.scan_ranges.size(); ++i) {
+    delete client.scan_ranges[i];
+  }
   client.scan_ranges.clear();
-  ExecEnv* exec_env = ExecEnv::GetInstance();
-  exec_env->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]);
-  if (client_mem_trackers_[i] != nullptr) client_mem_trackers_[i]->Close();
-  client.obj_pool.Clear();
 
   int assigned_len = 0;
   while (assigned_len < file_len) {
     int range_len = rand() % (MAX_READ_LEN - MIN_READ_LEN) + MIN_READ_LEN;
     range_len = min(range_len, file_len - assigned_len);
 
-    ScanRange* range = client.obj_pool.Add(new ScanRange);
+    ScanRange* range = new ScanRange();
     range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
         0, false, BufferOpts::Uncached());
     client.scan_ranges.push_back(range);
     assigned_len += range_len;
   }
 
-  string client_name = Substitute("Client $0", i);
-  client_mem_trackers_[i].reset(new MemTracker(-1, client_name, &mem_tracker_));
-  Status status = exec_env->buffer_pool()->RegisterClient(client_name, nullptr,
-      exec_env->buffer_reservation(), client_mem_trackers_[i].get(),
-      numeric_limits<int64_t>::max(), RuntimeProfile::Create(&client.obj_pool, client_name),
-      &buffer_pool_clients_[i]);
-  CHECK(status.ok());
-  // Reserve enough memory for 3 buffers per range, which should be enough to guarantee
-  // progress.
-  CHECK(buffer_pool_clients_[i].IncreaseReservationToFit(
-      MAX_BUFFER_BYTES_PER_SCAN_RANGE * client.scan_ranges.size()))
-      << buffer_pool_clients_[i].DebugString() << "\n"
-      << exec_env->buffer_pool()->DebugString() << "\n"
-      << exec_env->buffer_reservation()->DebugString();
-
-  client.reader = io_mgr_->RegisterContext();
-  status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);
+  client_mem_trackers_[i].reset(new MemTracker(-1, "", &mem_tracker_));
+  client.reader = io_mgr_->RegisterContext(client_mem_trackers_[i].get());
+  Status status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);
   CHECK(status.ok());
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/e5689fb5/be/src/runtime/io/disk-io-mgr-stress.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.h b/be/src/runtime/io/disk-io-mgr-stress.h
index 574b58c..b872694 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.h
+++ b/be/src/runtime/io/disk-io-mgr-stress.h
@@ -22,11 +22,8 @@
 #include <memory>
 #include <vector>
 #include <boost/scoped_ptr.hpp>
-#include <boost/thread/condition_variable.hpp>
 #include <boost/thread/thread.hpp>
 
-#include "common/object-pool.h"
-#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/thread-resource-mgr.h"
@@ -46,29 +43,15 @@ class DiskIoMgrStress {
   /// Run the test for 'sec'.  If 0, run forever
   void Run(int sec);
 
-  static constexpr float ABORT_CHANCE = .10f;
-  static const int MIN_READ_LEN = 1;
-  static const int MAX_READ_LEN = 20;
-
-  static const int MIN_FILE_LEN = 10;
-  static const int MAX_FILE_LEN = 1024;
-
-  // Make sure this is between MIN/MAX FILE_LEN to test more cases
-  static const int MIN_READ_BUFFER_SIZE = 64;
-  static const int MAX_READ_BUFFER_SIZE = 128;
-
-  // Maximum bytes to allocate per scan range.
-  static const int MAX_BUFFER_BYTES_PER_SCAN_RANGE = MAX_READ_BUFFER_SIZE * 3;
-
-  static const int CANCEL_READER_PERIOD_MS = 20;
  private:
   struct Client;
 
   struct File {
     std::string filename;
-    std::string data; // the data in the file, used to validate
+    std::string data;  // the data in the file, used to validate
   };
 
+
   /// Files used for testing.  These are created at startup and recycled
   /// during the test
   std::vector<File> files_;
@@ -89,9 +72,6 @@ class DiskIoMgrStress {
   /// Client MemTrackers, one per client.
   std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_;
 
-  /// Buffer pool clients, one per client.
-  std::unique_ptr<BufferPool::ClientHandle[]> buffer_pool_clients_;
-
   /// If true, tests cancelling readers
   bool includes_cancellation_;
 
@@ -108,8 +88,6 @@ class DiskIoMgrStress {
 
   /// Possibly cancels a random reader.
   void CancelRandomReader();
-
-  static std::string GenerateRandomData();
 };
 }
 }