You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/26 02:41:53 UTC
[3/4] incubator-impala git commit: Refactor RuntimeState and ExecEnv
dependencies
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-reader-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-reader-context.cc b/be/src/runtime/disk-io-mgr-reader-context.cc
index f1e71b2..c01f34b 100644
--- a/be/src/runtime/disk-io-mgr-reader-context.cc
+++ b/be/src/runtime/disk-io-mgr-reader-context.cc
@@ -18,7 +18,7 @@
using namespace impala;
-void DiskIoMgr::RequestContext::Cancel(const Status& status) {
+void DiskIoRequestContext::Cancel(const Status& status) {
DCHECK(!status.ok());
// Callbacks are collected in this vector and invoked while no lock is held.
@@ -28,18 +28,18 @@ void DiskIoMgr::RequestContext::Cancel(const Status& status) {
DCHECK(Validate()) << endl << DebugString();
// Already being cancelled
- if (state_ == RequestContext::Cancelled) return;
+ if (state_ == DiskIoRequestContext::Cancelled) return;
DCHECK(status_.ok());
status_ = status;
// The reader will be put into a cancelled state until cleanup is complete.
- state_ = RequestContext::Cancelled;
+ state_ = DiskIoRequestContext::Cancelled;
// Cancel all scan ranges for this reader. Each range could be on one of
// four queues.
for (int i = 0; i < disk_states_.size(); ++i) {
- RequestContext::PerDiskState& state = disk_states_[i];
+ DiskIoRequestContext::PerDiskState& state = disk_states_[i];
RequestRange* range = NULL;
while ((range = state.in_flight_ranges()->Dequeue()) != NULL) {
if (range->request_type() == RequestType::READ) {
@@ -74,7 +74,7 @@ void DiskIoMgr::RequestContext::Cancel(const Status& status) {
// Schedule reader on all disks. The disks will notice it is cancelled and do any
// required cleanup
for (int i = 0; i < disk_states_.size(); ++i) {
- RequestContext::PerDiskState& state = disk_states_[i];
+ DiskIoRequestContext::PerDiskState& state = disk_states_[i];
state.ScheduleContext(this, i);
}
}
@@ -88,10 +88,10 @@ void DiskIoMgr::RequestContext::Cancel(const Status& status) {
ready_to_start_ranges_cv_.notify_all();
}
-void DiskIoMgr::RequestContext::AddRequestRange(
+void DiskIoRequestContext::AddRequestRange(
DiskIoMgr::RequestRange* range, bool schedule_immediately) {
// DCHECK(lock_.is_locked()); // TODO: boost should have this API
- RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
+ DiskIoRequestContext::PerDiskState& state = disk_states_[range->disk_id()];
if (state.done()) {
DCHECK_EQ(state.num_remaining_ranges(), 0);
state.set_done(false);
@@ -107,7 +107,7 @@ void DiskIoMgr::RequestContext::AddRequestRange(
state.unstarted_scan_ranges()->Enqueue(scan_range);
num_unstarted_scan_ranges_.Add(1);
}
- // If next_scan_range_to_start is NULL, schedule this RequestContext so that it will
+ // If next_scan_range_to_start is NULL, schedule this DiskIoRequestContext so that it will
// be set. If it's not NULL, this context will be scheduled when GetNextRange() is
// invoked.
schedule_context = state.next_scan_range_to_start() == NULL;
@@ -126,7 +126,7 @@ void DiskIoMgr::RequestContext::AddRequestRange(
++state.num_remaining_ranges();
}
-DiskIoMgr::RequestContext::RequestContext(DiskIoMgr* parent, int num_disks)
+DiskIoRequestContext::DiskIoRequestContext(DiskIoMgr* parent, int num_disks)
: parent_(parent),
bytes_read_counter_(NULL),
read_timer_(NULL),
@@ -137,7 +137,7 @@ DiskIoMgr::RequestContext::RequestContext(DiskIoMgr* parent, int num_disks)
}
// Resets this object.
-void DiskIoMgr::RequestContext::Reset(MemTracker* tracker) {
+void DiskIoRequestContext::Reset(MemTracker* tracker) {
DCHECK_EQ(state_, Inactive);
status_ = Status::OK();
@@ -173,13 +173,13 @@ void DiskIoMgr::RequestContext::Reset(MemTracker* tracker) {
}
// Dumps out request context information. Lock should be taken by caller
-string DiskIoMgr::RequestContext::DebugString() const {
+string DiskIoRequestContext::DebugString() const {
stringstream ss;
- ss << endl << " RequestContext: " << (void*)this << " (state=";
- if (state_ == RequestContext::Inactive) ss << "Inactive";
- if (state_ == RequestContext::Cancelled) ss << "Cancelled";
- if (state_ == RequestContext::Active) ss << "Active";
- if (state_ != RequestContext::Inactive) {
+ ss << endl << " DiskIoRequestContext: " << (void*)this << " (state=";
+ if (state_ == DiskIoRequestContext::Inactive) ss << "Inactive";
+ if (state_ == DiskIoRequestContext::Cancelled) ss << "Cancelled";
+ if (state_ == DiskIoRequestContext::Active) ss << "Active";
+ if (state_ != DiskIoRequestContext::Inactive) {
ss << " status_=" << (status_.ok() ? "OK" : status_.GetDetail())
<< " #ready_buffers=" << num_ready_buffers_.Load()
<< " #used_buffers=" << num_used_buffers_.Load()
@@ -203,9 +203,9 @@ string DiskIoMgr::RequestContext::DebugString() const {
return ss.str();
}
-bool DiskIoMgr::RequestContext::Validate() const {
- if (state_ == RequestContext::Inactive) {
- LOG(WARNING) << "state_ == RequestContext::Inactive";
+bool DiskIoRequestContext::Validate() const {
+ if (state_ == DiskIoRequestContext::Inactive) {
+ LOG(WARNING) << "state_ == DiskIoRequestContext::Inactive";
return false;
}
@@ -234,7 +234,7 @@ bool DiskIoMgr::RequestContext::Validate() const {
return false;
}
- if (state_ != RequestContext::Cancelled) {
+ if (state_ != DiskIoRequestContext::Cancelled) {
if (state.unstarted_scan_ranges()->size() + state.in_flight_ranges()->size() >
state.num_remaining_ranges()) {
LOG(WARNING) << "disk_id=" << i
@@ -285,7 +285,7 @@ bool DiskIoMgr::RequestContext::Validate() const {
}
}
- if (state_ != RequestContext::Cancelled) {
+ if (state_ != DiskIoRequestContext::Cancelled) {
if (total_unstarted_ranges != num_unstarted_scan_ranges_.Load()) {
LOG(WARNING) << "total_unstarted_ranges=" << total_unstarted_ranges
<< " sum_in_states=" << num_unstarted_scan_ranges_.Load();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 01ea4b5..25399bc 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -121,7 +121,7 @@ Status DiskIoMgr::ScanRange::GetNext(BufferDescriptor** buffer) {
}
DCHECK(reader_->Validate()) << endl << reader_->DebugString();
- if (reader_->state_ == RequestContext::Cancelled) {
+ if (reader_->state_ == DiskIoRequestContext::Cancelled) {
reader_->blocked_ranges_.Remove(this);
Cancel(reader_->status_);
(*buffer)->Return();
@@ -230,7 +230,7 @@ void DiskIoMgr::ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64
mtime_ = mtime;
}
-void DiskIoMgr::ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
+void DiskIoMgr::ScanRange::InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext* reader) {
DCHECK(hdfs_file_ == NULL);
io_mgr_ = io_mgr;
reader_ = reader;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.cc b/be/src/runtime/disk-io-mgr-stress.cc
index af6ad27..0a8a628 100644
--- a/be/src/runtime/disk-io-mgr-stress.cc
+++ b/be/src/runtime/disk-io-mgr-stress.cc
@@ -52,7 +52,7 @@ string GenerateRandomData() {
struct DiskIoMgrStress::Client {
boost::mutex lock;
- DiskIoMgr::RequestContext* reader;
+ DiskIoRequestContext* reader;
int file_idx;
vector<DiskIoMgr::ScanRange*> scan_ranges;
int abort_at_byte;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index ee89f56..46149b5 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -40,7 +40,7 @@ namespace impala {
class DiskIoMgrTest : public testing::Test {
public:
void WriteValidateCallback(int num_writes, DiskIoMgr::WriteRange** written_range,
- DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader, int32_t* data,
+ DiskIoMgr* io_mgr, DiskIoRequestContext* reader, int32_t* data,
Status expected_status, const Status& status) {
if (expected_status.code() == TErrorCode::CANCELLED) {
EXPECT_TRUE(status.ok() || status.IsCancelled()) << "Error: " << status.GetDetail();
@@ -99,7 +99,7 @@ class DiskIoMgrTest : public testing::Test {
}
}
- static void ValidateSyncRead(DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader,
+ static void ValidateSyncRead(DiskIoMgr* io_mgr, DiskIoRequestContext* reader,
DiskIoMgr::ScanRange* range, const char* expected, int expected_len = -1) {
DiskIoMgr::BufferDescriptor* buffer;
ASSERT_OK(io_mgr->Read(reader, range, &buffer));
@@ -134,7 +134,7 @@ class DiskIoMgrTest : public testing::Test {
// Continues pulling scan ranges from the io mgr until they are all done.
// Updates num_ranges_processed with the number of ranges seen by this thread.
- static void ScanRangeThread(DiskIoMgr* io_mgr, DiskIoMgr::RequestContext* reader,
+ static void ScanRangeThread(DiskIoMgr* io_mgr, DiskIoRequestContext* reader,
const char* expected_result, int expected_len, const Status& expected_status,
int max_ranges, AtomicInt32* num_ranges_processed) {
int num_ranges = 0;
@@ -185,14 +185,14 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 10));
MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
- DiskIoMgr::RequestContext* reader;
+ DiskIoRequestContext* reader;
ASSERT_OK(read_io_mgr->RegisterContext(&reader, &reader_mem_tracker));
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
pool_.reset(new ObjectPool);
DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
ASSERT_OK(io_mgr.Init(&mem_tracker));
- DiskIoMgr::RequestContext* writer;
+ DiskIoRequestContext* writer;
io_mgr.RegisterContext(&writer, &mem_tracker);
for (int i = 0; i < num_ranges; ++i) {
int32_t* data = pool_->Add(new int32_t);
@@ -228,7 +228,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
string tmp_file = "/tmp/non-existent.txt";
DiskIoMgr io_mgr(1, 1, 1, 10);
ASSERT_OK(io_mgr.Init(&mem_tracker));
- DiskIoMgr::RequestContext* writer;
+ DiskIoRequestContext* writer;
ASSERT_OK(io_mgr.RegisterContext(&writer));
pool_.reset(new ObjectPool);
int32_t* data = pool_->Add(new int32_t);
@@ -238,7 +238,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*);
DiskIoMgr::WriteRange::WriteDoneCallback callback =
bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
- new_range, (DiskIoMgr*)NULL, (DiskIoMgr::RequestContext*)NULL,
+ new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL,
data, Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
*new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback));
@@ -255,7 +255,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
new_range = pool_->Add(new DiskIoMgr::WriteRange*);
callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
- new_range, (DiskIoMgr*)NULL, (DiskIoMgr::RequestContext*)NULL,
+ new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL,
data, Status(TErrorCode::RUNTIME_ERROR, "Test Failure"), _1);
*new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, -1, 0, callback));
@@ -291,14 +291,14 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 10));
MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
- DiskIoMgr::RequestContext* reader;
+ DiskIoRequestContext* reader;
ASSERT_OK(read_io_mgr->RegisterContext(&reader, &reader_mem_tracker));
for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
pool_.reset(new ObjectPool);
DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
ASSERT_OK(io_mgr.Init(&mem_tracker));
- DiskIoMgr::RequestContext* writer;
+ DiskIoRequestContext* writer;
io_mgr.RegisterContext(&writer, &mem_tracker);
Status validate_status = Status::OK();
for (int i = 0; i < num_ranges; ++i) {
@@ -362,7 +362,7 @@ TEST_F(DiskIoMgrTest, SingleReader) {
ASSERT_OK(io_mgr.Init(&mem_tracker));
MemTracker reader_mem_tracker;
- DiskIoMgr::RequestContext* reader;
+ DiskIoRequestContext* reader;
ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
vector<DiskIoMgr::ScanRange*> ranges;
@@ -416,7 +416,7 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
ASSERT_OK(io_mgr.Init(&mem_tracker));
MemTracker reader_mem_tracker;
- DiskIoMgr::RequestContext* reader;
+ DiskIoRequestContext* reader;
ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
vector<DiskIoMgr::ScanRange*> ranges_first_half;
@@ -489,7 +489,7 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
ASSERT_OK(io_mgr.Init(&mem_tracker));
MemTracker reader_mem_tracker;
- DiskIoMgr::RequestContext* reader;
+ DiskIoRequestContext* reader;
ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
DiskIoMgr::ScanRange* complete_range = InitRange(1, tmp_file, 0, strlen(data), 0,
@@ -559,7 +559,7 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
ASSERT_OK(io_mgr.Init(&mem_tracker));
MemTracker reader_mem_tracker;
- DiskIoMgr::RequestContext* reader;
+ DiskIoRequestContext* reader;
ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
vector<DiskIoMgr::ScanRange*> ranges;
@@ -624,7 +624,7 @@ TEST_F(DiskIoMgrTest, MemLimits) {
ASSERT_OK(io_mgr.Init(&mem_tracker));
MemTracker reader_mem_tracker;
- DiskIoMgr::RequestContext* reader;
+ DiskIoRequestContext* reader;
ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
vector<DiskIoMgr::ScanRange*> ranges;
@@ -699,7 +699,7 @@ TEST_F(DiskIoMgrTest, CachedReads) {
ASSERT_OK(io_mgr.Init(&mem_tracker));
MemTracker reader_mem_tracker;
- DiskIoMgr::RequestContext* reader;
+ DiskIoRequestContext* reader;
ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
DiskIoMgr::ScanRange* complete_range =
@@ -764,7 +764,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
stat(file_name.c_str(), &stat_val);
int64_t iters = 0;
- vector<DiskIoMgr::RequestContext*> contexts(num_contexts);
+ vector<DiskIoRequestContext*> contexts(num_contexts);
Status status;
for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
@@ -838,7 +838,7 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
vector<string> file_names;
vector<int64_t> mtimes;
vector<string> data;
- vector<DiskIoMgr::RequestContext*> readers;
+ vector<DiskIoRequestContext*> readers;
vector<char*> results;
file_names.resize(NUM_READERS);
@@ -1000,7 +1000,7 @@ TEST_F(DiskIoMgrTest, PartialRead) {
ASSERT_OK(io_mgr->Init(&mem_tracker));
MemTracker reader_mem_tracker;
- DiskIoMgr::RequestContext* reader;
+ DiskIoRequestContext* reader;
ASSERT_OK(io_mgr->RegisterContext(&reader, &reader_mem_tracker));
// We should not read past the end of file.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 66f0498..448424f 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -104,33 +104,34 @@ DiskIoMgr::HdfsCachedFileHandle::~HdfsCachedFileHandle() {
hdfs_file_ = NULL;
}
-// This class provides a cache of RequestContext objects. RequestContexts are recycled.
-// This is good for locality as well as lock contention. The cache has the property that
-// regardless of how many clients get added/removed, the memory locations for
-// existing clients do not change (not the case with std::vector) minimizing the locks we
-// have to take across all readers.
+// This class provides a cache of DiskIoRequestContext objects. DiskIoRequestContexts
+// are recycled. This is good for locality as well as lock contention. The cache has
+// the property that regardless of how many clients get added/removed, the memory
+// locations for existing clients do not change (not the case with std::vector)
+// minimizing the locks we have to take across all readers.
// All functions on this object are thread safe
class DiskIoMgr::RequestContextCache {
public:
RequestContextCache(DiskIoMgr* io_mgr) : io_mgr_(io_mgr) {}
// Returns a context to the cache. This object can now be reused.
- void ReturnContext(RequestContext* reader) {
- DCHECK(reader->state_ != RequestContext::Inactive);
- reader->state_ = RequestContext::Inactive;
+ void ReturnContext(DiskIoRequestContext* reader) {
+ DCHECK(reader->state_ != DiskIoRequestContext::Inactive);
+ reader->state_ = DiskIoRequestContext::Inactive;
lock_guard<mutex> l(lock_);
inactive_contexts_.push_back(reader);
}
- // Returns a new RequestContext object. Allocates a new object if necessary.
- RequestContext* GetNewContext() {
+ // Returns a new DiskIoRequestContext object. Allocates a new object if necessary.
+ DiskIoRequestContext* GetNewContext() {
lock_guard<mutex> l(lock_);
if (!inactive_contexts_.empty()) {
- RequestContext* reader = inactive_contexts_.front();
+ DiskIoRequestContext* reader = inactive_contexts_.front();
inactive_contexts_.pop_front();
return reader;
} else {
- RequestContext* reader = new RequestContext(io_mgr_, io_mgr_->num_total_disks());
+ DiskIoRequestContext* reader =
+ new DiskIoRequestContext(io_mgr_, io_mgr_->num_total_disks());
all_contexts_.push_back(reader);
return reader;
}
@@ -138,7 +139,7 @@ class DiskIoMgr::RequestContextCache {
// This object has the same lifetime as the disk IoMgr.
~RequestContextCache() {
- for (list<RequestContext*>::iterator it = all_contexts_.begin();
+ for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
it != all_contexts_.end(); ++it) {
delete *it;
}
@@ -147,9 +148,9 @@ class DiskIoMgr::RequestContextCache {
// Validates that all readers are cleaned up and in the inactive state. No locks
// are taken since this is only called from the disk IoMgr destructor.
bool ValidateAllInactive() {
- for (list<RequestContext*>::iterator it = all_contexts_.begin();
+ for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
it != all_contexts_.end(); ++it) {
- if ((*it)->state_ != RequestContext::Inactive) {
+ if ((*it)->state_ != DiskIoRequestContext::Inactive) {
return false;
}
}
@@ -166,16 +167,16 @@ class DiskIoMgr::RequestContextCache {
mutex lock_;
// List of all request contexts created. Used for debugging
- list<RequestContext*> all_contexts_;
+ list<DiskIoRequestContext*> all_contexts_;
// List of inactive readers. These objects can be used for a new reader.
- list<RequestContext*> inactive_contexts_;
+ list<DiskIoRequestContext*> inactive_contexts_;
};
string DiskIoMgr::RequestContextCache::DebugString() {
lock_guard<mutex> l(lock_);
stringstream ss;
- for (list<RequestContext*>::iterator it = all_contexts_.begin();
+ for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
it != all_contexts_.end(); ++it) {
unique_lock<mutex> lock((*it)->lock_);
ss << (*it)->DebugString() << endl;
@@ -193,7 +194,7 @@ string DiskIoMgr::DebugString() {
ss << " " << (void*) disk_queues_[i] << ":" ;
if (!disk_queues_[i]->request_contexts.empty()) {
ss << " Readers: ";
- for (RequestContext* req_context: disk_queues_[i]->request_contexts) {
+ for (DiskIoRequestContext* req_context: disk_queues_[i]->request_contexts) {
ss << (void*)req_context;
}
}
@@ -206,7 +207,7 @@ DiskIoMgr::BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr) :
io_mgr_(io_mgr), reader_(NULL), buffer_(NULL) {
}
-void DiskIoMgr::BufferDescriptor::Reset(RequestContext* reader,
+void DiskIoMgr::BufferDescriptor::Reset(DiskIoRequestContext* reader,
ScanRange* range, char* buffer, int64_t buffer_len) {
DCHECK(io_mgr_ != NULL);
DCHECK(buffer_ == NULL);
@@ -314,7 +315,7 @@ DiskIoMgr::~DiskIoMgr() {
for (int i = 0; i < disk_queues_.size(); ++i) {
if (disk_queues_[i] == NULL) continue;
int disk_id = disk_queues_[i]->disk_id;
- for (list<RequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
+ for (list<DiskIoRequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
it != disk_queues_[i]->request_contexts.end(); ++it) {
DCHECK_EQ((*it)->disk_states_[disk_id].num_threads_in_op(), 0);
DCHECK((*it)->disk_states_[disk_id].done());
@@ -384,7 +385,7 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
return Status::OK();
}
-Status DiskIoMgr::RegisterContext(RequestContext** request_context,
+Status DiskIoMgr::RegisterContext(DiskIoRequestContext** request_context,
MemTracker* mem_tracker) {
DCHECK(request_context_cache_.get() != NULL) << "Must call Init() first.";
*request_context = request_context_cache_->GetNewContext();
@@ -392,7 +393,7 @@ Status DiskIoMgr::RegisterContext(RequestContext** request_context,
return Status::OK();
}
-void DiskIoMgr::UnregisterContext(RequestContext* reader) {
+void DiskIoMgr::UnregisterContext(DiskIoRequestContext* reader) {
// Blocking cancel (waiting for disks completion).
CancelContext(reader, true);
@@ -425,7 +426,7 @@ void DiskIoMgr::UnregisterContext(RequestContext* reader) {
// outstanding reference to the context decrements the number of disk queues the context
// is on.
// If wait_for_disks_completion is true, wait for the number of active disks to become 0.
-void DiskIoMgr::CancelContext(RequestContext* context, bool wait_for_disks_completion) {
+void DiskIoMgr::CancelContext(DiskIoRequestContext* context, bool wait_for_disks_completion) {
context->Cancel(Status::CANCELLED);
if (wait_for_disks_completion) {
@@ -437,50 +438,50 @@ void DiskIoMgr::CancelContext(RequestContext* context, bool wait_for_disks_compl
}
}
-void DiskIoMgr::set_read_timer(RequestContext* r, RuntimeProfile::Counter* c) {
+void DiskIoMgr::set_read_timer(DiskIoRequestContext* r, RuntimeProfile::Counter* c) {
r->read_timer_ = c;
}
-void DiskIoMgr::set_bytes_read_counter(RequestContext* r, RuntimeProfile::Counter* c) {
+void DiskIoMgr::set_bytes_read_counter(DiskIoRequestContext* r, RuntimeProfile::Counter* c) {
r->bytes_read_counter_ = c;
}
-void DiskIoMgr::set_active_read_thread_counter(RequestContext* r,
+void DiskIoMgr::set_active_read_thread_counter(DiskIoRequestContext* r,
RuntimeProfile::Counter* c) {
r->active_read_thread_counter_ = c;
}
-void DiskIoMgr::set_disks_access_bitmap(RequestContext* r,
+void DiskIoMgr::set_disks_access_bitmap(DiskIoRequestContext* r,
RuntimeProfile::Counter* c) {
r->disks_accessed_bitmap_ = c;
}
-int64_t DiskIoMgr::queue_size(RequestContext* reader) const {
+int64_t DiskIoMgr::queue_size(DiskIoRequestContext* reader) const {
return reader->num_ready_buffers_.Load();
}
-Status DiskIoMgr::context_status(RequestContext* context) const {
+Status DiskIoMgr::context_status(DiskIoRequestContext* context) const {
unique_lock<mutex> lock(context->lock_);
return context->status_;
}
-int64_t DiskIoMgr::bytes_read_local(RequestContext* reader) const {
+int64_t DiskIoMgr::bytes_read_local(DiskIoRequestContext* reader) const {
return reader->bytes_read_local_.Load();
}
-int64_t DiskIoMgr::bytes_read_short_circuit(RequestContext* reader) const {
+int64_t DiskIoMgr::bytes_read_short_circuit(DiskIoRequestContext* reader) const {
return reader->bytes_read_short_circuit_.Load();
}
-int64_t DiskIoMgr::bytes_read_dn_cache(RequestContext* reader) const {
+int64_t DiskIoMgr::bytes_read_dn_cache(DiskIoRequestContext* reader) const {
return reader->bytes_read_dn_cache_.Load();
}
-int DiskIoMgr::num_remote_ranges(RequestContext* reader) const {
+int DiskIoMgr::num_remote_ranges(DiskIoRequestContext* reader) const {
return reader->num_remote_ranges_.Load();
}
-int64_t DiskIoMgr::unexpected_remote_bytes(RequestContext* reader) const {
+int64_t DiskIoMgr::unexpected_remote_bytes(DiskIoRequestContext* reader) const {
return reader->unexpected_remote_bytes_.Load();
}
@@ -499,7 +500,7 @@ Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
return Status::OK();
}
-Status DiskIoMgr::AddScanRanges(RequestContext* reader,
+Status DiskIoMgr::AddScanRanges(DiskIoRequestContext* reader,
const vector<ScanRange*>& ranges, bool schedule_immediately) {
if (ranges.empty()) return Status::OK();
@@ -513,7 +514,7 @@ Status DiskIoMgr::AddScanRanges(RequestContext* reader,
unique_lock<mutex> reader_lock(reader->lock_);
DCHECK(reader->Validate()) << endl << reader->DebugString();
- if (reader->state_ == RequestContext::Cancelled) {
+ if (reader->state_ == DiskIoRequestContext::Cancelled) {
DCHECK(!reader->status_.ok());
return reader->status_;
}
@@ -545,7 +546,7 @@ Status DiskIoMgr::AddScanRanges(RequestContext* reader,
// This function returns the next scan range the reader should work on, checking
// for eos and error cases. If there isn't already a cached scan range or a scan
// range prepared by the disk threads, the caller waits on the disk threads.
-Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
+Status DiskIoMgr::GetNextRange(DiskIoRequestContext* reader, ScanRange** range) {
DCHECK(reader != NULL);
DCHECK(range != NULL);
*range = NULL;
@@ -555,7 +556,7 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
DCHECK(reader->Validate()) << endl << reader->DebugString();
while (true) {
- if (reader->state_ == RequestContext::Cancelled) {
+ if (reader->state_ == DiskIoRequestContext::Cancelled) {
DCHECK(!reader->status_.ok());
status = reader->status_;
break;
@@ -599,7 +600,7 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
return status;
}
-Status DiskIoMgr::Read(RequestContext* reader,
+Status DiskIoMgr::Read(DiskIoRequestContext* reader,
ScanRange* range, BufferDescriptor** buffer) {
DCHECK(range != NULL);
DCHECK(buffer != NULL);
@@ -623,7 +624,7 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) {
DCHECK(buffer_desc != NULL);
if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == NULL);
- RequestContext* reader = buffer_desc->reader_;
+ DiskIoRequestContext* reader = buffer_desc->reader_;
if (buffer_desc->buffer_ != NULL) {
if (buffer_desc->scan_range_->cached_buffer_ == NULL) {
// Not a cached buffer. Return the io buffer and update mem tracking.
@@ -655,7 +656,7 @@ void DiskIoMgr::ReturnBufferDesc(BufferDescriptor* desc) {
}
DiskIoMgr::BufferDescriptor* DiskIoMgr::GetBufferDesc(
- RequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size) {
+ DiskIoRequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size) {
BufferDescriptor* buffer_desc;
{
unique_lock<mutex> lock(free_buffers_lock_);
@@ -771,18 +772,18 @@ void DiskIoMgr::ReturnFreeBuffer(char* buffer, int64_t buffer_size) {
// b) Adds an unstarted write range to in_flight_ranges_. The write range is processed
// immediately if there are no preceding scan ranges in in_flight_ranges_
// It blocks until work is available or the thread is shut down.
-// Work is available if there is a RequestContext with
+// Work is available if there is a DiskIoRequestContext with
// - A ScanRange with a buffer available, or
// - A WriteRange in unstarted_write_ranges_.
bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
- RequestContext** request_context) {
+ DiskIoRequestContext** request_context) {
int disk_id = disk_queue->disk_id;
*range = NULL;
// This loop returns either with work to do or when the disk IoMgr shuts down.
while (true) {
*request_context = NULL;
- RequestContext::PerDiskState* request_disk_state = NULL;
+ DiskIoRequestContext::PerDiskState* request_disk_state = NULL;
{
unique_lock<mutex> disk_lock(disk_queue->lock);
@@ -827,12 +828,12 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
<< (*request_context)->DebugString();
// Check if reader has been cancelled
- if ((*request_context)->state_ == RequestContext::Cancelled) {
+ if ((*request_context)->state_ == DiskIoRequestContext::Cancelled) {
request_disk_state->DecrementRequestThreadAndCheckDone(*request_context);
continue;
}
- DCHECK_EQ((*request_context)->state_, RequestContext::Active)
+ DCHECK_EQ((*request_context)->state_, DiskIoRequestContext::Active)
<< (*request_context)->DebugString();
if (request_disk_state->next_scan_range_to_start() == NULL &&
@@ -889,7 +890,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
return false;
}
-void DiskIoMgr::HandleWriteFinished(RequestContext* writer, WriteRange* write_range,
+void DiskIoMgr::HandleWriteFinished(DiskIoRequestContext* writer, WriteRange* write_range,
const Status& write_status) {
// Execute the callback before decrementing the thread count. Otherwise CancelContext()
// that waits for the disk ref count to be 0 will return, creating a race, e.g.
@@ -900,8 +901,8 @@ void DiskIoMgr::HandleWriteFinished(RequestContext* writer, WriteRange* write_ra
{
unique_lock<mutex> writer_lock(writer->lock_);
DCHECK(writer->Validate()) << endl << writer->DebugString();
- RequestContext::PerDiskState& state = writer->disk_states_[write_range->disk_id_];
- if (writer->state_ == RequestContext::Cancelled) {
+ DiskIoRequestContext::PerDiskState& state = writer->disk_states_[write_range->disk_id_];
+ if (writer->state_ == DiskIoRequestContext::Cancelled) {
state.DecrementRequestThreadAndCheckDone(writer);
} else {
state.DecrementRequestThread();
@@ -910,16 +911,16 @@ void DiskIoMgr::HandleWriteFinished(RequestContext* writer, WriteRange* write_ra
}
}
-void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
+void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext* reader,
BufferDescriptor* buffer) {
unique_lock<mutex> reader_lock(reader->lock_);
- RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+ DiskIoRequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
DCHECK(reader->Validate()) << endl << reader->DebugString();
DCHECK_GT(state.num_threads_in_op(), 0);
DCHECK(buffer->buffer_ != NULL);
- if (reader->state_ == RequestContext::Cancelled) {
+ if (reader->state_ == DiskIoRequestContext::Cancelled) {
state.DecrementRequestThreadAndCheckDone(reader);
DCHECK(reader->Validate()) << endl << reader->DebugString();
ReturnFreeBuffer(buffer);
@@ -930,7 +931,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader
return;
}
- DCHECK_EQ(reader->state_, RequestContext::Active);
+ DCHECK_EQ(reader->state_, DiskIoRequestContext::Active);
DCHECK(buffer->buffer_ != NULL);
// Update the reader's scan ranges. There are three cases here:
@@ -979,7 +980,7 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
// 3. Perform the read or write as specified.
// Cancellation checking needs to happen in both steps 1 and 3.
while (true) {
- RequestContext* worker_context = NULL;;
+ DiskIoRequestContext* worker_context = NULL;;
RequestRange* range = NULL;
if (!GetNextRequestRange(disk_queue, &range, &worker_context)) {
@@ -1000,7 +1001,7 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
// This function reads the specified scan range associated with the
// specified reader context and disk queue.
-void DiskIoMgr::ReadRange(DiskQueue* disk_queue, RequestContext* reader,
+void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
ScanRange* range) {
char* buffer = NULL;
int64_t bytes_remaining = range->len_ - range->bytes_read_;
@@ -1017,11 +1018,11 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, RequestContext* reader,
}
if (!enough_memory) {
- RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+ DiskIoRequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
unique_lock<mutex> reader_lock(reader->lock_);
// Just grabbed the reader lock, check for cancellation.
- if (reader->state_ == RequestContext::Cancelled) {
+ if (reader->state_ == DiskIoRequestContext::Cancelled) {
DCHECK(reader->Validate()) << endl << reader->DebugString();
state.DecrementRequestThreadAndCheckDone(reader);
range->Cancel(reader->status_);
@@ -1087,7 +1088,7 @@ void DiskIoMgr::ReadRange(DiskQueue* disk_queue, RequestContext* reader,
HandleReadFinished(disk_queue, reader, buffer_desc);
}
-void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) {
+void DiskIoMgr::Write(DiskIoRequestContext* writer_context, WriteRange* write_range) {
FILE* file_handle = fopen(write_range->file(), "rb+");
Status ret_status;
if (file_handle == NULL) {
@@ -1137,11 +1138,11 @@ int DiskIoMgr::free_buffers_idx(int64_t buffer_size) {
return idx;
}
-Status DiskIoMgr::AddWriteRange(RequestContext* writer, WriteRange* write_range) {
+Status DiskIoMgr::AddWriteRange(DiskIoRequestContext* writer, WriteRange* write_range) {
DCHECK_LE(write_range->len(), max_buffer_size_);
unique_lock<mutex> writer_lock(writer->lock_);
- if (writer->state_ == RequestContext::Cancelled) {
+ if (writer->state_ == DiskIoRequestContext::Cancelled) {
DCHECK(!writer->status_.ok());
return writer->status_;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index b130d52..79902c5 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -23,7 +23,6 @@
#include <boost/unordered_set.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
-#include <boost/thread/thread.hpp>
#include "common/atomic.h"
#include "common/hdfs.h"
@@ -42,7 +41,7 @@ namespace impala {
class MemTracker;
/// Manager object that schedules IO for all queries on all disks and remote filesystems
-/// (such as S3). Each query maps to one or more RequestContext objects, each of which
+/// (such as S3). Each query maps to one or more DiskIoRequestContext objects, each of which
/// has its own queue of scan ranges and/or write ranges.
//
/// The API splits up requesting scan/write ranges (non-blocking) and reading the data
@@ -185,12 +184,14 @@ class MemTracker;
/// - Internal classes are defined in disk-io-mgr-internal.h
/// - ScanRange APIs are implemented in disk-io-mgr-scan-range.cc
/// This contains the ready buffer queue logic
-/// - RequestContext APIs are implemented in disk-io-mgr-reader-context.cc
+/// - DiskIoRequestContext APIs are implemented in disk-io-mgr-reader-context.cc
/// This contains the logic for picking scan ranges for a reader.
/// - Disk Thread and general APIs are implemented in disk-io-mgr.cc.
+
+class DiskIoRequestContext;
+
class DiskIoMgr {
public:
- class RequestContext;
class ScanRange;
/// This class is a small wrapper around the hdfsFile handle and the file system
@@ -247,16 +248,17 @@ class DiskIoMgr {
private:
friend class DiskIoMgr;
+ friend class DiskIoRequestContext;
BufferDescriptor(DiskIoMgr* io_mgr);
/// Resets the buffer descriptor state for a new reader, range and data buffer.
- void Reset(RequestContext* reader, ScanRange* range, char* buffer,
+ void Reset(DiskIoRequestContext* reader, ScanRange* range, char* buffer,
int64_t buffer_len);
DiskIoMgr* io_mgr_;
/// Reader that this buffer is for
- RequestContext* reader_;
+ DiskIoRequestContext* reader_;
/// The current tracker this buffer is associated with.
MemTracker* mem_tracker_;
@@ -367,9 +369,10 @@ class DiskIoMgr {
private:
friend class DiskIoMgr;
+ friend class DiskIoRequestContext;
/// Initialize internal fields
- void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader);
+ void InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext* reader);
/// Enqueues a buffer for this range. This does not block.
/// Returns true if this scan range has hit the queue capacity, false otherwise.
@@ -423,7 +426,7 @@ class DiskIoMgr {
DiskIoMgr* io_mgr_;
/// Reader/owner of the scan range
- RequestContext* reader_;
+ DiskIoRequestContext* reader_;
/// File handle either to hdfs or local fs (FILE*)
///
@@ -446,7 +449,7 @@ class DiskIoMgr {
int bytes_read_;
/// Status for this range. This is non-ok if is_cancelled_ is true.
- /// Note: an individual range can fail without the RequestContext being
+ /// Note: an individual range can fail without the DiskIoRequestContext being
/// cancelled. This allows us to skip individual ranges.
Status status_;
@@ -509,6 +512,7 @@ class DiskIoMgr {
private:
friend class DiskIoMgr;
+ friend class DiskIoRequestContext;
/// Data to be written. RequestRange::len_ contains the length of data
/// to be written.
@@ -540,13 +544,13 @@ class DiskIoMgr {
/// Allocates tracking structure for a request context.
/// Register a new request context which is returned in *request_context.
- /// The IoMgr owns the allocated RequestContext object. The caller must call
+ /// The IoMgr owns the allocated DiskIoRequestContext object. The caller must call
/// UnregisterContext() for each context.
/// reader_mem_tracker: Is non-null only for readers. IO buffers
/// used for this reader will be tracked by this. If the limit is exceeded
/// the reader will be cancelled and MEM_LIMIT_EXCEEDED will be returned via
/// GetNext().
- Status RegisterContext(RequestContext** request_context,
+ Status RegisterContext(DiskIoRequestContext** request_context,
MemTracker* reader_mem_tracker = NULL);
/// Unregisters context from the disk IoMgr. This must be called for every
@@ -555,7 +559,7 @@ class DiskIoMgr {
/// The 'context' cannot be used after this call.
/// This call blocks until all the disk threads have finished cleaning up.
/// UnregisterContext also cancels the reader/writer from the disk IoMgr.
- void UnregisterContext(RequestContext* context);
+ void UnregisterContext(DiskIoRequestContext* context);
/// This function cancels the context asychronously. All outstanding requests
/// are aborted and tracking structures cleaned up. This does not need to be
@@ -565,7 +569,7 @@ class DiskIoMgr {
/// context to reach 0. After calling with wait_for_disks_completion = true, the only
/// valid API is returning IO buffers that have already been returned.
/// Takes context->lock_ if wait_for_disks_completion is true.
- void CancelContext(RequestContext* context, bool wait_for_disks_completion = false);
+ void CancelContext(DiskIoRequestContext* context, bool wait_for_disks_completion = false);
/// Adds the scan ranges to the queues. This call is non-blocking. The caller must
/// not deallocate the scan range pointers before UnregisterContext().
@@ -573,26 +577,26 @@ class DiskIoMgr {
/// (i.e. the caller should not/cannot call GetNextRange for these ranges).
/// This can be used to do synchronous reads as well as schedule dependent ranges,
/// as in the case for columnar formats.
- Status AddScanRanges(RequestContext* reader, const std::vector<ScanRange*>& ranges,
+ Status AddScanRanges(DiskIoRequestContext* reader, const std::vector<ScanRange*>& ranges,
bool schedule_immediately = false);
/// Add a WriteRange for the writer. This is non-blocking and schedules the context
/// on the IoMgr disk queue. Does not create any files.
- Status AddWriteRange(RequestContext* writer, WriteRange* write_range);
+ Status AddWriteRange(DiskIoRequestContext* writer, WriteRange* write_range);
/// Returns the next unstarted scan range for this reader. When the range is returned,
/// the disk threads in the IoMgr will already have started reading from it. The
/// caller is expected to call ScanRange::GetNext on the returned range.
/// If there are no more unstarted ranges, NULL is returned.
/// This call is blocking.
- Status GetNextRange(RequestContext* reader, ScanRange** range);
+ Status GetNextRange(DiskIoRequestContext* reader, ScanRange** range);
/// Reads the range and returns the result in buffer.
/// This behaves like the typical synchronous read() api, blocking until the data
/// is read. This can be called while there are outstanding ScanRanges and is
/// thread safe. Multiple threads can be calling Read() per reader at a time.
/// range *cannot* have already been added via AddScanRanges.
- Status Read(RequestContext* reader, ScanRange* range, BufferDescriptor** buffer);
+ Status Read(DiskIoRequestContext* reader, ScanRange* range, BufferDescriptor** buffer);
/// Determine which disk queue this file should be assigned to. Returns an index into
/// disk_queues_. The disk_id is the volume ID for the local disk that holds the
@@ -600,21 +604,21 @@ class DiskIoMgr {
/// co-located with the datanode for this file.
int AssignQueue(const char* file, int disk_id, bool expected_local);
- /// TODO: The functions below can be moved to RequestContext.
+ /// TODO: The functions below can be moved to DiskIoRequestContext.
/// Returns the current status of the context.
- Status context_status(RequestContext* context) const;
+ Status context_status(DiskIoRequestContext* context) const;
- void set_bytes_read_counter(RequestContext*, RuntimeProfile::Counter*);
- void set_read_timer(RequestContext*, RuntimeProfile::Counter*);
- void set_active_read_thread_counter(RequestContext*, RuntimeProfile::Counter*);
- void set_disks_access_bitmap(RequestContext*, RuntimeProfile::Counter*);
+ void set_bytes_read_counter(DiskIoRequestContext*, RuntimeProfile::Counter*);
+ void set_read_timer(DiskIoRequestContext*, RuntimeProfile::Counter*);
+ void set_active_read_thread_counter(DiskIoRequestContext*, RuntimeProfile::Counter*);
+ void set_disks_access_bitmap(DiskIoRequestContext*, RuntimeProfile::Counter*);
- int64_t queue_size(RequestContext* reader) const;
- int64_t bytes_read_local(RequestContext* reader) const;
- int64_t bytes_read_short_circuit(RequestContext* reader) const;
- int64_t bytes_read_dn_cache(RequestContext* reader) const;
- int num_remote_ranges(RequestContext* reader) const;
- int64_t unexpected_remote_bytes(RequestContext* reader) const;
+ int64_t queue_size(DiskIoRequestContext* reader) const;
+ int64_t bytes_read_local(DiskIoRequestContext* reader) const;
+ int64_t bytes_read_short_circuit(DiskIoRequestContext* reader) const;
+ int64_t bytes_read_dn_cache(DiskIoRequestContext* reader) const;
+ int num_remote_ranges(DiskIoRequestContext* reader) const;
+ int64_t unexpected_remote_bytes(DiskIoRequestContext* reader) const;
/// Returns the read throughput across all readers.
/// TODO: should this be a sliding window? This should report metrics for the
@@ -671,6 +675,7 @@ class DiskIoMgr {
private:
friend class BufferDescriptor;
+ friend class DiskIoRequestContext;
struct DiskQueue;
class RequestContextCache;
@@ -757,7 +762,7 @@ class DiskIoMgr {
/// should be <= max_buffer_size_. These constraints will be met if buffer was acquired
/// via GetFreeBuffer() (which it should have been).
BufferDescriptor* GetBufferDesc(
- RequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size);
+ DiskIoRequestContext* reader, ScanRange* range, char* buffer, int64_t buffer_size);
/// Returns a buffer desc object which can now be used for another reader.
void ReturnBufferDesc(BufferDescriptor* desc);
@@ -798,11 +803,11 @@ class DiskIoMgr {
/// Only returns false if the disk thread should be shut down.
/// No locks should be taken before this function call and none are left taken after.
bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
- RequestContext** request_context);
+ DiskIoRequestContext** request_context);
/// Updates disk queue and reader state after a read is complete. The read result
/// is captured in the buffer descriptor.
- void HandleReadFinished(DiskQueue*, RequestContext*, BufferDescriptor*);
+ void HandleReadFinished(DiskQueue*, DiskIoRequestContext*, BufferDescriptor*);
/// Invokes write_range->callback_ after the range has been written and
/// updates per-disk state and handle state. The status of the write OK/RUNTIME_ERROR
@@ -810,7 +815,7 @@ class DiskIoMgr {
/// The write_status does not affect the writer->status_. That is, an write error does
/// not cancel the writer context - that decision is left to the callback handler.
/// TODO: On the read path, consider not canceling the reader context on error.
- void HandleWriteFinished(RequestContext* writer, WriteRange* write_range,
+ void HandleWriteFinished(DiskIoRequestContext* writer, WriteRange* write_range,
const Status& write_status);
/// Validates that range is correctly initialized
@@ -818,7 +823,7 @@ class DiskIoMgr {
/// Write the specified range to disk and calls HandleWriteFinished when done.
/// Responsible for opening and closing the file that is written.
- void Write(RequestContext* writer_context, WriteRange* write_range);
+ void Write(DiskIoRequestContext* writer_context, WriteRange* write_range);
/// Helper method to write a range using the specified FILE handle. Returns Status:OK
/// if the write succeeded, or a RUNTIME_ERROR with an appropriate message otherwise.
@@ -826,7 +831,7 @@ class DiskIoMgr {
Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range);
/// Reads the specified scan range and calls HandleReadFinished when done.
- void ReadRange(DiskQueue* disk_queue, RequestContext* reader,
+ void ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
ScanRange* range);
};
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 07378e4..7eaa3db 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -22,6 +22,7 @@
#include "common/logging.h"
#include "resourcebroker/resource-broker.h"
+#include "runtime/backend-client.h"
#include "runtime/client-cache.h"
#include "runtime/coordinator.h"
#include "runtime/data-stream-mgr.h"
@@ -38,6 +39,7 @@
#include "statestore/statestore-subscriber.h"
#include "util/debug-util.h"
#include "util/default-path-handlers.h"
+#include "util/hdfs-bulk-ops.h"
#include "util/mem-info.h"
#include "util/metrics.h"
#include "util/network-util.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index e405bf8..c5404dc 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -17,35 +17,35 @@
#define IMPALA_RUNTIME_EXEC_ENV_H
#include <boost/scoped_ptr.hpp>
-#include <boost/shared_ptr.hpp>
-#include <boost/thread/thread.hpp>
+// NOTE: try not to add more headers here: exec-env.h is included in many many files.
#include "common/status.h"
-#include "runtime/backend-client.h"
-#include "util/cgroups-mgr.h"
-#include "util/hdfs-bulk-ops.h" // For declaration of HdfsOpThreadPool
-#include "resourcebroker/resource-broker.h"
+#include "runtime/client-cache-types.h"
+#include "util/hdfs-bulk-ops-defs.h" // For declaration of HdfsOpThreadPool
namespace impala {
+class CallableThreadPool;
+class CgroupsMgr;
class DataStreamMgr;
class DiskIoMgr;
+class FragmentMgr;
+class Frontend;
class HBaseTableFactory;
class HdfsFsCache;
+class ImpalaServer;
class LibCache;
+class MemTracker;
+class MetricGroup;
+class QueryResourceMgr;
+class RequestPoolService;
+class ResourceBroker;
class Scheduler;
class StatestoreSubscriber;
class TestExecEnv;
-class Webserver;
-class MetricGroup;
-class MemTracker;
class ThreadResourceMgr;
-class CgroupsManager;
-class ImpalaServer;
-class RequestPoolService;
-class FragmentMgr;
-class Frontend;
class TmpFileMgr;
+class Webserver;
/// Execution environment for queries/plan fragments.
/// Contains all required global structures, and handles to
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/lib-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.cc b/be/src/runtime/lib-cache.cc
index 5c1e423..b3a6f0a 100644
--- a/be/src/runtime/lib-cache.cc
+++ b/be/src/runtime/lib-cache.cc
@@ -29,14 +29,15 @@
#include "common/names.h"
namespace filesystem = boost::filesystem;
-using namespace impala;
DEFINE_string(local_library_dir, "/tmp",
"Local directory to copy UDF libraries from HDFS into");
+namespace impala {
+
scoped_ptr<LibCache> LibCache::instance_;
-struct LibCache::LibCacheEntry {
+struct LibCacheEntry {
// Lock protecting all fields in this entry
boost::mutex lock;
@@ -53,7 +54,7 @@ struct LibCache::LibCacheEntry {
bool check_needs_refresh;
// The type of this file.
- LibType type;
+ LibCache::LibType type;
// The path on the local file system for this library.
std::string local_path;
@@ -117,7 +118,7 @@ Status LibCache::InitInternal() {
return Status::OK();
}
-LibCache::LibCacheEntry::~LibCacheEntry() {
+LibCacheEntry::~LibCacheEntry() {
if (shared_object_handle != NULL) {
DCHECK_EQ(use_count, 0);
DCHECK(should_remove);
@@ -418,3 +419,5 @@ string LibCache::MakeLocalPath(const string& hdfs_path, const string& local_dir)
<< (num_libs_copied_.Add(1) - 1) << src.extension().native();
return dst.str();
}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/lib-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/lib-cache.h b/be/src/runtime/lib-cache.h
index b36a859..9201341 100644
--- a/be/src/runtime/lib-cache.h
+++ b/be/src/runtime/lib-cache.h
@@ -50,10 +50,10 @@ class RuntimeState;
/// TODO:
/// - refresh libraries
/// - better cached module management.
+struct LibCacheEntry;
+
class LibCache {
public:
- struct LibCacheEntry;
-
enum LibType {
TYPE_SO, // Shared object
TYPE_IR, // IR intermediate
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/mem-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool.h b/be/src/runtime/mem-pool.h
index 9e22e9f..fdf38e9 100644
--- a/be/src/runtime/mem-pool.h
+++ b/be/src/runtime/mem-pool.h
@@ -23,7 +23,6 @@
#include "common/logging.h"
#include "util/bit-util.h"
-#include "util/runtime-profile.h"
namespace impala {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 9534671..73c9300 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -29,7 +29,7 @@
#include "util/debug-util.h"
#include "util/internal-queue.h"
#include "util/metrics.h"
-#include "util/runtime-profile.h"
+#include "util/runtime-profile-counters.h"
#include "util/spinlock.h"
#include "gen-cpp/Types_types.h" // for TUniqueId
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 99ed75d..75c5f58 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -29,10 +29,13 @@
#include "exec/hdfs-scan-node.h"
#include "exec/hbase-table-scanner.h"
#include "exprs/expr.h"
+#include "resourcebroker/resource-broker.h"
#include "runtime/descriptors.h"
#include "runtime/data-stream-mgr.h"
#include "runtime/row-batch.h"
+#include "runtime/runtime-filter-bank.h"
#include "runtime/mem-tracker.h"
+#include "scheduling/query-resource-mgr.h"
#include "util/cgroups-mgr.h"
#include "util/cpu-info.h"
#include "util/debug-util.h"
@@ -589,7 +592,7 @@ void PlanFragmentExecutor::Close() {
runtime_state_->fragment_instance_id(), runtime_state_->cgroup());
}
if (plan_ != NULL) plan_->Close(runtime_state_.get());
- for (DiskIoMgr::RequestContext* context: *runtime_state_->reader_contexts()) {
+ for (DiskIoRequestContext* context: *runtime_state_->reader_contexts()) {
runtime_state_->io_mgr()->UnregisterContext(context);
}
exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
index 91cece6..29250a3 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -24,6 +24,7 @@
#include "common/status.h"
#include "common/object-pool.h"
#include "runtime/runtime-state.h"
+#include "util/runtime-profile-counters.h"
#include "util/thread.h"
namespace impala {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/row-batch-serialize-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-serialize-test.cc b/be/src/runtime/row-batch-serialize-test.cc
index cb49bda..1f28440 100644
--- a/be/src/runtime/row-batch-serialize-test.cc
+++ b/be/src/runtime/row-batch-serialize-test.cc
@@ -15,6 +15,8 @@
#include "testutil/gtest-util.h"
#include "runtime/collection-value.h"
#include "runtime/collection-value-builder.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/raw-value.h"
#include "runtime/raw-value.inline.h"
#include "runtime/row-batch.h"
#include "runtime/tuple-row.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 3f2ba29..4197977 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -27,7 +27,6 @@
#include "runtime/descriptors.h"
#include "runtime/disk-io-mgr.h"
#include "runtime/mem-pool.h"
-#include "runtime/mem-tracker.h"
namespace impala {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
new file mode 100644
index 0000000..91f12c9
--- /dev/null
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -0,0 +1,222 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "runtime/runtime-filter-bank.h"
+
+#include "common/names.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gutil/bits.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/client-cache.h"
+#include "runtime/exec-env.h"
+#include "runtime/backend-client.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/runtime-filter.inline.h"
+#include "service/impala-server.h"
+#include "util/bloom-filter.h"
+
+using namespace impala;
+using namespace boost;
+using namespace strings;
+
+DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability of false "
+ "positives in a runtime filter before it is disabled.");
+
+const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
+const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
+
+RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state)
+ : state_(state), closed_(false) {
+ memory_allocated_ =
+ state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES);
+
+ // Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE
+ max_filter_size_ = query_ctx.request.query_options.runtime_filter_max_size;
+ max_filter_size_ = max<int64_t>(max_filter_size_, MIN_BLOOM_FILTER_SIZE);
+ max_filter_size_ =
+ BitUtil::RoundUpToPowerOfTwo(min<int64_t>(max_filter_size_, MAX_BLOOM_FILTER_SIZE));
+
+ min_filter_size_ = query_ctx.request.query_options.runtime_filter_min_size;
+ min_filter_size_ = max<int64_t>(min_filter_size_, MIN_BLOOM_FILTER_SIZE);
+ min_filter_size_ =
+ BitUtil::RoundUpToPowerOfTwo(min<int64_t>(min_filter_size_, MAX_BLOOM_FILTER_SIZE));
+
+ // Make sure that min <= max
+ min_filter_size_ = min<int64_t>(min_filter_size_, max_filter_size_);
+
+ DCHECK_GT(min_filter_size_, 0);
+ DCHECK_GT(max_filter_size_, 0);
+
+ default_filter_size_ = query_ctx.request.query_options.runtime_bloom_filter_size;
+ default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
+ default_filter_size_ =
+ BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
+}
+
+RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
+ bool is_producer) {
+ RuntimeFilter* ret = obj_pool_.Add(
+ new RuntimeFilter(filter_desc, GetFilterSizeForNdv(filter_desc.ndv_estimate)));
+ lock_guard<mutex> l(runtime_filter_lock_);
+ if (is_producer) {
+ DCHECK(produced_filters_.find(filter_desc.filter_id) == produced_filters_.end());
+ produced_filters_[filter_desc.filter_id] = ret;
+ } else {
+ if (consumed_filters_.find(filter_desc.filter_id) == consumed_filters_.end()) {
+ consumed_filters_[filter_desc.filter_id] = ret;
+ } else {
+ // The filter has already been registered in this filter bank by another
+ // target node.
+ DCHECK_GT(filter_desc.targets.size(), 1);
+ ret = consumed_filters_[filter_desc.filter_id];
+ }
+ }
+ return ret;
+}
+
+namespace {
+
+/// Sends a filter to the coordinator. Executed asynchronously in the context of
+/// ExecEnv::rpc_pool().
+void SendFilterToCoordinator(TNetworkAddress address, TUpdateFilterParams params,
+ ImpalaBackendClientCache* client_cache) {
+ Status status;
+ ImpalaBackendConnection coord(client_cache, address, &status);
+ if (!status.ok()) {
+ // Failing to send a filter is not a query-wide error - the remote fragment will
+ // continue regardless.
+ // TODO: Retry.
+ LOG(INFO) << "Couldn't send filter to coordinator: " << status.msg().msg();
+ return;
+ }
+ TUpdateFilterResult res;
+ status = coord.DoRpc(&ImpalaBackendClient::UpdateFilter, params, &res);
+}
+
+}
+
+void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
+ BloomFilter* bloom_filter) {
+ DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
+ << "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
+ TUpdateFilterParams params;
+ // A runtime filter may have both local and remote targets.
+ bool has_local_target = false;
+ bool has_remote_target = false;
+ {
+ lock_guard<mutex> l(runtime_filter_lock_);
+ RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
+ DCHECK(it != produced_filters_.end()) << "Tried to update unregistered filter: "
+ << filter_id;
+ it->second->SetBloomFilter(bloom_filter);
+ has_local_target = it->second->filter_desc().has_local_targets;
+ has_remote_target = it->second->filter_desc().has_remote_targets;
+ }
+
+ if (has_local_target) {
+ // Do a short circuit publication by pushing the same BloomFilter to the consumer
+ // side.
+ RuntimeFilter* filter;
+ {
+ lock_guard<mutex> l(runtime_filter_lock_);
+ RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
+ if (it == consumed_filters_.end()) return;
+ filter = it->second;
+ }
+ filter->SetBloomFilter(bloom_filter);
+ state_->runtime_profile()->AddInfoString(
+ Substitute("Filter $0 arrival", filter_id),
+ PrettyPrinter::Print(filter->arrival_delay(), TUnit::TIME_MS));
+ }
+
+ if (has_remote_target
+ && state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
+ BloomFilter::ToThrift(bloom_filter, &params.bloom_filter);
+ params.filter_id = filter_id;
+ params.query_id = state_->query_id();
+
+ ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
+ SendFilterToCoordinator, state_->query_ctx().coord_address, params,
+ ExecEnv::GetInstance()->impalad_client_cache()));
+ }
+}
+
+void RuntimeFilterBank::PublishGlobalFilter(int32_t filter_id,
+ const TBloomFilter& thrift_filter) {
+ lock_guard<mutex> l(runtime_filter_lock_);
+ if (closed_) return;
+ RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
+ DCHECK(it != consumed_filters_.end()) << "Tried to publish unregistered filter: "
+ << filter_id;
+ if (thrift_filter.always_true) {
+ it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
+ } else {
+ int64_t required_space =
+ BloomFilter::GetExpectedHeapSpaceUsed(thrift_filter.log_heap_space);
+ // Silently fail to publish the filter (replacing it with a 0-byte complete one) if
+ // there's not enough memory for it.
+ if (!state_->query_mem_tracker()->TryConsume(required_space)) {
+ VLOG_QUERY << "No memory for global filter: " << filter_id
+ << " (fragment instance: " << state_->fragment_instance_id() << ")";
+ it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
+ } else {
+ BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(thrift_filter));
+ DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
+ memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
+ it->second->SetBloomFilter(bloom_filter);
+ }
+ }
+ state_->runtime_profile()->AddInfoString(Substitute("Filter $0 arrival", filter_id),
+ PrettyPrinter::Print(it->second->arrival_delay(), TUnit::TIME_MS));
+}
+
+BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
+ lock_guard<mutex> l(runtime_filter_lock_);
+ if (closed_) return NULL;
+
+ RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
+ DCHECK(it != produced_filters_.end()) << "Filter ID " << filter_id << " not registered";
+
+ // Track required space
+ int64_t log_filter_size = Bits::Log2Ceiling64(it->second->filter_size());
+ int64_t required_space = BloomFilter::GetExpectedHeapSpaceUsed(log_filter_size);
+ if (!state_->query_mem_tracker()->TryConsume(required_space)) return NULL;
+ BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(log_filter_size));
+ DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
+ memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
+ return bloom_filter;
+}
+
+int64_t RuntimeFilterBank::GetFilterSizeForNdv(int64_t ndv) {
+ if (ndv == -1) return default_filter_size_;
+ int64_t required_space =
+ 1LL << BloomFilter::MinLogSpace(ndv, FLAGS_max_filter_error_rate);
+ required_space = max<int64_t>(required_space, min_filter_size_);
+ required_space = min<int64_t>(required_space, max_filter_size_);
+ return required_space;
+}
+
+bool RuntimeFilterBank::FpRateTooHigh(int64_t filter_size, int64_t observed_ndv) {
+ double fpp =
+ BloomFilter::FalsePositiveProb(observed_ndv, Bits::Log2Ceiling64(filter_size));
+ return fpp > FLAGS_max_filter_error_rate;
+}
+
+void RuntimeFilterBank::Close() {
+ lock_guard<mutex> l(runtime_filter_lock_);
+ closed_ = true;
+ obj_pool_.Clear();
+ state_->query_mem_tracker()->Release(memory_allocated_->value());
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-filter-bank.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
new file mode 100644
index 0000000..4703a0f
--- /dev/null
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -0,0 +1,149 @@
+// Copyright 2016 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef IMPALA_RUNTIME_RUNTIME_FILTER_BANK_H
+#define IMPALA_RUNTIME_RUNTIME_FILTER_BANK_H
+
+#include "common/object-pool.h"
+#include "runtime/types.h"
+#include "util/runtime-profile.h"
+
+#include <boost/thread/lock_guard.hpp>
+#include <boost/unordered_map.hpp>
+
+namespace impala {
+
+class BloomFilter;
+class RuntimeFilter;
+class RuntimeState;
+class TBloomFilter;
+class TRuntimeFilterDesc;
+class TQueryCtx;
+
+/// RuntimeFilters are produced and consumed by plan nodes at run time to propagate
+/// predicates across the plan tree dynamically. Each fragment instance manages its
+/// filters with a RuntimeFilterBank which provides low-synchronization access to filter
+/// objects and data structures.
+///
+/// A RuntimeFilterBank manages both production and consumption of filters. In the case
+/// where a given filter is both consumed and produced by the same fragment, the
+/// RuntimeFilterBank treats each filter independently.
+///
+/// All filters must be registered with the filter bank via RegisterFilter(). Local plan
+/// fragments update the bloom filters by calling UpdateFilterFromLocal()
+/// (UpdateFilterFromLocal() may only be called once per filter ID per filter bank). The
+/// bloom_filter that is passed into UpdateFilterFromLocal() must have been allocated by
+/// AllocateScratchBloomFilter() (or be NULL); this allows RuntimeFilterBank to manage all
+/// memory associated with filters.
+///
+/// Filters are aggregated at the coordinator, and then made available to consumers after
+/// PublishGlobalFilter() has been called.
+///
+/// After PublishGlobalFilter() has been called (and again, it may only be called once per
+/// filter_id), the RuntimeFilter object associated with filter_id will have a valid
+/// bloom_filter, and may be used for filter evaluation. This operation occurs without
+/// synchronisation, and neither the thread that calls PublishGlobalFilter() nor the
+/// thread that may call RuntimeFilter::Eval() need to coordinate in any way.
+class RuntimeFilterBank {
+ public:
+ RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state);
+
+ /// Registers a filter that will either be produced (is_producer == true) or consumed
+ /// (is_producer == false) by fragments that share this RuntimeState. The filter
+ /// bloom_filter itself is unallocated until the first call to PublishGlobalFilter().
+ RuntimeFilter* RegisterFilter(const TRuntimeFilterDesc& filter_desc, bool is_producer);
+
+ /// Updates a filter's bloom_filter with 'bloom_filter' which has been produced by some
+ /// operator in the local fragment instance. 'bloom_filter' may be NULL, representing a
+ /// full filter that contains all elements.
+ void UpdateFilterFromLocal(int32_t filter_id, BloomFilter* bloom_filter);
+
+ /// Makes a bloom_filter (aggregated globally from all producer fragments) available for
+ /// consumption by operators that wish to use it for filtering.
+ void PublishGlobalFilter(int32_t filter_id, const TBloomFilter& thrift_filter);
+
+ /// Returns true if, according to the observed NDV in 'observed_ndv', a filter of size
+ /// 'filter_size' would have an expected false-positive rate which would exceed
+ /// FLAGS_max_filter_error_rate.
+ bool FpRateTooHigh(int64_t filter_size, int64_t observed_ndv);
+
+ /// Returns a RuntimeFilter with the given filter id. This is safe to call after all
+ /// calls to RegisterFilter() have finished, and not before. Filters may be cached by
+ /// clients and subsequently accessed without synchronization. Concurrent calls to
+ /// PublishGlobalFilter() will update a filter's bloom filter atomically, without the
+ /// need for client synchronization.
+ inline const RuntimeFilter* GetRuntimeFilter(int32_t filter_id);
+
+ /// Returns a bloom_filter that can be used by an operator to produce a local filter,
+ /// which may then be used in UpdateFilterFromLocal(). The memory returned is owned by
+ /// the RuntimeFilterBank (which may transfer it to a RuntimeFilter subsequently), and
+ /// should not be deleted by the caller. The filter identified by 'filter_id' must have
+ /// been previously registered as a 'producer' by RegisterFilter().
+ ///
+ /// If there is not enough memory, or if Close() has been called first, returns NULL.
+ BloomFilter* AllocateScratchBloomFilter(int32_t filter_id);
+
+ /// Default hash seed to use when computing hashed values to insert into filters.
+ static const int32_t DefaultHashSeed() { return 1234; }
+
+ /// Releases all memory allocated for BloomFilters.
+ void Close();
+
+ static const int64_t MIN_BLOOM_FILTER_SIZE = 4 * 1024; // 4KB
+ static const int64_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; // 16MB
+
+ private:
+ /// Returns the space (in bytes) required for a filter to achieve the configured
+ /// maximum false-positive rate based on the expected NDV. If 'ndv' is -1 (i.e. no
+ /// estimate is known), the default filter size is returned.
+ int64_t GetFilterSizeForNdv(int64_t ndv);
+
+ /// Lock protecting produced_filters_, consumed_filters_ and closed_.
+ boost::mutex runtime_filter_lock_;
+
+ /// Map from filter id to a RuntimeFilter.
+ typedef boost::unordered_map<int32_t, RuntimeFilter*> RuntimeFilterMap;
+
+ /// All filters expected to be produced by the local plan fragment instance.
+ RuntimeFilterMap produced_filters_;
+
+ /// All filters expected to be consumed by the local plan fragment instance.
+ RuntimeFilterMap consumed_filters_;
+
+ /// Fragment instance's runtime state.
+ RuntimeState* state_;
+
+ /// Object pool to track allocated Bloom filters.
+ ObjectPool obj_pool_;
+
+ /// True iff Close() has been called. Used to prevent races between
+ /// AllocateScratchBloomFilter() and Close().
+ bool closed_;
+
+ /// Total amount of memory allocated to Bloom Filters
+ RuntimeProfile::Counter* memory_allocated_;
+
+ /// Precomputed default BloomFilter size.
+ int64_t default_filter_size_;
+
+ /// Maximum filter size, in bytes, rounded up to a power of two.
+ int64_t max_filter_size_;
+
+ /// Minimum filter size, in bytes, rounded up to a power of two.
+ int64_t min_filter_size_;
+};
+
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc
index b0126ec..9ae6cbb 100644
--- a/be/src/runtime/runtime-filter.cc
+++ b/be/src/runtime/runtime-filter.cc
@@ -14,211 +14,14 @@
#include "runtime/runtime-filter.inline.h"
+#include "util/time.h"
+
#include "common/names.h"
-#include "gutil/bits.h"
-#include "gutil/strings/substitute.h"
-#include "runtime/client-cache.h"
-#include "runtime/exec-env.h"
-#include "runtime/backend-client.h"
-#include "service/impala-server.h"
-#include "util/bloom-filter.h"
using namespace impala;
-using namespace boost;
-using namespace strings;
-
-DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The maximum probability of false "
- "positives in a runtime filter before it is disabled.");
const int RuntimeFilter::SLEEP_PERIOD_MS = 20;
-const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
-const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
-
-RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state)
- : query_ctx_(query_ctx), state_(state), closed_(false) {
- memory_allocated_ =
- state->runtime_profile()->AddCounter("BloomFilterBytes", TUnit::BYTES);
-
- // Clamp bloom filter size down to the limits {MIN,MAX}_BLOOM_FILTER_SIZE
- max_filter_size_ = query_ctx_.request.query_options.runtime_filter_max_size;
- max_filter_size_ = max<int64_t>(max_filter_size_, MIN_BLOOM_FILTER_SIZE);
- max_filter_size_ =
- BitUtil::RoundUpToPowerOfTwo(min<int64_t>(max_filter_size_, MAX_BLOOM_FILTER_SIZE));
-
- min_filter_size_ = query_ctx_.request.query_options.runtime_filter_min_size;
- min_filter_size_ = max<int64_t>(min_filter_size_, MIN_BLOOM_FILTER_SIZE);
- min_filter_size_ =
- BitUtil::RoundUpToPowerOfTwo(min<int64_t>(min_filter_size_, MAX_BLOOM_FILTER_SIZE));
-
- // Make sure that min <= max
- min_filter_size_ = min<int64_t>(min_filter_size_, max_filter_size_);
-
- DCHECK_GT(min_filter_size_, 0);
- DCHECK_GT(max_filter_size_, 0);
-
- default_filter_size_ = query_ctx_.request.query_options.runtime_bloom_filter_size;
- default_filter_size_ = max<int64_t>(default_filter_size_, min_filter_size_);
- default_filter_size_ =
- BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
-}
-
-RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
- bool is_producer) {
- RuntimeFilter* ret = obj_pool_.Add(
- new RuntimeFilter(filter_desc, GetFilterSizeForNdv(filter_desc.ndv_estimate)));
- lock_guard<mutex> l(runtime_filter_lock_);
- if (is_producer) {
- DCHECK(produced_filters_.find(filter_desc.filter_id) == produced_filters_.end());
- produced_filters_[filter_desc.filter_id] = ret;
- } else {
- if (consumed_filters_.find(filter_desc.filter_id) == consumed_filters_.end()) {
- consumed_filters_[filter_desc.filter_id] = ret;
- } else {
- // The filter has already been registered in this filter bank by another
- // target node.
- DCHECK_GT(filter_desc.targets.size(), 1);
- ret = consumed_filters_[filter_desc.filter_id];
- }
- }
- return ret;
-}
-
-namespace {
-
-/// Sends a filter to the coordinator. Executed asynchronously in the context of
-/// ExecEnv::rpc_pool().
-void SendFilterToCoordinator(TNetworkAddress address, TUpdateFilterParams params,
- ImpalaBackendClientCache* client_cache) {
- Status status;
- ImpalaBackendConnection coord(client_cache, address, &status);
- if (!status.ok()) {
- // Failing to send a filter is not a query-wide error - the remote fragment will
- // continue regardless.
- // TODO: Retry.
- LOG(INFO) << "Couldn't send filter to coordinator: " << status.msg().msg();
- return;
- }
- TUpdateFilterResult res;
- status = coord.DoRpc(&ImpalaBackendClient::UpdateFilter, params, &res);
-}
-
-}
-
-void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
- BloomFilter* bloom_filter) {
- DCHECK_NE(state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
- << "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
- TUpdateFilterParams params;
- // A runtime filter may have both local and remote targets.
- bool has_local_target = false;
- bool has_remote_target = false;
- {
- lock_guard<mutex> l(runtime_filter_lock_);
- RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
- DCHECK(it != produced_filters_.end()) << "Tried to update unregistered filter: "
- << filter_id;
- it->second->SetBloomFilter(bloom_filter);
- has_local_target = it->second->filter_desc().has_local_targets;
- has_remote_target = it->second->filter_desc().has_remote_targets;
- }
-
- if (has_local_target) {
- // Do a short circuit publication by pushing the same BloomFilter to the consumer
- // side.
- RuntimeFilter* filter;
- {
- lock_guard<mutex> l(runtime_filter_lock_);
- RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
- if (it == consumed_filters_.end()) return;
- filter = it->second;
- }
- filter->SetBloomFilter(bloom_filter);
- state_->runtime_profile()->AddInfoString(
- Substitute("Filter $0 arrival", filter_id),
- PrettyPrinter::Print(filter->arrival_delay(), TUnit::TIME_MS));
- }
-
- if (has_remote_target
- && state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
- BloomFilter::ToThrift(bloom_filter, ¶ms.bloom_filter);
- params.filter_id = filter_id;
- params.query_id = query_ctx_.query_id;
-
- ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
- SendFilterToCoordinator, query_ctx_.coord_address, params,
- ExecEnv::GetInstance()->impalad_client_cache()));
- }
-}
-
-void RuntimeFilterBank::PublishGlobalFilter(int32_t filter_id,
- const TBloomFilter& thrift_filter) {
- lock_guard<mutex> l(runtime_filter_lock_);
- if (closed_) return;
- RuntimeFilterMap::iterator it = consumed_filters_.find(filter_id);
- DCHECK(it != consumed_filters_.end()) << "Tried to publish unregistered filter: "
- << filter_id;
- if (thrift_filter.always_true) {
- it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
- } else {
- int64_t required_space =
- BloomFilter::GetExpectedHeapSpaceUsed(thrift_filter.log_heap_space);
- // Silently fail to publish the filter (replacing it with a 0-byte complete one) if
- // there's not enough memory for it.
- if (!state_->query_mem_tracker()->TryConsume(required_space)) {
- VLOG_QUERY << "No memory for global filter: " << filter_id
- << " (fragment instance: " << state_->fragment_instance_id() << ")";
- it->second->SetBloomFilter(BloomFilter::ALWAYS_TRUE_FILTER);
- } else {
- BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(thrift_filter));
- DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
- memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
- it->second->SetBloomFilter(bloom_filter);
- }
- }
- state_->runtime_profile()->AddInfoString(Substitute("Filter $0 arrival", filter_id),
- PrettyPrinter::Print(it->second->arrival_delay(), TUnit::TIME_MS));
-}
-
-BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
- lock_guard<mutex> l(runtime_filter_lock_);
- if (closed_) return NULL;
-
- RuntimeFilterMap::iterator it = produced_filters_.find(filter_id);
- DCHECK(it != produced_filters_.end()) << "Filter ID " << filter_id << " not registered";
-
- // Track required space
- int64_t log_filter_size = Bits::Log2Ceiling64(it->second->filter_size());
- int64_t required_space = BloomFilter::GetExpectedHeapSpaceUsed(log_filter_size);
- if (!state_->query_mem_tracker()->TryConsume(required_space)) return NULL;
- BloomFilter* bloom_filter = obj_pool_.Add(new BloomFilter(log_filter_size));
- DCHECK_EQ(required_space, bloom_filter->GetHeapSpaceUsed());
- memory_allocated_->Add(bloom_filter->GetHeapSpaceUsed());
- return bloom_filter;
-}
-
-int64_t RuntimeFilterBank::GetFilterSizeForNdv(int64_t ndv) {
- if (ndv == -1) return default_filter_size_;
- int64_t required_space =
- 1LL << BloomFilter::MinLogSpace(ndv, FLAGS_max_filter_error_rate);
- required_space = max<int64_t>(required_space, min_filter_size_);
- required_space = min<int64_t>(required_space, max_filter_size_);
- return required_space;
-}
-
-bool RuntimeFilterBank::FpRateTooHigh(int64_t filter_size, int64_t observed_ndv) {
- double fpp =
- BloomFilter::FalsePositiveProb(observed_ndv, Bits::Log2Ceiling64(filter_size));
- return fpp > FLAGS_max_filter_error_rate;
-}
-
-void RuntimeFilterBank::Close() {
- lock_guard<mutex> l(runtime_filter_lock_);
- closed_ = true;
- obj_pool_.Clear();
- state_->query_mem_tracker()->Release(memory_allocated_->value());
-}
-
bool RuntimeFilter::WaitForArrival(int32_t timeout_ms) const {
do {
if (HasBloomFilter()) return true;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/6198d926/be/src/runtime/runtime-filter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index 178c03f..4168704 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -16,135 +16,14 @@
#ifndef IMPALA_RUNTIME_RUNTIME_FILTER_H
#define IMPALA_RUNTIME_RUNTIME_FILTER_H
-#include <boost/unordered_map.hpp>
-
-#include "common/object-pool.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
-#include "gen-cpp/PlanNodes_types.h"
-#include "runtime/types.h"
-#include "util/runtime-profile.h"
+#include "runtime/raw-value.h"
+#include "runtime/runtime-filter-bank.h"
+#include "util/bloom-filter.h"
#include "util/spinlock.h"
namespace impala {
class BloomFilter;
-class RuntimeFilter;
-class RuntimeState;
-
-/// RuntimeFilters are produced and consumed by plan nodes at run time to propagate
-/// predicates across the plan tree dynamically. Each fragment instance manages its
-/// filters with a RuntimeFilterBank which provides low-synchronization access to filter
-/// objects and data structures.
-///
-/// A RuntimeFilterBank manages both production and consumption of filters. In the case
-/// where a given filter is both consumed and produced by the same fragment, the
-/// RuntimeFilterBank treats each filter independently.
-///
-/// All filters must be registered with the filter bank via RegisterFilter(). Local plan
-/// fragments update the bloom filters by calling UpdateFilterFromLocal()
-/// (UpdateFilterFromLocal() may only be called once per filter ID per filter bank). The
-/// bloom_filter that is passed into UpdateFilterFromLocal() must have been allocated by
-/// AllocateScratchBloomFilter() (or be NULL); this allows RuntimeFilterBank to manage all
-/// memory associated with filters.
-///
-/// Filters are aggregated at the coordinator, and then made available to consumers after
-/// PublishGlobalFilter() has been called.
-///
-/// After PublishGlobalFilter() has been called (and again, it may only be called once per
-/// filter_id), the RuntimeFilter object associated with filter_id will have a valid
-/// bloom_filter, and may be used for filter evaluation. This operation occurs without
-/// synchronisation, and neither the thread that calls PublishGlobalFilter() nor the
-/// thread that may call RuntimeFilter::Eval() need to coordinate in any way.
-class RuntimeFilterBank {
- public:
- RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* state);
-
- /// Registers a filter that will either be produced (is_producer == false) or consumed
- /// (is_producer == true) by fragments that share this RuntimeState. The filter
- /// bloom_filter itself is unallocated until the first call to PublishGlobalFilter().
- RuntimeFilter* RegisterFilter(const TRuntimeFilterDesc& filter_desc, bool is_producer);
-
- /// Updates a filter's bloom_filter with 'bloom_filter' which has been produced by some
- /// operator in the local fragment instance. 'bloom_filter' may be NULL, representing a
- /// full filter that contains all elements.
- void UpdateFilterFromLocal(int32_t filter_id, BloomFilter* bloom_filter);
-
- /// Makes a bloom_filter (aggregated globally from all producer fragments) available for
- /// consumption by operators that wish to use it for filtering.
- void PublishGlobalFilter(int32_t filter_id, const TBloomFilter& thrift_filter);
-
- /// Returns true if, according to the observed NDV in 'observed_ndv', a filter of size
- /// 'filter_size' would have an expected false-positive rate which would exceed
- /// FLAGS_max_filter_error_rate.
- bool FpRateTooHigh(int64_t filter_size, int64_t observed_ndv);
-
- /// Returns a RuntimeFilter with the given filter id. This is safe to call after all
- /// calls to RegisterFilter() have finished, and not before. Filters may be cached by
- /// clients and subsequently accessed without synchronization. Concurrent calls to
- /// PublishGlobalFilter() will update a filter's bloom filter atomically, without the
- /// need for client synchronization.
- inline const RuntimeFilter* GetRuntimeFilter(int32_t filter_id);
-
- /// Returns a bloom_filter that can be used by an operator to produce a local filter,
- /// which may then be used in UpdateFilterFromLocal(). The memory returned is owned by
- /// the RuntimeFilterBank (which may transfer it to a RuntimeFilter subsequently), and
- /// should not be deleted by the caller. The filter identified by 'filter_id' must have
- /// been previously registered as a 'producer' by RegisterFilter().
- ///
- /// If there is not enough memory, or if Close() has been called first, returns NULL.
- BloomFilter* AllocateScratchBloomFilter(int32_t filter_id);
-
- /// Default hash seed to use when computing hashed values to insert into filters.
- static const int32_t DefaultHashSeed() { return 1234; }
-
- /// Releases all memory allocated for BloomFilters.
- void Close();
-
- static const int64_t MIN_BLOOM_FILTER_SIZE = 4 * 1024; // 4KB
- static const int64_t MAX_BLOOM_FILTER_SIZE = 16 * 1024 * 1024; // 16MB
-
- private:
- /// Returns the the space (in bytes) required for a filter to achieve the configured
- /// maximum false-positive rate based on the expected NDV. If 'ndv' is -1 (i.e. no
- /// estimate is known), the default filter size is returned.
- int64_t GetFilterSizeForNdv(int64_t ndv);
-
- const TQueryCtx query_ctx_;
-
- /// Lock protecting produced_filters_ and consumed_filters_.
- boost::mutex runtime_filter_lock_;
-
- /// Map from filter id to a RuntimeFilter.
- typedef boost::unordered_map<int32_t, RuntimeFilter*> RuntimeFilterMap;
-
- /// All filters expected to be produced by the local plan fragment instance.
- RuntimeFilterMap produced_filters_;
-
- /// All filters expected to be consumed by the local plan fragment instance.
- RuntimeFilterMap consumed_filters_;
-
- /// Fragment instance's runtime state.
- RuntimeState* state_;
-
- /// Object pool to track allocated Bloom filters.
- ObjectPool obj_pool_;
-
- /// True iff Close() has been called. Used to prevent races between
- /// AllocateScratchBloomFilter() and Close().
- bool closed_;
-
- /// Total amount of memory allocated to Bloom Filters
- RuntimeProfile::Counter* memory_allocated_;
-
- /// Precomputed default BloomFilter size.
- int64_t default_filter_size_;
-
- /// Maximum filter size, in bytes, rounded up to a power of two.
- int64_t max_filter_size_;
-
- /// Minimum filter size, in bytes, rounded up to a power of two.
- int64_t min_filter_size_;
-};
/// RuntimeFilters represent set-membership predicates (implemented with bloom filters)
/// that are computed during query execution (rather than during planning). They can then