You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/03 06:25:33 UTC
[10/11] impala git commit: Revert IMPALA-4835 and dependent changes
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index c669e65..abdde07 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -41,15 +41,14 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
const int64_t ScannerContext::Stream::OUTPUT_BUFFER_BYTES_LEFT_INIT;
ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
- BufferPool::ClientHandle* bp_client, HdfsPartitionDescriptor* partition_desc,
- const vector<FilterContext>& filter_ctxs,
- MemPool* expr_results_pool)
+ HdfsPartitionDescriptor* partition_desc, ScanRange* scan_range,
+ const vector<FilterContext>& filter_ctxs, MemPool* expr_results_pool)
: state_(state),
scan_node_(scan_node),
- bp_client_(bp_client),
partition_desc_(partition_desc),
filter_ctxs_(filter_ctxs),
expr_results_pool_(expr_results_pool) {
+ AddStream(scan_range);
}
ScannerContext::~ScannerContext() {
@@ -67,20 +66,19 @@ void ScannerContext::ClearStreams() {
}
ScannerContext::Stream::Stream(ScannerContext* parent, ScanRange* scan_range,
- int64_t reservation, const HdfsFileDesc* file_desc)
+ const HdfsFileDesc* file_desc)
: parent_(parent),
scan_range_(scan_range),
file_desc_(file_desc),
- reservation_(reservation),
file_len_(file_desc->file_length),
next_read_past_size_bytes_(INIT_READ_PAST_SIZE_BYTES),
boundary_pool_(new MemPool(parent->scan_node_->mem_tracker())),
boundary_buffer_(new StringBuffer(boundary_pool_.get())) {
}
-ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range, int64_t reservation) {
- streams_.emplace_back(new Stream(this, range, reservation,
- scan_node_->GetFileDesc(partition_desc_->id(), range->file())));
+ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range) {
+ streams_.emplace_back(new Stream(
+ this, range, scan_node_->GetFileDesc(partition_desc_->id(), range->file())));
return streams_.back().get();
}
@@ -103,7 +101,6 @@ void ScannerContext::Stream::ReleaseCompletedResources(bool done) {
Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
DCHECK_EQ(0, io_buffer_bytes_left_);
- DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
if (io_buffer_ != nullptr) ReturnIoBuffer();
@@ -124,7 +121,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
int64_t read_past_buffer_size = 0;
- int64_t max_buffer_size = io_mgr->max_buffer_size();
+ int64_t max_buffer_size = parent_->state_->io_mgr()->max_read_buffer_size();
if (!read_past_size_cb_.empty()) read_past_buffer_size = read_past_size_cb_(offset);
if (read_past_buffer_size <= 0) {
// Either no callback was set or the callback did not return an estimate. Use
@@ -136,7 +133,6 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
read_past_buffer_size = ::max(read_past_buffer_size, read_past_size);
read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining);
read_past_buffer_size = ::min(read_past_buffer_size, max_buffer_size);
- read_past_buffer_size = ::min(read_past_buffer_size, reservation_);
// We're reading past the scan range. Be careful not to read past the end of file.
DCHECK_GE(read_past_buffer_size, 0);
if (read_past_buffer_size == 0) {
@@ -147,23 +143,8 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
ScanRange* range = parent_->scan_node_->AllocateScanRange(
scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
scan_range_->disk_id(), false, BufferOpts::Uncached());
- bool needs_buffers;
- RETURN_IF_ERROR(io_mgr->StartScanRange(
- parent_->scan_node_->reader_context(), range, &needs_buffers));
- if (needs_buffers) {
- // Allocate fresh buffers. The buffers for 'scan_range_' should be released now
- // since we hit EOS.
- if (reservation_ < io_mgr->min_buffer_size()) {
- return Status(Substitute("Could not read past end of scan range in file '$0'. "
- "Reservation provided $1 was < the minimum I/O buffer size",
- reservation_, io_mgr->min_buffer_size()));
- }
- RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
- parent_->scan_node_->reader_context(), parent_->bp_client_, range,
- reservation_));
- }
- RETURN_IF_ERROR(range->GetNext(&io_buffer_));
- DCHECK(io_buffer_->eosr());
+ RETURN_IF_ERROR(parent_->state_->io_mgr()->Read(
+ parent_->scan_node_->reader_context(), range, &io_buffer_));
}
DCHECK(io_buffer_ != nullptr);
@@ -343,8 +324,7 @@ Status ScannerContext::Stream::CopyIoToBoundary(int64_t num_bytes) {
void ScannerContext::Stream::ReturnIoBuffer() {
DCHECK(io_buffer_ != nullptr);
- ScanRange* range = io_buffer_->scan_range();
- range->ReturnBuffer(move(io_buffer_));
+ ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffer_));
io_buffer_pos_ = nullptr;
io_buffer_bytes_left_ = 0;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 6292486..a131d3f 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -27,7 +27,6 @@
#include "common/compiler-util.h"
#include "common/status.h"
#include "exec/filter-context.h"
-#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/io/request-ranges.h"
namespace impala {
@@ -85,12 +84,10 @@ class TupleRow;
class ScannerContext {
public:
/// Create a scanner context with the parent scan_node (where materialized row batches
- /// get pushed to) and the scan range to process. Buffers are allocated using
- /// 'bp_client'.
- ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
- BufferPool::ClientHandle* bp_client,
- HdfsPartitionDescriptor* partition_desc,
- const std::vector<FilterContext>& filter_ctxs,
+ /// get pushed to) and the scan range to process.
+ /// This context starts with 1 stream.
+ ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
+ io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs,
MemPool* expr_results_pool);
/// Destructor verifies that all stream objects have been released.
~ScannerContext();
@@ -153,7 +150,6 @@ class ScannerContext {
const char* filename() { return scan_range_->file(); }
const io::ScanRange* scan_range() { return scan_range_; }
const HdfsFileDesc* file_desc() { return file_desc_; }
- int64_t reservation() const { return reservation_; }
/// Returns the buffer's current offset in the file.
int64_t file_offset() const { return scan_range_->offset() + total_bytes_returned_; }
@@ -215,15 +211,9 @@ class ScannerContext {
private:
friend class ScannerContext;
- ScannerContext* const parent_;
- io::ScanRange* const scan_range_;
- const HdfsFileDesc* const file_desc_;
-
- /// Reservation given to this stream for allocating I/O buffers. The reservation is
- /// shared with 'scan_range_', so the context must be careful not to use this until
- /// all of 'scan_range_'s buffers have been freed. Must be >= the minimum IoMgr
- /// buffer size to allow reading past the end of 'scan_range_'.
- const int64_t reservation_;
+ ScannerContext* parent_;
+ io::ScanRange* scan_range_;
+ const HdfsFileDesc* file_desc_;
/// Total number of bytes returned from GetBytes()
int64_t total_bytes_returned_ = 0;
@@ -282,8 +272,7 @@ class ScannerContext {
/// output_buffer_bytes_left_ will be set to something else.
static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0;
- /// Private constructor. See AddStream() for public API.
- Stream(ScannerContext* parent, io::ScanRange* scan_range, int64_t reservation,
+ Stream(ScannerContext* parent, io::ScanRange* scan_range,
const HdfsFileDesc* file_desc);
/// GetBytes helper to handle the slow path.
@@ -366,37 +355,24 @@ class ScannerContext {
/// size to 0.
void ClearStreams();
- /// Add a stream to this ScannerContext for 'range'. 'range' must already have any
- /// buffers that it needs allocated. 'reservation' is the amount of reservation that
- /// is given to this stream for allocating I/O buffers. The reservation is shared with
- /// 'range', so the context must be careful not to use this until all of 'range's
- /// buffers have been freed. Must be >= the minimum IoMgr buffer size to allow reading
- /// past the end of 'range'.
- ///
- /// Returns the added stream. The returned stream is owned by this context.
- Stream* AddStream(io::ScanRange* range, int64_t reservation);
+ /// Add a stream to this ScannerContext for 'range'. The stream is owned by this
+ /// context.
+ Stream* AddStream(io::ScanRange* range);
/// Returns true if RuntimeState::is_cancelled() is true, or if scan node is not
/// multi-threaded and is done (finished, cancelled or reached its limit).
/// In all other cases returns false.
bool cancelled() const;
- BufferPool::ClientHandle* bp_client() const { return bp_client_; }
- HdfsPartitionDescriptor* partition_descriptor() const { return partition_desc_; }
+ HdfsPartitionDescriptor* partition_descriptor() { return partition_desc_; }
const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; }
MemPool* expr_results_pool() const { return expr_results_pool_; }
private:
friend class Stream;
- RuntimeState* const state_;
- HdfsScanNodeBase* const scan_node_;
-
- /// Buffer pool client used to allocate I/O buffers. This is accessed by multiple
- /// threads in the multi-threaded scan node, so those threads must take care to only
- /// call thread-safe BufferPool methods with this client.
- BufferPool::ClientHandle* const bp_client_;
-
- HdfsPartitionDescriptor* const partition_desc_;
+ RuntimeState* state_;
+ HdfsScanNodeBase* scan_node_;
+ HdfsPartitionDescriptor* partition_desc_;
/// Vector of streams. Non-columnar formats will always have one stream per context.
std::vector<std::unique_ptr<Stream>> streams_;
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index d14da63..285aacb 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -37,7 +37,6 @@
namespace impala {
-class MemTracker;
class ReservationTracker;
class RuntimeProfile;
class SystemAllocator;
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index e441402..c46c5ea 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -522,15 +522,15 @@ TEST_F(ReservationTrackerTest, TransferReservation) {
TEST_F(ReservationTrackerTest, ReservationUtil) {
const int64_t MEG = 1024 * 1024;
const int64_t GIG = 1024 * 1024 * 1024;
- EXPECT_EQ(32 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING);
+ EXPECT_EQ(75 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING);
EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(0));
EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(-1));
- EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(32 * MEG));
+ EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(75 * MEG));
EXPECT_EQ(8 * GIG, ReservationUtil::GetReservationLimitFromMemLimit(10 * GIG));
- EXPECT_EQ(32 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0));
- EXPECT_EQ(32 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1));
+ EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0));
+ EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1));
EXPECT_EQ(500 * MEG, ReservationUtil::GetMinMemLimitFromReservation(400 * MEG));
EXPECT_EQ(5 * GIG, ReservationUtil::GetMinMemLimitFromReservation(4 * GIG));
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/runtime/bufferpool/reservation-util.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-util.cc b/be/src/runtime/bufferpool/reservation-util.cc
index a27ab9d..85718ab 100644
--- a/be/src/runtime/bufferpool/reservation-util.cc
+++ b/be/src/runtime/bufferpool/reservation-util.cc
@@ -24,7 +24,7 @@ namespace impala {
// Most operators that accumulate memory use reservations, so the majority of memory
// should be allocated to buffer reservations, as a heuristic.
const double ReservationUtil::RESERVATION_MEM_FRACTION = 0.8;
-const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 32 * 1024 * 1024;
+const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 75 * 1024 * 1024;
int64_t ReservationUtil::GetReservationLimitFromMemLimit(int64_t mem_limit) {
int64_t max_reservation = std::min<int64_t>(
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 73961de..8a51d88 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -36,9 +36,9 @@
#include "runtime/client-cache.h"
#include "runtime/coordinator.h"
#include "runtime/data-stream-mgr.h"
+#include "runtime/io/disk-io-mgr.h"
#include "runtime/hbase-table-factory.h"
#include "runtime/hdfs-fs-cache.h"
-#include "runtime/io/disk-io-mgr.h"
#include "runtime/krpc-data-stream-mgr.h"
#include "runtime/lib-cache.h"
#include "runtime/mem-tracker.h"
@@ -350,7 +350,10 @@ Status ExecEnv::Init() {
LOG(INFO) << "Buffer pool limit: "
<< PrettyPrinter::Print(buffer_pool_limit, TUnit::BYTES);
- RETURN_IF_ERROR(disk_io_mgr_->Init());
+ RETURN_IF_ERROR(disk_io_mgr_->Init(mem_tracker_.get()));
+
+ mem_tracker_->AddGcFunction(
+ [this](int64_t bytes_to_free) { disk_io_mgr_->GcIoBuffers(bytes_to_free); });
// Start services in order to ensure that dependencies between them are met
if (enable_webserver_) {
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/runtime/io/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-internal.h b/be/src/runtime/io/disk-io-mgr-internal.h
index 2d32487..3fc3895 100644
--- a/be/src/runtime/io/disk-io-mgr-internal.h
+++ b/be/src/runtime/io/disk-io-mgr-internal.h
@@ -35,25 +35,9 @@
#include "util/filesystem-util.h"
#include "util/hdfs-util.h"
#include "util/impalad-metrics.h"
-#include "util/runtime-profile-counters.h"
/// This file contains internal structures shared between submodules of the IoMgr. Users
/// of the IoMgr do not need to include this file.
-
-// Macros to work around counters sometimes not being provided.
-// TODO: fix things so that counters are always non-NULL.
-#define COUNTER_ADD_IF_NOT_NULL(c, v) \
- do { \
- ::impala::RuntimeProfile::Counter* __ctr__ = (c); \
- if (__ctr__ != nullptr) __ctr__->Add(v); \
- } while (false);
-
-#define COUNTER_BITOR_IF_NOT_NULL(c, v) \
- do { \
- ::impala::RuntimeProfile::Counter* __ctr__ = (c); \
- if (__ctr__ != nullptr) __ctr__->BitOr(v); \
- } while (false);
-
namespace impala {
namespace io {
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/runtime/io/disk-io-mgr-stress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress-test.cc b/be/src/runtime/io/disk-io-mgr-stress-test.cc
index 2ec1d09..45b36ed 100644
--- a/be/src/runtime/io/disk-io-mgr-stress-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress-test.cc
@@ -15,13 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-#include <gflags/gflags.h>
-
-#include "common/init.h"
#include "runtime/io/disk-io-mgr-stress.h"
-#include "common/init.h"
-#include "runtime/test-env.h"
-#include "service/fe-support.h"
+#include "util/cpu-info.h"
#include "util/string-parser.h"
#include "common/names.h"
@@ -33,32 +28,34 @@ using namespace impala::io;
// can be passed to control how long to run this test (0 for forever).
// TODO: make these configurable once we decide how to run BE tests with args
-constexpr int DEFAULT_DURATION_SEC = 1;
+const int DEFAULT_DURATION_SEC = 1;
const int NUM_DISKS = 5;
const int NUM_THREADS_PER_DISK = 5;
const int NUM_CLIENTS = 10;
const bool TEST_CANCELLATION = true;
-const int64_t BUFFER_POOL_CAPACITY = 1024L * 1024L * 1024L * 4L;
-
-DEFINE_int64(duration_sec, DEFAULT_DURATION_SEC,
- "Disk I/O Manager stress test duration in seconds. 0 means run indefinitely.");
int main(int argc, char** argv) {
- impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
- impala::InitFeSupport();
-
- if (FLAGS_duration_sec != 0) {
- printf("Running stress test for %ld seconds.\n", FLAGS_duration_sec);
+ google::InitGoogleLogging(argv[0]);
+ CpuInfo::Init();
+ OsInfo::Init();
+ impala::InitThreading();
+ int duration_sec = DEFAULT_DURATION_SEC;
+
+ if (argc == 2) {
+ StringParser::ParseResult status;
+ duration_sec = StringParser::StringToInt<int>(argv[1], strlen(argv[1]), &status);
+ if (status != StringParser::PARSE_SUCCESS) {
+ printf("Invalid arg: %s\n", argv[1]);
+ return 1;
+ }
+ }
+ if (duration_sec != 0) {
+ printf("Running stress test for %d seconds.\n", duration_sec);
} else {
printf("Running stress test indefinitely.\n");
}
-
- TestEnv test_env;
- // Tests try to allocate arbitrarily small buffers. Ensure Buffer Pool allows it.
- test_env.SetBufferPoolArgs(DiskIoMgrStress::MIN_READ_BUFFER_SIZE, BUFFER_POOL_CAPACITY);
- Status status = test_env.Init();
- CHECK(status.ok()) << status.GetDetail();
DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION);
- test.Run(FLAGS_duration_sec);
+ test.Run(duration_sec);
+
return 0;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/runtime/io/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index 3fd33de..8815357 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -19,8 +19,6 @@
#include "runtime/io/disk-io-mgr-stress.h"
-#include "runtime/bufferpool/reservation-tracker.h"
-#include "runtime/exec-env.h"
#include "runtime/io/request-context.h"
#include "util/time.h"
@@ -29,20 +27,18 @@
using namespace impala;
using namespace impala::io;
-constexpr float DiskIoMgrStress::ABORT_CHANCE;
-const int DiskIoMgrStress::MIN_READ_LEN;
-const int DiskIoMgrStress::MAX_READ_LEN;
+static const float ABORT_CHANCE = .10f;
+static const int MIN_READ_LEN = 1;
+static const int MAX_READ_LEN = 20;
-const int DiskIoMgrStress::MIN_FILE_LEN;
-const int DiskIoMgrStress::MAX_FILE_LEN;
+static const int MIN_FILE_LEN = 10;
+static const int MAX_FILE_LEN = 1024;
// Make sure this is between MIN/MAX FILE_LEN to test more cases
-const int DiskIoMgrStress::MIN_READ_BUFFER_SIZE;
-const int DiskIoMgrStress::MAX_READ_BUFFER_SIZE;
+static const int MIN_READ_BUFFER_SIZE = 64;
+static const int MAX_READ_BUFFER_SIZE = 128;
-const int DiskIoMgrStress::MAX_BUFFER_BYTES_PER_SCAN_RANGE;
-
-const int DiskIoMgrStress::CANCEL_READER_PERIOD_MS;
+static const int CANCEL_READER_PERIOD_MS = 20; // in ms
static void CreateTempFile(const char* filename, const char* data) {
FILE* file = fopen(filename, "w");
@@ -51,7 +47,7 @@ static void CreateTempFile(const char* filename, const char* data) {
fclose(file);
}
-string DiskIoMgrStress::GenerateRandomData() {
+string GenerateRandomData() {
int rand_len = rand() % (MAX_FILE_LEN - MIN_FILE_LEN) + MIN_FILE_LEN;
stringstream ss;
for (int i = 0; i < rand_len; ++i) {
@@ -63,8 +59,6 @@ string DiskIoMgrStress::GenerateRandomData() {
struct DiskIoMgrStress::Client {
boost::mutex lock;
- /// Pool for objects that is cleared when the client is (re-)initialized in NewClient().
- ObjectPool obj_pool;
unique_ptr<RequestContext> reader;
int file_idx;
vector<ScanRange*> scan_ranges;
@@ -83,7 +77,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
io_mgr_.reset(new DiskIoMgr(num_disks, num_threads_per_disk, num_threads_per_disk,
MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE));
- Status status = io_mgr_->Init();
+ Status status = io_mgr_->Init(&mem_tracker_);
CHECK(status.ok());
// Initialize some data files. It doesn't really matter how many there are.
@@ -98,7 +92,6 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
clients_ = new Client[num_clients_];
client_mem_trackers_.resize(num_clients_);
- buffer_pool_clients_.reset(new BufferPool::ClientHandle[num_clients_]);
for (int i = 0; i < num_clients_; ++i) {
NewClient(i);
}
@@ -117,16 +110,9 @@ void DiskIoMgrStress::ClientThread(int client_id) {
while (!eos) {
ScanRange* range;
- bool needs_buffers;
- Status status =
- io_mgr_->GetNextUnstartedRange(client->reader.get(), &range, &needs_buffers);
+ Status status = io_mgr_->GetNextRange(client->reader.get(), &range);
CHECK(status.ok() || status.IsCancelled());
if (range == NULL) break;
- if (needs_buffers) {
- status = io_mgr_->AllocateBuffersForRange(client->reader.get(),
- &buffer_pool_clients_[client_id], range, MAX_BUFFER_BYTES_PER_SCAN_RANGE);
- CHECK(status.ok()) << status.GetDetail();
- }
while (true) {
unique_ptr<BufferDescriptor> buffer;
@@ -151,7 +137,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
// Copy the bytes from this read into the result buffer.
memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len());
- range->ReturnBuffer(move(buffer));
+ io_mgr_->ReturnBuffer(move(buffer));
bytes_read += len;
CHECK_GE(bytes_read, 0);
@@ -173,7 +159,6 @@ void DiskIoMgrStress::ClientThread(int client_id) {
// Unregister the old client and get a new one
unique_lock<mutex> lock(client->lock);
io_mgr_->UnregisterContext(client->reader.get());
- client->reader.reset();
NewClient(client_id);
}
@@ -185,9 +170,11 @@ void DiskIoMgrStress::ClientThread(int client_id) {
// Cancel a random reader
void DiskIoMgrStress::CancelRandomReader() {
if (!includes_cancellation_) return;
- Client* rand_client = &clients_[rand() % num_clients_];
- unique_lock<mutex> lock(rand_client->lock);
- rand_client->reader->Cancel();
+
+ int rand_client = rand() % num_clients_;
+
+ unique_lock<mutex> lock(clients_[rand_client].lock);
+ io_mgr_->CancelContext(clients_[rand_client].reader.get());
}
void DiskIoMgrStress::Run(int sec) {
@@ -212,18 +199,10 @@ void DiskIoMgrStress::Run(int sec) {
for (int i = 0; i < num_clients_; ++i) {
unique_lock<mutex> lock(clients_[i].lock);
- if (clients_[i].reader != NULL) clients_[i].reader->Cancel();
+ if (clients_[i].reader != NULL) io_mgr_->CancelContext(clients_[i].reader.get());
}
- readers_.join_all();
- for (int i = 0; i < num_clients_; ++i) {
- if (clients_[i].reader != nullptr) {
- io_mgr_->UnregisterContext(clients_[i].reader.get());
- }
- ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]);
- client_mem_trackers_[i]->Close();
- }
- mem_tracker_.Close();
+ readers_.join_all();
}
// Initialize a client to read one of the files at random. The scan ranges are
@@ -244,41 +223,25 @@ void DiskIoMgrStress::NewClient(int i) {
}
}
- // Clean up leftover state from the previous client (if any).
+ for (int i = 0; i < client.scan_ranges.size(); ++i) {
+ delete client.scan_ranges[i];
+ }
client.scan_ranges.clear();
- ExecEnv* exec_env = ExecEnv::GetInstance();
- exec_env->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]);
- if (client_mem_trackers_[i] != nullptr) client_mem_trackers_[i]->Close();
- client.obj_pool.Clear();
int assigned_len = 0;
while (assigned_len < file_len) {
int range_len = rand() % (MAX_READ_LEN - MIN_READ_LEN) + MIN_READ_LEN;
range_len = min(range_len, file_len - assigned_len);
- ScanRange* range = client.obj_pool.Add(new ScanRange);
+ ScanRange* range = new ScanRange();
range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
0, false, BufferOpts::Uncached());
client.scan_ranges.push_back(range);
assigned_len += range_len;
}
- string client_name = Substitute("Client $0", i);
- client_mem_trackers_[i].reset(new MemTracker(-1, client_name, &mem_tracker_));
- Status status = exec_env->buffer_pool()->RegisterClient(client_name, nullptr,
- exec_env->buffer_reservation(), client_mem_trackers_[i].get(),
- numeric_limits<int64_t>::max(), RuntimeProfile::Create(&client.obj_pool, client_name),
- &buffer_pool_clients_[i]);
- CHECK(status.ok());
- // Reserve enough memory for 3 buffers per range, which should be enough to guarantee
- // progress.
- CHECK(buffer_pool_clients_[i].IncreaseReservationToFit(
- MAX_BUFFER_BYTES_PER_SCAN_RANGE * client.scan_ranges.size()))
- << buffer_pool_clients_[i].DebugString() << "\n"
- << exec_env->buffer_pool()->DebugString() << "\n"
- << exec_env->buffer_reservation()->DebugString();
-
- client.reader = io_mgr_->RegisterContext();
- status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);
+ client_mem_trackers_[i].reset(new MemTracker(-1, "", &mem_tracker_));
+ client.reader = io_mgr_->RegisterContext(client_mem_trackers_[i].get());
+ Status status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);
CHECK(status.ok());
}
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/runtime/io/disk-io-mgr-stress.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.h b/be/src/runtime/io/disk-io-mgr-stress.h
index 574b58c..b872694 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.h
+++ b/be/src/runtime/io/disk-io-mgr-stress.h
@@ -22,11 +22,8 @@
#include <memory>
#include <vector>
#include <boost/scoped_ptr.hpp>
-#include <boost/thread/condition_variable.hpp>
#include <boost/thread/thread.hpp>
-#include "common/object-pool.h"
-#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/mem-tracker.h"
#include "runtime/thread-resource-mgr.h"
@@ -46,29 +43,15 @@ class DiskIoMgrStress {
/// Run the test for 'sec'. If 0, run forever
void Run(int sec);
- static constexpr float ABORT_CHANCE = .10f;
- static const int MIN_READ_LEN = 1;
- static const int MAX_READ_LEN = 20;
-
- static const int MIN_FILE_LEN = 10;
- static const int MAX_FILE_LEN = 1024;
-
- // Make sure this is between MIN/MAX FILE_LEN to test more cases
- static const int MIN_READ_BUFFER_SIZE = 64;
- static const int MAX_READ_BUFFER_SIZE = 128;
-
- // Maximum bytes to allocate per scan range.
- static const int MAX_BUFFER_BYTES_PER_SCAN_RANGE = MAX_READ_BUFFER_SIZE * 3;
-
- static const int CANCEL_READER_PERIOD_MS = 20;
private:
struct Client;
struct File {
std::string filename;
- std::string data; // the data in the file, used to validate
+ std::string data; // the data in the file, used to validate
};
+
/// Files used for testing. These are created at startup and recycled
/// during the test
std::vector<File> files_;
@@ -89,9 +72,6 @@ class DiskIoMgrStress {
/// Client MemTrackers, one per client.
std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_;
- /// Buffer pool clients, one per client.
- std::unique_ptr<BufferPool::ClientHandle[]> buffer_pool_clients_;
-
/// If true, tests cancelling readers
bool includes_cancellation_;
@@ -108,8 +88,6 @@ class DiskIoMgrStress {
/// Possibly cancels a random reader.
void CancelRandomReader();
-
- static std::string GenerateRandomData();
};
}
}