You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/11/18 00:31:58 UTC
[15/16] incubator-impala git commit: IMPALA-4835 (prep only): create
io subfolder and namespace
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-reader-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-reader-context.h b/be/src/runtime/disk-io-mgr-reader-context.h
deleted file mode 100644
index 90426d9..0000000
--- a/be/src/runtime/disk-io-mgr-reader-context.h
+++ /dev/null
@@ -1,406 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_DISK_IO_MGR_READER_CONTEXT_H
-#define IMPALA_RUNTIME_DISK_IO_MGR_READER_CONTEXT_H
-
-#include "runtime/disk-io-mgr.h"
-#include "util/condition-variable.h"
-
-namespace impala {
-
-/// A request context is used to group together I/O requests belonging to a client of the
-/// I/O manager for management and scheduling. For most I/O manager clients it is an
-/// opaque pointer, but some clients may need to include this header, e.g. to make the
-/// unique_ptr<DiskIoRequestContext> destructor work correctly.
-///
-/// Implementation Details
-/// ======================
-/// This object maintains a lot of state that is carefully synchronized. The context
-/// maintains state across all disks as well as per disk state.
-/// The unit for an IO request is a RequestRange, which may be a ScanRange or a
-/// WriteRange.
-/// A scan range for the reader is in one of five states:
-/// 1) PerDiskState's unstarted_ranges: This range has only been queued
-///    and nothing has been read from it.
-/// 2) DiskIoRequestContext's ready_to_start_ranges_: This range is about to be started.
-///    As soon as the reader picks it up, it will move to the in_flight_ranges
-///    queue.
-/// 3) PerDiskState's in_flight_ranges: This range is being processed and will
-///    be read from the next time a disk thread picks it up in GetNextRequestRange()
-/// 4) ScanRange's outgoing ready buffer queue is full. We can't read for this range
-///    anymore. We need the caller to pull a buffer off which will put this in
-///    the in_flight_ranges queue. These ranges are in the DiskIoRequestContext's
-///    blocked_ranges_ queue.
-/// 5) ScanRange is cached and in the cached_ranges_ queue.
-///
-/// If the scan range is read and does not get blocked on the outgoing queue, the
-/// transitions are: 1 -> 2 -> 3.
-/// If the scan range does get blocked, the transitions are
-/// 1 -> 2 -> 3 -> (4 -> 3)*
-///
-/// In the case of a cached scan range, the range is immediately put in cached_ranges_.
-/// When the caller asks for the next range to process, we first pull ranges from
-/// the cached_ranges_ queue. If the range was cached, the range is removed and
-/// done (ranges are either entirely cached or not at all). If the cached read attempt
-/// fails, we put the range in state 1.
-///
-/// A write range for a context may be in one of two lists:
-/// 1) unstarted_write_ranges_ : Ranges that have been queued but not processed.
-/// 2) in_flight_ranges_: The write range is ready to be processed by the next disk thread
-///    that picks it up in GetNextRequestRange().
-///
-/// AddWriteRange() adds WriteRanges for a disk.
-/// It is the responsibility of the client to pin the data to be written via a WriteRange
-/// in memory. After a WriteRange has been written, a callback is invoked to inform the
-/// client that the write has completed.
-///
-/// An important assumption is that write does not exceed the maximum read size and that
-/// the entire range is written when the write request is handled. (In other words, writes
-/// are not broken up.)
-///
-/// When a DiskIoRequestContext is processed by a disk thread in GetNextRequestRange(),
-/// a write range is always removed from the list of unstarted write ranges and appended
-/// to the in_flight_ranges_ queue. This is done to alternate reads and writes - a read
-/// that is scheduled (by calling GetNextRange()) is always followed by a write (if one
-/// exists). And since at most one WriteRange can be present in in_flight_ranges_ at any
-/// time (once a write range is returned from GetNextRequestRange() it is completed and
-/// not re-enqueued), a scan range scheduled via a call to GetNextRange() can be queued up
-/// behind at most one write range.
-class DiskIoRequestContext {
-  using RequestRange = DiskIoMgr::RequestRange;
-  using ScanRange = DiskIoMgr::ScanRange;
-  using WriteRange = DiskIoMgr::WriteRange;
-  using RequestType = DiskIoMgr::RequestType;
-
- public:
-  /// The context must be unregistered (state_ == Inactive) before it is destroyed;
-  /// debug builds enforce this.
-  ~DiskIoRequestContext() { DCHECK_EQ(state_, Inactive) << "Must be unregistered."; }
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(DiskIoRequestContext);
-  friend class DiskIoMgr;
-
-  class PerDiskState;
-
-  enum State {
-    /// Reader is initialized and maps to a client
-    Active,
-
-    /// Reader is in the process of being cancelled. Cancellation is coordinated between
-    /// different threads and when they are all complete, the reader context is moved to
-    /// the inactive state.
-    Cancelled,
-
-    /// Reader context does not map to a client. Accessing memory in this context
-    /// is invalid (i.e. it is equivalent to a dangling pointer).
-    Inactive,
-  };
-
-  DiskIoRequestContext(DiskIoMgr* parent, int num_disks, MemTracker* tracker);
-
-  /// Decrements the number of active disks for this reader. If the disk count
-  /// goes to 0, the disk complete condition variable is signaled.
-  /// Reader lock must be taken before this call.
-  void DecrementDiskRefCount() {
-    // boost doesn't let us dcheck that the reader lock is taken
-    DCHECK_GT(num_disks_with_ranges_, 0);
-    if (--num_disks_with_ranges_ == 0) {
-      disks_complete_cond_var_.NotifyAll();
-    }
-    DCHECK(Validate()) << std::endl << DebugString();
-  }
-
-  /// Reader & Disk Scheduling: Readers that currently can't do work are not on
-  /// the disk's queue. These readers are ones that don't have any ranges in the
-  /// in_flight_queue AND have not prepared a range by setting next_range_to_start.
-  /// The rule to make sure readers are scheduled correctly is to ensure anytime a
-  /// range is put on the in_flight_queue or anytime next_range_to_start is set to
-  /// NULL, the reader is scheduled.
-
-  /// Adds range to the in_flight_ranges of the disk given by range->disk_id(),
-  /// scheduling this reader on that disk's threads if necessary.
-  /// Reader lock must be taken before this. Only valid while the context is Active.
-  void ScheduleScanRange(ScanRange* range) {
-    DCHECK_EQ(state_, Active);
-    DCHECK(range != NULL);
-    DiskIoRequestContext::PerDiskState& state = disk_states_[range->disk_id()];
-    state.in_flight_ranges()->Enqueue(range);
-    state.ScheduleContext(this, range->disk_id());
-  }
-
-  /// Cancels the context with status code 'status'
-  void Cancel(const Status& status);
-
-  /// Cancel the context if not already cancelled, wait for all scan ranges to finish
-  /// and mark the context as inactive, after which it cannot be used.
-  void CancelAndMarkInactive();
-
-  /// Adds request range to disk queue for this request context. Currently,
-  /// schedule_immediately must be false if RequestRange is a write range.
-  void AddRequestRange(RequestRange* range, bool schedule_immediately);
-
-  /// Validates invariants of reader. Reader lock must be taken beforehand.
-  bool Validate() const;
-
-  /// Dumps out reader information. Lock should be taken by caller
-  std::string DebugString() const;
-
-  /// Parent object
-  DiskIoMgr* const parent_;
-
-  /// Memory used for this reader. This is unowned by this object.
-  MemTracker* const mem_tracker_;
-
-  /// Total bytes read for this reader
-  RuntimeProfile::Counter* bytes_read_counter_ = nullptr;
-
-  /// Total time spent in hdfs reading
-  RuntimeProfile::Counter* read_timer_ = nullptr;
-
-  /// Number of active read threads
-  RuntimeProfile::Counter* active_read_thread_counter_ = nullptr;
-
-  /// Disk access bitmap. The counter's bit[i] is set if disk id i has been accessed.
-  /// TODO: we can only support up to 64 disks with this bitmap but it lets us use a
-  /// builtin atomic instruction. Probably good enough for now.
-  RuntimeProfile::Counter* disks_accessed_bitmap_ = nullptr;
-
-  /// Total number of bytes read locally, updated at end of each range scan
-  AtomicInt64 bytes_read_local_{0};
-
-  /// Total number of bytes read via short circuit read, updated at end of each range scan
-  AtomicInt64 bytes_read_short_circuit_{0};
-
-  /// Total number of bytes read from DataNode cache, updated at end of each range scan
-  AtomicInt64 bytes_read_dn_cache_{0};
-
-  /// Total number of bytes from remote reads that were expected to be local.
-  AtomicInt64 unexpected_remote_bytes_{0};
-
-  /// The number of buffers that have been returned to the reader (via GetNext) that the
-  /// reader has not returned. Only included for debugging and diagnostics.
-  AtomicInt32 num_buffers_in_reader_{0};
-
-  /// The number of scan ranges that have been completed for this reader.
-  AtomicInt32 num_finished_ranges_{0};
-
-  /// The number of scan ranges that required a remote read, updated at the end of each
-  /// range scan. Only used for diagnostics.
-  AtomicInt32 num_remote_ranges_{0};
-
-  /// The total number of scan ranges that have not been started. Only used for
-  /// diagnostics. This is the sum of all unstarted_scan_ranges across all disks.
-  AtomicInt32 num_unstarted_scan_ranges_{0};
-
-  /// Total number of file handle opens where the file handle was present in the cache
-  AtomicInt32 cached_file_handles_hit_count_{0};
-
-  /// Total number of file handle opens where the file handle was not in the cache
-  AtomicInt32 cached_file_handles_miss_count_{0};
-
-  /// The number of buffers that are being used for this reader. This is the sum
-  /// of all buffers in ScanRange queues and buffers currently being read into (i.e. about
-  /// to be queued). This includes both IOMgr-allocated buffers and client-provided
-  /// buffers.
-  AtomicInt32 num_used_buffers_{0};
-
-  /// The total number of ready buffers across all ranges. Ready buffers are buffers
-  /// that have been read from disk but not retrieved by the caller.
-  /// This is the sum of all queued buffers in all ranges for this reader context.
-  AtomicInt32 num_ready_buffers_{0};
-
-  /// All fields below are accessed by multiple threads and the lock needs to be
-  /// taken before accessing them. Must be acquired before ScanRange::lock_ if both
-  /// are held simultaneously.
-  boost::mutex lock_;
-
-  /// Current state of the reader
-  State state_ = Active;
-
-  /// Status of this reader. Set to non-ok if cancelled.
-  Status status_;
-
-  /// The number of disks that still have work outstanding for this context.
-  /// DecrementDiskRefCount() decrements it when a disk's PerDiskState becomes done
-  /// (see PerDiskState::DecrementRequestThreadAndCheckDone()) and signals
-  /// disks_complete_cond_var_ when it reaches 0.
-  int num_disks_with_ranges_ = 0;
-
-  /// This is the list of ranges that are expected to be cached on the DN.
-  /// When the reader asks for a new range (GetNextScanRange()), we first
-  /// return ranges from this list.
-  InternalQueue<ScanRange> cached_ranges_;
-
-  /// A list of ranges that should be returned in subsequent calls to
-  /// GetNextRange.
-  /// There is a trade-off with when to populate this list. Populating it on
-  /// demand means consumers need to wait (happens in DiskIoMgr::GetNextRange()).
-  /// Populating it preemptively means we make worse scheduling decisions.
-  /// We currently populate one range per disk.
-  /// TODO: think about this some more.
-  InternalQueue<ScanRange> ready_to_start_ranges_;
-  ConditionVariable ready_to_start_ranges_cv_; // used with lock_
-
-  /// Ranges that are blocked due to back pressure on outgoing buffers.
-  InternalQueue<ScanRange> blocked_ranges_;
-
-  /// Condition variable for UnregisterContext() to wait for all disks to complete
-  ConditionVariable disks_complete_cond_var_;
-
-  /// Struct containing state per disk. See comments in the disk read loop on how
-  /// they are used.
-  class PerDiskState {
-   public:
-    bool done() const { return done_; }
-    void set_done(bool b) { done_ = b; }
-
-    int num_remaining_ranges() const { return num_remaining_ranges_; }
-    /// Mutable accessor: returns a reference so callers can update the count in place.
-    int& num_remaining_ranges() { return num_remaining_ranges_; }
-
-    ScanRange* next_scan_range_to_start() { return next_scan_range_to_start_; }
-    void set_next_scan_range_to_start(ScanRange* range) {
-      next_scan_range_to_start_ = range;
-    }
-
-    /// We need to have a memory barrier to prevent this load from being reordered
-    /// with num_threads_in_op(), since these variables are set without the reader
-    /// lock taken
-    bool is_on_queue() const {
-      bool b = is_on_queue_;
-      __sync_synchronize();
-      return b;
-    }
-
-    int num_threads_in_op() const {
-      int v = num_threads_in_op_.Load();
-      // TODO: determine whether this barrier is necessary for any callsites.
-      AtomicUtil::MemoryBarrier();
-      return v;
-    }
-
-    const InternalQueue<ScanRange>* unstarted_scan_ranges() const {
-      return &unstarted_scan_ranges_;
-    }
-    const InternalQueue<WriteRange>* unstarted_write_ranges() const {
-      return &unstarted_write_ranges_;
-    }
-    const InternalQueue<RequestRange>* in_flight_ranges() const {
-      return &in_flight_ranges_;
-    }
-
-    InternalQueue<ScanRange>* unstarted_scan_ranges() { return &unstarted_scan_ranges_; }
-    InternalQueue<WriteRange>* unstarted_write_ranges() {
-      return &unstarted_write_ranges_;
-    }
-    InternalQueue<RequestRange>* in_flight_ranges() { return &in_flight_ranges_; }
-
-    /// Schedules the request context on this disk if it's not already on the queue.
-    /// Context lock must be taken before this.
-    void ScheduleContext(DiskIoRequestContext* context, int disk_id);
-
-    /// Increment the ref count on reader. We need to track the number of threads per
-    /// reader per disk that are in the unlocked hdfs read code section. This is updated
-    /// by multiple threads without a lock so we need to use an atomic int.
-    /// Also marks the context as no longer being on this disk's queue.
-    void IncrementRequestThreadAndDequeue() {
-      num_threads_in_op_.Add(1);
-      is_on_queue_ = false;
-    }
-
-    void DecrementRequestThread() { num_threads_in_op_.Add(-1); }
-
-    /// Decrement request thread count and do final cleanup if this is the last
-    /// thread. RequestContext lock must be taken before this.
-    void DecrementRequestThreadAndCheckDone(DiskIoRequestContext* context) {
-      num_threads_in_op_.Add(-1); // Also acts as a barrier.
-      if (!is_on_queue_ && num_threads_in_op_.Load() == 0 && !done_) {
-        // This thread is the last one for this reader on this disk, do final cleanup
-        context->DecrementDiskRefCount();
-        done_ = true;
-      }
-    }
-
-   private:
-    /// If true, this disk is all done for this request context, including any cleanup.
-    /// If done is true, it means that this request must not be on this disk's queue
-    /// *AND* there are no threads currently working on this context. To satisfy
-    /// this, only the last thread (per disk) can set this to true.
-    bool done_ = true;
-
-    /// For each disk, keeps track if the context is on this disk's queue, indicating
-    /// the disk must do some work for this context. The disk needs to do work in 4 cases:
-    ///  1) in_flight_ranges is not empty, the disk needs to read for this reader.
-    ///  2) next_range_to_start is NULL, the disk needs to prepare a scan range to be
-    ///     read next.
-    ///  3) the reader has been cancelled and this disk needs to participate in the
-    ///     cleanup.
-    ///  4) A write range is added to queue.
-    /// In general, we only want to put a context on the disk queue if there is something
-    /// useful that can be done. If there's nothing useful, the disk queue will wake up
-    /// and then remove the reader from the queue. Doing this causes thrashing of the
-    /// threads.
-    bool is_on_queue_ = false;
-
-    /// For each disk, the number of request ranges that have not been fully read.
-    /// In the non-cancellation path, this will hit 0, and done will be set to true
-    /// by the disk thread. This is undefined in the cancellation path (the various
-    /// threads notice by looking at the DiskIoRequestContext's state_).
-    int num_remaining_ranges_ = 0;
-
-    /// Queue of ranges that have not started being read. This list is exclusive
-    /// with in_flight_ranges.
-    InternalQueue<ScanRange> unstarted_scan_ranges_;
-
-    /// Queue of pending IO requests for this disk in the order that they will be
-    /// processed. A ScanRange is added to this queue when it is returned in
-    /// GetNextRange(), or when it is added with schedule_immediately = true.
-    /// A WriteRange is added to this queue from unstarted_write_ranges_ for each
-    /// invocation of GetNextRequestRange() in WorkLoop().
-    /// The size of this queue is always less than or equal to num_remaining_ranges.
-    InternalQueue<RequestRange> in_flight_ranges_;
-
-    /// The next range to start for this reader on this disk. Each disk (for each reader)
-    /// picks the next range to start. The range is set here and also added to the
-    /// ready_to_start_ranges_ queue. The reader pulls from the queue in FIFO order,
-    /// so the ranges from different disks are round-robined. When the range is pulled
-    /// off the ready_to_start_ranges_ queue, it sets this variable to NULL, so the disk
-    /// knows to populate it again and add it to ready_to_start_ranges_ i.e. it is used
-    /// as a flag by DiskIoMgr::GetNextScanRange to determine if it needs to add another
-    /// range to ready_to_start_ranges_.
-    ScanRange* next_scan_range_to_start_ = nullptr;
-
-    /// For each disk, the number of threads issuing the underlying read/write on behalf
-    /// of this context. There are a few places where we release the context lock, do some
-    /// work, and then grab the lock again. Because we don't hold the lock for the
-    /// entire operation, we need this ref count to keep track of which thread should do
-    /// final resource cleanup during cancellation.
-    /// Only the thread that sees the count at 0 should do the final cleanup.
-    AtomicInt32 num_threads_in_op_{0};
-
-    /// Queue of write ranges to process for this disk. A write range is always added
-    /// to in_flight_ranges_ in GetNextRequestRange(). There are separate
-    /// unstarted_scan_ranges_ and unstarted_write_ranges_ queues to alternate between
-    /// reads and writes. (Otherwise, since next_scan_range_to_start is set
-    /// in GetNextRequestRange() whenever it is null, repeated calls to
-    /// GetNextRequestRange() and GetNextRange() may result in only reads being processed)
-    InternalQueue<WriteRange> unstarted_write_ranges_;
-  };
-
-  /// Per disk states to synchronize multiple disk threads accessing the same request
-  /// context.
-  std::vector<PerDiskState> disk_states_;
-};
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
deleted file mode 100644
index 7f0692e..0000000
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ /dev/null
@@ -1,591 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/disk-io-mgr.h"
-#include "runtime/disk-io-mgr-internal.h"
-#include "util/error-util.h"
-#include "util/hdfs-util.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-// Also consulted by ScanRange::Close(), which only collects hedged-read metrics when
-// this flag is set.
-DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRead() "
-    "when performing HDFS read operations. This is necessary to use HDFS hedged reads "
-    "(assuming the HDFS client is configured to do so).");
-
-// TODO: Run perf tests and empirically settle on the optimal default value for the
-// read buffer size. Currently set to 128K for the same reason as for S3 (see
-// ScanRange::MaxReadChunkSize()): due to JNI array allocation and memcpy overhead,
-// 128K was empirically found to have the least overhead.
-DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to use when "
-    "reading from ADLS.");
-
-// Implementation of the ScanRange functionality. Each ScanRange contains a queue
-// of ready buffers. For each ScanRange, there is only a single producer and
-// consumer thread, i.e. only one disk thread will push to a scan range at
-// any time and only one thread will remove from the queue. This is to guarantee
-// that buffers are queued and read in file order.
-
-/// Hands 'buffer' to this range's consumer. The caller must hold the owning reader's
-/// lock, passed in as 'reader_lock'.
-/// If the range has been cancelled, the buffer goes straight back to the io mgr and
-/// false is returned. Otherwise the buffer is queued, one thread waiting in GetNext()
-/// is woken, and the result reports whether the ready queue is now full
-/// (SCAN_RANGE_READY_BUFFER_LIMIT reached), i.e. whether the range is now blocked.
-bool DiskIoMgr::ScanRange::EnqueueBuffer(
-    const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) {
-  DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
-  bool queue_full = false;
-  {
-    unique_lock<mutex> scan_range_lock(lock_);
-    DCHECK(Validate()) << DebugString();
-    DCHECK(!eosr_returned_);
-    DCHECK(!eosr_queued_);
-    if (is_cancelled_) {
-      // Nobody will consume this buffer any more: give it back right away. Non-null
-      // buffers are first counted as "in readers" -- presumably so that ReturnBuffer()'s
-      // accounting balances, as it does for buffers handed out by GetNext().
-      if (buffer->buffer_ != nullptr) {
-        io_mgr_->num_buffers_in_readers_.Add(1);
-        reader_->num_buffers_in_reader_.Add(1);
-      }
-      reader_->num_used_buffers_.Add(-1);
-      io_mgr_->ReturnBuffer(move(buffer));
-      return false;
-    }
-    reader_->num_ready_buffers_.Add(1);
-    // Read the eosr flag before 'buffer' is moved into the queue.
-    eosr_queued_ = buffer->eosr();
-    ready_buffers_.emplace_back(move(buffer));
-
-    const auto num_ready = ready_buffers_.size();
-    DCHECK_LE(num_ready, SCAN_RANGE_READY_BUFFER_LIMIT);
-    blocked_on_queue_ = num_ready == SCAN_RANGE_READY_BUFFER_LIMIT;
-    // Capture the result while lock_ (which guards blocked_on_queue_) is still held.
-    queue_full = blocked_on_queue_;
-  }
-
-  // The consumer is signalled only after lock_ has been released.
-  buffer_ready_cv_.NotifyOne();
-  return queue_full;
-}
-
-/// Moves the next ready buffer of this range into '*buffer', which must be null on
-/// entry. Blocks until EnqueueBuffer() queues a buffer or the range is cancelled.
-/// - Once eosr has already been returned, returns OK and leaves '*buffer' null.
-/// - If the range is cancelled while waiting, returns the cancellation status.
-/// - If the dequeued buffer carries an error status, the buffer is returned to the io
-///   mgr and that status is returned.
-/// - If the reader context has been cancelled, this range is cancelled too, the buffer
-///   is returned to the io mgr and the range's status is returned.
-/// On success, a range that was blocked on a full queue is rescheduled once the queue
-/// has room again.
-Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
-  DCHECK(*buffer == nullptr);
-  // Snapshot of the dequeued buffer's eosr flag, taken while lock_ is held and used for
-  // the counter update after the lock is released.
-  bool eosr;
-  {
-    unique_lock<mutex> scan_range_lock(lock_);
-    if (eosr_returned_) return Status::OK();
-    DCHECK(Validate()) << DebugString();
-
-    // Both EnqueueBuffer() and Cancel() signal buffer_ready_cv_.
-    while (ready_buffers_.empty() && !is_cancelled_) {
-      buffer_ready_cv_.Wait(scan_range_lock);
-    }
-
-    if (is_cancelled_) {
-      DCHECK(!status_.ok());
-      return status_;
-    }
-
-    // Remove the first ready buffer from the queue and return it
-    DCHECK(!ready_buffers_.empty());
-    DCHECK_LE(ready_buffers_.size(), SCAN_RANGE_READY_BUFFER_LIMIT);
-    *buffer = move(ready_buffers_.front());
-    ready_buffers_.pop_front();
-    eosr_returned_ = (*buffer)->eosr();
-    eosr = (*buffer)->eosr();
-  }
-
-  // Update tracking counters. The buffer has now moved from the IoMgr to the
-  // caller.
-  io_mgr_->num_buffers_in_readers_.Add(1);
-  reader_->num_buffers_in_reader_.Add(1);
-  reader_->num_ready_buffers_.Add(-1);
-  reader_->num_used_buffers_.Add(-1);
-  if (eosr) reader_->num_finished_ranges_.Add(1);
-
-  Status status = (*buffer)->status_;
-  if (!status.ok()) {
-    io_mgr_->ReturnBuffer(move(*buffer));
-    return status;
-  }
-
-  // lock_ was released above. When both locks are held (the reschedule check below),
-  // reader_->lock_ is acquired first, as the request context requires.
-  unique_lock<mutex> reader_lock(reader_->lock_);
-
-  DCHECK(reader_->Validate()) << endl << reader_->DebugString();
-  if (reader_->state_ == DiskIoRequestContext::Cancelled) {
-    reader_->blocked_ranges_.Remove(this);
-    Cancel(reader_->status_);
-    io_mgr_->ReturnBuffer(move(*buffer));
-    return status_;
-  }
-
-  {
-    // Check to see if we can re-schedule a blocked range. Note that EnqueueBuffer()
-    // may have been called after we released 'lock_' above so we need to re-check
-    // whether the queue is full.
-    unique_lock<mutex> scan_range_lock(lock_);
-    if (blocked_on_queue_ && ready_buffers_.size() < SCAN_RANGE_READY_BUFFER_LIMIT
-        && !eosr_queued_) {
-      blocked_on_queue_ = false;
-      // This scan range was blocked and is no longer, add it to the reader
-      // queue again.
-      reader_->blocked_ranges_.Remove(this);
-      reader_->ScheduleScanRange(this);
-    }
-  }
-  return Status::OK();
-}
-
-/// Cancels this range with the non-OK 'status'. Only the first call has an effect;
-/// later calls return early and keep the originally recorded status. A range that was
-/// never initialized (io_mgr_ == nullptr) is ignored.
-/// Wakes any thread waiting in GetNext(), returns all queued buffers to the io mgr and
-/// closes the range, unless it holds a cached buffer (then DiskIoMgr::ReturnBuffer()
-/// closes it).
-void DiskIoMgr::ScanRange::Cancel(const Status& status) {
-  // Cancelling a range that was never started, ignore.
-  if (io_mgr_ == nullptr) return;
-
-  DCHECK(!status.ok());
-  {
-    // Grab both locks to make sure that all working threads see is_cancelled_:
-    // EnqueueBuffer()/GetNext() check it under lock_, Open()/Read() under hdfs_lock_.
-    unique_lock<mutex> scan_range_lock(lock_);
-    unique_lock<mutex> hdfs_lock(hdfs_lock_);
-    DCHECK(Validate()) << DebugString();
-    if (is_cancelled_) return;
-    is_cancelled_ = true;
-    status_ = status;
-  }
-  buffer_ready_cv_.NotifyAll();
-  // NOTE(review): this drains ready_buffers_ without holding lock_. EnqueueBuffer() and
-  // GetNext()'s dequeue path stop touching the queue once is_cancelled_ is set, but
-  // GetNext()'s reschedule check reads ready_buffers_.size() under lock_ only -- confirm
-  // that every caller of Cancel() holds the reader lock so that check cannot overlap.
-  CleanupQueuedBuffers();
-
-  // For cached buffers, we can't close the range until the cached buffer is returned.
-  // Close() is called from DiskIoMgr::ReturnBuffer().
-  if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) Close();
-}
-
-/// Returns every buffer still sitting in ready_buffers_ to the io mgr, adjusting the
-/// buffer counters first. Only valid after the range has been cancelled.
-void DiskIoMgr::ScanRange::CleanupQueuedBuffers() {
-  DCHECK(is_cancelled_);
-  // Take the count as a signed value once. Negating the unsigned size() directly wraps
-  // around and only yields the intended negative delta through an implementation-defined
-  // narrowing conversion inside Add(). The queue is bounded by
-  // SCAN_RANGE_READY_BUFFER_LIMIT, so the cast cannot overflow.
-  const int num_queued = static_cast<int>(ready_buffers_.size());
-  // The queued buffers are counted as "in readers" before being returned, mirroring the
-  // accounting GetNext() does for buffers it hands out.
-  io_mgr_->num_buffers_in_readers_.Add(num_queued);
-  reader_->num_buffers_in_reader_.Add(num_queued);
-  reader_->num_used_buffers_.Add(-num_queued);
-  reader_->num_ready_buffers_.Add(-num_queued);
-
-  while (!ready_buffers_.empty()) {
-    io_mgr_->ReturnBuffer(move(ready_buffers_.front()));
-    ready_buffers_.pop_front();
-  }
-}
-
-/// One-line summary of the range's location, read progress, queued buffer count and
-/// exclusive HDFS handle, for logging and DCHECK messages.
-string DiskIoMgr::ScanRange::DebugString() const {
-  stringstream out;
-  out << "file=" << file_;
-  out << " disk_id=" << disk_id_;
-  out << " offset=" << offset_;
-  out << " len=" << len_;
-  out << " bytes_read=" << bytes_read_;
-  out << " buffer_queue=" << ready_buffers_.size();
-  out << " hdfs_file=" << exclusive_hdfs_fh_;
-  return out.str();
-}
-
-/// Checks the range's internal invariants, logging a warning and returning false on
-/// the first violation found.
-bool DiskIoMgr::ScanRange::Validate() {
-  // Invariant 1: never account for more bytes than the range covers.
-  const bool read_past_end = bytes_read_ > len_;
-  if (read_past_end) {
-    LOG(WARNING) << "Bytes read tracking is wrong. Shouldn't read past the scan range."
-                 << " bytes_read_=" << bytes_read_ << " len_=" << len_;
-    return false;
-  }
-  // Invariant 2: eosr may only be handed to the reader after it has been queued.
-  const bool premature_eosr = eosr_returned_ && !eosr_queued_;
-  if (!premature_eosr) return true;
-  LOG(WARNING) << "Returned eosr to reader before finishing reading the scan range"
-               << " eosr_returned_=" << eosr_returned_
-               << " eosr_queued_=" << eosr_queued_;
-  return false;
-}
-
-DiskIoMgr::ScanRange::ScanRange()
- : RequestRange(RequestType::READ),
- num_remote_bytes_(0),
- external_buffer_tag_(ExternalBufferTag::NO_BUFFER),
- mtime_(-1) {}
-
-/// Debug-checks that the range was closed before destruction: no exclusive HDFS handle
-/// is still held and no cached buffer is outstanding (Close() releases both).
-DiskIoMgr::ScanRange::~ScanRange() {
-  DCHECK(!exclusive_hdfs_fh_) << "File was not closed.";
-  DCHECK(ExternalBufferTag::CACHED_BUFFER != external_buffer_tag_)
-      << "Cached buffer was not released.";
-}
-
-/// Re-targets this range at 'len' bytes starting at 'offset' of 'file' on disk
-/// 'disk_id'. A null 'fs' means a local file (see Open()). 'buffer_opts' selects caching
-/// ('try_cache_'), the file's 'mtime_' and an optional client-provided buffer, which
-/// must be able to hold 'len' bytes.
-/// Also detaches the range from any io mgr / reader: Cancel() ignores it until
-/// InitInternal() attaches it again. Must not be called while buffers are queued.
-void DiskIoMgr::ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
-    int disk_id, bool expected_local, const BufferOpts& buffer_opts, void* meta_data) {
-  DCHECK(ready_buffers_.empty());
-  DCHECK(file != nullptr);
-  DCHECK_GE(len, 0);
-  DCHECK_GE(offset, 0);
-  // A client-provided buffer must fit the whole new range. Compare against the incoming
-  // 'len': the member 'len_' still holds the previous range's length at this point.
-  DCHECK(buffer_opts.client_buffer_ == nullptr ||
-      buffer_opts.client_buffer_len_ >= len);
-  fs_ = fs;
-  file_ = file;
-  len_ = len;
-  offset_ = offset;
-  disk_id_ = disk_id;
-  try_cache_ = buffer_opts.try_cache_;
-  mtime_ = buffer_opts.mtime_;
-  expected_local_ = expected_local;
-  num_remote_bytes_ = 0;
-  meta_data_ = meta_data;
-  if (buffer_opts.client_buffer_ != nullptr) {
-    external_buffer_tag_ = ExternalBufferTag::CLIENT_BUFFER;
-    client_buffer_.data = buffer_opts.client_buffer_;
-    client_buffer_.len = buffer_opts.client_buffer_len_;
-  } else {
-    // client_buffer_ is left untouched; presumably it is only read under CLIENT_BUFFER.
-    external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
-  }
-  io_mgr_ = nullptr;
-  reader_ = nullptr;
-  exclusive_hdfs_fh_ = nullptr;
-}
-
-/// Attaches a Reset() range to 'io_mgr' and request context 'reader' and clears all
-/// per-read progress (bytes read, cancellation, eosr and blocked flags). No file may be
-/// open at this point.
-void DiskIoMgr::ScanRange::InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext* reader) {
-  DCHECK(exclusive_hdfs_fh_ == nullptr);
-  DCHECK(local_file_ == nullptr);
-  // Reader must provide MemTracker or a buffer.
-  DCHECK(external_buffer_tag_ == ExternalBufferTag::CLIENT_BUFFER
-      || reader->mem_tracker_ != nullptr);
-
-  // Ownership links.
-  io_mgr_ = io_mgr;
-  reader_ = reader;
-
-  // No open file yet (also enforced here when DCHECKs are compiled out).
-  exclusive_hdfs_fh_ = nullptr;
-  local_file_ = nullptr;
-
-  // Fresh read / queue progress.
-  bytes_read_ = 0;
-  is_cancelled_ = false;
-  blocked_on_queue_ = false;
-  eosr_queued_ = false;
-  eosr_returned_ = false;
-
-  DCHECK(Validate()) << DebugString();
-}
-
-/// Opens the file backing this range and positions it at offset_, holding hdfs_lock_
-/// throughout. Returns CANCELLED if the range has already been cancelled.
-/// HDFS ranges (fs_ != nullptr): no-op if an exclusive handle is already open, or if
-/// 'use_file_handle_cache' is set and the range is expected to be local (reads then
-/// borrow a handle from the cache instead). Otherwise obtains an exclusive handle and
-/// seeks to offset_; if the seek fails the handle is destroyed and an error returned.
-/// Local ranges: fopen()s the file (no-op if already open) and fseek()s to offset_.
-/// IO_MGR_NUM_OPEN_FILES is incremented only when this call actually opened a file;
-/// Close() decrements it only when it actually closed one.
-Status DiskIoMgr::ScanRange::Open(bool use_file_handle_cache) {
-  unique_lock<mutex> hdfs_lock(hdfs_lock_);
-  if (is_cancelled_) return Status::CANCELLED;
-
-  if (fs_ != nullptr) {
-    if (exclusive_hdfs_fh_ != nullptr) return Status::OK();
-    // With file handle caching, the scan range does not maintain its own
-    // hdfs file handle. File handle caching is only used for local files,
-    // so s3 and remote filesystems should obtain an exclusive file handle
-    // for each scan range.
-    if (use_file_handle_cache && expected_local_) return Status::OK();
-    // Get a new exclusive file handle.
-    exclusive_hdfs_fh_ = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
-        mtime(), reader_, true);
-    if (exclusive_hdfs_fh_ == nullptr) {
-      return Status(TErrorCode::DISK_IO_ERROR,
-          GetHdfsErrorMsg("Failed to open HDFS file ", file_));
-    }
-
-    if (hdfsSeek(fs_, exclusive_hdfs_fh_->file(), offset_) != 0) {
-      // Destroy the file handle and remove it from the cache.
-      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_, true);
-      exclusive_hdfs_fh_ = nullptr;
-      return Status(TErrorCode::DISK_IO_ERROR,
-          Substitute("Error seeking to $0 in file: $1 $2", offset_, file_,
-          GetHdfsErrorMsg("")));
-    }
-  } else {
-    if (local_file_ != nullptr) return Status::OK();
-
-    local_file_ = fopen(file(), "r");
-    if (local_file_ == nullptr) {
-      return Status(TErrorCode::DISK_IO_ERROR, Substitute("Could not open file: $0: $1",
-            file_, GetStrErrMsg()));
-    }
-    if (fseek(local_file_, offset_, SEEK_SET) == -1) {
-      // Don't leak the FILE* when positioning fails.
-      fclose(local_file_);
-      local_file_ = nullptr;
-      return Status(TErrorCode::DISK_IO_ERROR, Substitute("Could not seek to $0 "
-          "for file: $1: $2", offset_, file_, GetStrErrMsg()));
-    }
-  }
-  // Reached only when a file was opened by this call (all no-op/error paths returned).
-  if (ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
-    ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L);
-  }
-  return Status::OK();
-}
-
-/// Closes whatever file this range has open, holding hdfs_lock_ throughout.
-/// HDFS ranges: if an exclusive handle is open, calls GetHdfsStatistics() on it, frees a
-/// held cached buffer (CACHED_BUFFER) via hadoopRzBufferFree(), then destroys the handle
-/// and removes it from the io mgr's cache. Regardless of whether a handle was open, it
-/// then refreshes the hedged-read metrics (only with --use_hdfs_pread) and records
-/// remote-read counters on the reader.
-/// Local ranges: fclose()s local_file_ if it is open.
-/// IO_MGR_NUM_OPEN_FILES is decremented only when a file was actually closed here.
-/// NOTE(review): on the HDFS path the remote-read counters are updated on every call;
-/// if Close() can run more than once for the same range they would be double-counted
-/// -- confirm.
-void DiskIoMgr::ScanRange::Close() {
-  unique_lock<mutex> hdfs_lock(hdfs_lock_);
-  bool closed_file = false;
-  if (fs_ != nullptr) {
-    if (exclusive_hdfs_fh_ != nullptr) {
-      GetHdfsStatistics(exclusive_hdfs_fh_->file());
-
-      // A cached buffer must be freed while its file handle is still open.
-      if (external_buffer_tag_ == ExternalBufferTag::CACHED_BUFFER) {
-        hadoopRzBufferFree(exclusive_hdfs_fh_->file(), cached_buffer_);
-        cached_buffer_ = nullptr;
-        external_buffer_tag_ = ExternalBufferTag::NO_BUFFER;
-      }
-
-      // Destroy the file handle and remove it from the cache.
-      io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), exclusive_hdfs_fh_, true);
-      exclusive_hdfs_fh_ = nullptr;
-      closed_file = true;
-    }
-
-    if (FLAGS_use_hdfs_pread) {
-      // Update Hedged Read Metrics.
-      // We call it only if the --use_hdfs_pread flag is set, to avoid having the
-      // libhdfs client malloc and free a hdfsHedgedReadMetrics object unnecessarily
-      // otherwise. 'hedged_metrics' is only set upon success.
-      struct hdfsHedgedReadMetrics* hedged_metrics;
-      int success = hdfsGetHedgedReadMetrics(fs_, &hedged_metrics);
-      if (success == 0) {
-        ImpaladMetrics::HEDGED_READ_OPS->set_value(hedged_metrics->hedgedReadOps);
-        ImpaladMetrics::HEDGED_READ_OPS_WIN->set_value(hedged_metrics->hedgedReadOpsWin);
-        hdfsFreeHedgedReadMetrics(hedged_metrics);
-      }
-    }
-
-    if (num_remote_bytes_ > 0) {
-      reader_->num_remote_ranges_.Add(1L);
-      if (expected_local_) {
-        reader_->unexpected_remote_bytes_.Add(num_remote_bytes_);
-        VLOG_FILE << "Unexpected remote HDFS read of "
-                  << PrettyPrinter::Print(num_remote_bytes_, TUnit::BYTES)
-                  << " for file '" << file_ << "'";
-      }
-    }
-  } else {
-    if (local_file_ == nullptr) return;
-    fclose(local_file_);
-    local_file_ = nullptr;
-    closed_file = true;
-  }
-  // Pairs with the increment in Open(), which only counts files it actually opened.
-  if (closed_file && ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
-    ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L);
-  }
-}
-
- // Returns the largest number of bytes a single hdfsRead()/hdfsPread() call should
- // request for this range, chosen by the remote filesystem the range lives on.
- int64_t DiskIoMgr::ScanRange::MaxReadChunkSize() const {
-   // S3: S3 InputStreams do not support DIRECT_READ (the java.nio.ByteBuffer read()
-   // interface), so hdfsRead() must allocate a Java byte[] and copy the data out of it.
-   // Both the JNI array allocation and the memcpy cost much more for larger buffers, so
-   // each request is capped. 128K was chosen empirically by trying values between 4K
-   // and 8M, optimizing for lower CPU utilization and higher S3 throughput.
-   if (disk_id_ == io_mgr_->RemoteS3DiskId()) {
-     DCHECK(IsS3APath(file()));
-     const int64_t s3_chunk_bytes = 128 * 1024;
-     return s3_chunk_bytes;
-   }
-   // ADLS: the cap comes from the --adls_read_chunk_size flag.
-   if (disk_id_ == io_mgr_->RemoteAdlsDiskId()) {
-     DCHECK(IsADLSPath(file()));
-     return FLAGS_adls_read_chunk_size;
-   }
-   // Everything else is only limited by hdfsRead()'s int length argument.
-   return numeric_limits<int>::max();
- }
-
- // TODO: how do we best use the disk here. e.g. is it good to break up a
- // 1MB read into 8 128K reads?
- // TODO: look at linux disk scheduling
- //
- // Reads the next bytes of this scan range into 'buffer'. It stops at the end of the
- // range or after 'buffer_len' bytes, whichever comes first, and starts at file
- // position offset_ + bytes_read_.
- // On return, '*bytes_read' holds the number of bytes copied into 'buffer'. '*eosr' is
- // true once the end of the range (or, on a short read, the end of the file) is reached.
- // Returns Status::CANCELLED if the range was cancelled. Returns a DISK_IO_ERROR status
- // if opening, seeking or reading fails.
- //
- // HDFS reads are split into chunks of at most MaxReadChunkSize() bytes. Without an
- // exclusive file handle, a handle is borrowed from the cache. A failed operation on a
- // borrowed handle is retried once after reopening that handle.
- Status DiskIoMgr::ScanRange::Read(
-     uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, bool* eosr) {
-   unique_lock<mutex> hdfs_lock(hdfs_lock_);
-   if (is_cancelled_) return Status::CANCELLED;
-
-   *eosr = false;
-   *bytes_read = 0;
-   // Read until the end of the scan range or the end of the buffer. min() yields an
-   // int64_t here; keep it 64-bit, because narrowing it to int could truncate it.
-   int64_t bytes_to_read = min(len_ - bytes_read_, buffer_len);
-   DCHECK_GE(bytes_to_read, 0);
-
-   if (fs_ != nullptr) {
-     HdfsFileHandle* borrowed_hdfs_fh = nullptr;
-     hdfsFile hdfs_file;
-
-     // If the scan range has an exclusive file handle, use it. Otherwise, borrow
-     // a file handle from the cache.
-     if (exclusive_hdfs_fh_ != nullptr) {
-       hdfs_file = exclusive_hdfs_fh_->file();
-     } else {
-       borrowed_hdfs_fh = io_mgr_->GetCachedHdfsFileHandle(fs_, file_string(),
-           mtime(), reader_, false);
-       if (borrowed_hdfs_fh == nullptr) {
-         return Status(TErrorCode::DISK_IO_ERROR,
-             GetHdfsErrorMsg("Failed to open HDFS file ", file_));
-       }
-       hdfs_file = borrowed_hdfs_fh->file();
-     }
-
-     int64_t max_chunk_size = MaxReadChunkSize();
-     Status status = Status::OK();
-     while (*bytes_read < bytes_to_read) {
-       int64_t chunk_len = min(bytes_to_read - *bytes_read, max_chunk_size);
-       DCHECK_GE(chunk_len, 0);
-       // The hdfsRead() length argument is an int. Check the 64-bit value before
-       // narrowing; a check on the already-narrowed int could never fail.
-       DCHECK_LE(chunk_len, numeric_limits<int>::max());
-       int chunk_size = static_cast<int>(chunk_len);
-       int current_bytes_read = -1;
-       // bytes_read_ is only updated after the while loop
-       int64_t position_in_file = offset_ + bytes_read_ + *bytes_read;
-       int num_retries = 0;
-       while (true) {
-         status = Status::OK();
-         // For file handles from the cache, any of the below file operations may fail
-         // due to a bad file handle. In each case, record the error, but allow for a
-         // retry to fix it.
-         if (FLAGS_use_hdfs_pread) {
-           current_bytes_read = hdfsPread(fs_, hdfs_file, position_in_file,
-               buffer + *bytes_read, chunk_size);
-           if (current_bytes_read == -1) {
-             status = Status(TErrorCode::DISK_IO_ERROR,
-                 GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
-           }
-         } else {
-           // If the file handle is borrowed, it may not be at the appropriate
-           // location. Seek to the appropriate location.
-           bool seek_failed = false;
-           if (borrowed_hdfs_fh != nullptr) {
-             if (hdfsSeek(fs_, hdfs_file, position_in_file) != 0) {
-               status = Status(TErrorCode::DISK_IO_ERROR, Substitute("Error seeking to $0 "
-                   "in file: $1: $2", position_in_file, file_, GetHdfsErrorMsg("")));
-               seek_failed = true;
-             }
-           }
-           if (!seek_failed) {
-             current_bytes_read = hdfsRead(fs_, hdfs_file, buffer + *bytes_read,
-                 chunk_size);
-             if (current_bytes_read == -1) {
-               status = Status(TErrorCode::DISK_IO_ERROR,
-                   GetHdfsErrorMsg("Error reading from HDFS file: ", file_));
-             }
-           }
-         }
-
-         // Do not retry:
-         // - if read was successful (current_bytes_read != -1)
-         // - or if already retried once
-         // - or if this not using a borrowed file handle
-         DCHECK_LE(num_retries, 1);
-         if (current_bytes_read != -1 || borrowed_hdfs_fh == nullptr ||
-             num_retries == 1) {
-           break;
-         }
-         // The error may be due to a bad file handle. Reopen the file handle and retry.
-         ++num_retries;
-         RETURN_IF_ERROR(io_mgr_->ReopenCachedHdfsFileHandle(fs_, file_string(),
-             mtime(), &borrowed_hdfs_fh));
-         hdfs_file = borrowed_hdfs_fh->file();
-       }
-       if (!status.ok()) break;
-       if (current_bytes_read == 0) {
-         // No more bytes in the file. The scan range went past the end.
-         *eosr = true;
-         break;
-       }
-       *bytes_read += current_bytes_read;
-
-       // Collect and accumulate statistics
-       GetHdfsStatistics(hdfs_file);
-     }
-
-     if (borrowed_hdfs_fh != nullptr) {
-       io_mgr_->ReleaseCachedHdfsFileHandle(file_string(), borrowed_hdfs_fh, false);
-     }
-     if (!status.ok()) return status;
-   } else {
-     DCHECK(local_file_ != nullptr);
-     *bytes_read = fread(buffer, 1, bytes_to_read, local_file_);
-     DCHECK_GE(*bytes_read, 0);
-     DCHECK_LE(*bytes_read, bytes_to_read);
-     if (*bytes_read < bytes_to_read) {
-       if (ferror(local_file_) != 0) {
-         // The two literals are concatenated, so the space before "at" is required.
-         return Status(TErrorCode::DISK_IO_ERROR, Substitute("Error reading from $0 "
-             "at byte offset: $1: $2", file_, offset_ + bytes_read_, GetStrErrMsg()));
-       } else {
-         // On Linux, we should only get partial reads from block devices on error or eof.
-         DCHECK(feof(local_file_) != 0);
-         *eosr = true;
-       }
-     }
-   }
-   bytes_read_ += *bytes_read;
-   DCHECK_LE(bytes_read_, len_);
-   if (bytes_read_ == len_) *eosr = true;
-   return Status::OK();
- }
-
- // Tries to satisfy the whole scan range with one zero-copy read from the HDFS cache
- // (hadoopReadZero()). The caller must hold 'reader_lock' on reader_->lock_, must have
- // set try_cache_, and must not have read any bytes of the range yet.
- //
- // On success, this enqueues a single BufferDescriptor spanning the whole range (marked
- // eosr), sets bytes_read_ to its length and sets '*read_succeeded' to true.
- // If the data is not cached, or fewer than len() bytes come back, the range is closed
- // and OK is returned with '*read_succeeded' still false. The caller then falls back to
- // a normal disk read.
- // Local-filesystem ranges (fs_ == nullptr) also return OK with '*read_succeeded' false.
- // Errors from Open() are returned unchanged. CANCELLED is returned if the range was
- // cancelled.
- Status DiskIoMgr::ScanRange::ReadFromCache(
- const unique_lock<mutex>& reader_lock, bool* read_succeeded) {
- DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
- DCHECK(try_cache_);
- DCHECK_EQ(bytes_read_, 0);
- *read_succeeded = false;
- Status status = Open(false);
- if (!status.ok()) return status;
-
- // Cached reads not supported on local filesystem.
- if (fs_ == nullptr) return Status::OK();
-
- {
- unique_lock<mutex> hdfs_lock(hdfs_lock_);
- if (is_cancelled_) return Status::CANCELLED;
-
- DCHECK(exclusive_hdfs_fh_ != nullptr);
- DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER);
- // Request the whole range (len() bytes) as a single zero-copy buffer; a nullptr
- // result means nothing could be served from the cache.
- cached_buffer_ =
- hadoopReadZero(exclusive_hdfs_fh_->file(), io_mgr_->cached_read_options_, len());
- if (cached_buffer_ != nullptr) {
- external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
- }
- }
- // Data was not cached, caller will fall back to normal read path.
- // NOTE(review): external_buffer_tag_ is read here after hdfs_lock_ has been released;
- // confirm nothing else can modify it concurrently at this point.
- if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) {
- VLOG_QUERY << "Cache read failed for scan range: " << DebugString()
- << ". Switching to disk read path.";
- // Clean up the scan range state before re-issuing it.
- Close();
- return Status::OK();
- }
-
- // Cached read returned a buffer, verify we read the correct amount of data.
- void* buffer = const_cast<void*>(hadoopRzBufferGet(cached_buffer_));
- int32_t bytes_read = hadoopRzBufferLength(cached_buffer_);
- // A partial read can happen when files are truncated.
- // TODO: If HDFS ever supports partially cached blocks, we'll have to distinguish
- // between errors and partially cached blocks here.
- if (bytes_read < len()) {
- VLOG_QUERY << "Error reading file from HDFS cache: " << file_ << ". Expected "
- << len() << " bytes, but read " << bytes_read << ". Switching to disk read path.";
- // Close the scan range. 'read_succeeded' is still false, so the caller will fall back
- // to non-cached read of this scan range.
- Close();
- return Status::OK();
- }
-
- // Create a single buffer desc for the entire scan range and enqueue that.
- // 'mem_tracker' is nullptr because the memory is owned by the HDFS java client,
- // not the Impala backend.
- unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(
- io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0, nullptr));
- desc->len_ = bytes_read;
- desc->scan_range_offset_ = 0;
- desc->eosr_ = true;
- bytes_read_ = bytes_read;
- EnqueueBuffer(reader_lock, move(desc));
- if (reader_->bytes_read_counter_ != nullptr) {
- COUNTER_ADD(reader_->bytes_read_counter_, bytes_read);
- }
- *read_succeeded = true;
- // Presumably counts the buffer enqueued above as in use by the reader; confirm.
- reader_->num_used_buffers_.Add(1);
- return Status::OK();
- }
-
- // Folds the libhdfs read statistics of 'hdfs_file' into the reader's byte counters and
- // into num_remote_bytes_, then clears them on the file. This is presumably so that
- // repeated calls (Read() calls this after every chunk) do not double count.
- // Only HDFS paths are handled; other paths are ignored.
- void DiskIoMgr::ScanRange::GetHdfsStatistics(hdfsFile hdfs_file) {
-   if (!IsHdfsPath(file())) return;
-
-   struct hdfsReadStatistics* read_stats;
-   if (hdfsFileGetReadStatistics(hdfs_file, &read_stats) == 0) {
-     reader_->bytes_read_local_.Add(read_stats->totalLocalBytesRead);
-     reader_->bytes_read_short_circuit_.Add(read_stats->totalShortCircuitBytesRead);
-     reader_->bytes_read_dn_cache_.Add(read_stats->totalZeroCopyBytesRead);
-     // Whatever was not read locally was read remotely.
-     const auto remote_bytes =
-         read_stats->totalBytesRead - read_stats->totalLocalBytesRead;
-     if (remote_bytes != 0) num_remote_bytes_ += remote_bytes;
-     hdfsFileFreeReadStatistics(read_stats);
-   }
-   hdfsFileClearReadStatistics(hdfs_file);
- }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-stress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress-test.cc b/be/src/runtime/disk-io-mgr-stress-test.cc
deleted file mode 100644
index 7ae9515..0000000
--- a/be/src/runtime/disk-io-mgr-stress-test.cc
+++ /dev/null
@@ -1,60 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/disk-io-mgr-stress.h"
-#include "util/cpu-info.h"
-#include "util/string-parser.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
- // Simple utility to run the disk io stress test. An optional argument (argv[1])
- // controls how long to run the test, in seconds (0 runs it forever).
-
- // TODO: make these configurable once we decide how to run BE tests with args
- constexpr int DEFAULT_DURATION_SEC = 1;
- constexpr int NUM_DISKS = 5;
- constexpr int NUM_THREADS_PER_DISK = 5;
- constexpr int NUM_CLIENTS = 10;
- constexpr bool TEST_CANCELLATION = true;
-
- // Entry point for the stress test binary. Usage: <binary> [duration_sec]
- // 'duration_sec' must be a non-negative integer; 0 runs the test indefinitely.
- // Returns 1 on an invalid argument, 0 otherwise.
- int main(int argc, char** argv) {
-   google::InitGoogleLogging(argv[0]);
-   CpuInfo::Init();
-   OsInfo::Init();
-   impala::InitThreading();
-   int duration_sec = DEFAULT_DURATION_SEC;
-
-   if (argc == 2) {
-     StringParser::ParseResult status;
-     duration_sec = StringParser::StringToInt<int>(argv[1], strlen(argv[1]), &status);
-     // A negative duration parses fine but would make DiskIoMgrStress::Run() skip its
-     // timed loop entirely, so treat it as invalid too.
-     if (status != StringParser::PARSE_SUCCESS || duration_sec < 0) {
-       printf("Invalid arg: %s\n", argv[1]);
-       return 1;
-     }
-   }
-   if (duration_sec != 0) {
-     printf("Running stress test for %d seconds.\n", duration_sec);
-   } else {
-     printf("Running stress test indefinitely.\n");
-   }
-   DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION);
-   test.Run(duration_sec);
-
-   return 0;
- }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.cc b/be/src/runtime/disk-io-mgr-stress.cc
deleted file mode 100644
index a98c3a4..0000000
--- a/be/src/runtime/disk-io-mgr-stress.cc
+++ /dev/null
@@ -1,246 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <boost/thread/mutex.hpp>
-
-#include "runtime/disk-io-mgr-stress.h"
-
-#include "runtime/disk-io-mgr-reader-context.h"
-#include "util/time.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
- // Chance (when cancellation is enabled) that a client stops reading its file early.
- static constexpr float ABORT_CHANCE = .10f;
- // Scan range lengths are drawn from [MIN_READ_LEN, MAX_READ_LEN) (clipped at EOF).
- static constexpr int MIN_READ_LEN = 1;
- static constexpr int MAX_READ_LEN = 20;
-
- // Data file lengths are drawn from [MIN_FILE_LEN, MAX_FILE_LEN).
- static constexpr int MIN_FILE_LEN = 10;
- static constexpr int MAX_FILE_LEN = 1024;
-
- // Make sure this is between MIN/MAX FILE_LEN to test more cases
- static constexpr int MIN_READ_BUFFER_SIZE = 64;
- static constexpr int MAX_READ_BUFFER_SIZE = 128;
-
- // Interval between random reader cancellations in Run(), in milliseconds.
- static constexpr int CANCEL_READER_PERIOD_MS = 20;
-
- // Creates (or truncates) 'filename' and writes the NUL-terminated 'data' into it.
- // Aborts via CHECK if the file cannot be opened, fully written, or closed. A silently
- // short fixture would otherwise surface later as a confusing validation failure.
- static void CreateTempFile(const char* filename, const char* data) {
-   FILE* file = fopen(filename, "w");
-   CHECK(file != nullptr) << "Could not create " << filename;
-   const size_t len = strlen(data);
-   CHECK_EQ(fwrite(data, 1, len, file), len) << "Short write to " << filename;
-   CHECK_EQ(fclose(file), 0) << "Could not close " << filename;
- }
-
- // Returns a string of random lowercase letters ('a'..'z') whose length is drawn from
- // [MIN_FILE_LEN, MAX_FILE_LEN). Driven by rand(), so the result depends on the
- // current srand() seed.
- string GenerateRandomData() {
-   const int length = MIN_FILE_LEN + rand() % (MAX_FILE_LEN - MIN_FILE_LEN);
-   string result;
-   result.reserve(length);
-   for (int remaining = length; remaining > 0; --remaining) {
-     result.push_back(static_cast<char>('a' + rand() % 26));
-   }
-   return result;
- }
-
- // Per-client state of the stress test. 'lock' is held whenever 'reader' is cancelled,
- // unregistered or replaced, so the driver thread and the client thread do not race.
- struct DiskIoMgrStress::Client {
-   boost::mutex lock;
-   unique_ptr<DiskIoRequestContext> reader;
-   // Index into files_ of the file this client is currently reading.
-   int file_idx = 0;
-   // Scan ranges covering that file; owned here and deleted in NewClient().
-   vector<DiskIoMgr::ScanRange*> scan_ranges;
-   // The client stops reading once more than this many bytes have been read.
-   int abort_at_byte = 0;
-   // Number of files assigned so far. Clients are created with 'new Client[]', which
-   // default-initializes them, and NewClient() increments this counter. The
-   // initializers prevent reading indeterminate values.
-   int files_processed = 0;
- };
-
- // Seeds rand(), starts the io mgr, writes two random data files per client under
- // /tmp, and initializes every client with NewClient().
- DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
-     int num_clients, bool includes_cancellation)
-   : num_clients_(num_clients),
-     includes_cancellation_(includes_cancellation) {
-   // Seed from the wall clock and log the seed so a failing run can be traced to it.
-   const time_t rand_seed = time(NULL);
-   LOG(INFO) << "Running with rand seed: " << rand_seed;
-   srand(rand_seed);
-
-   io_mgr_.reset(new DiskIoMgr(num_disks, num_threads_per_disk, num_threads_per_disk,
-       MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE));
-   Status init_status = io_mgr_->Init(&mem_tracker_);
-   CHECK(init_status.ok());
-
-   // The exact number of data files is unimportant; use two per client.
-   files_.resize(num_clients * 2);
-   for (size_t file_num = 0; file_num < files_.size(); ++file_num) {
-     File& f = files_[file_num];
-     f.filename = "/tmp/disk_io_mgr_stress_file" + std::to_string(file_num);
-     f.data = GenerateRandomData();
-     CreateTempFile(f.filename.c_str(), f.data.c_str());
-   }
-
-   clients_ = new Client[num_clients_];
-   client_mem_trackers_.resize(num_clients_);
-   for (int client_id = 0; client_id < num_clients_; ++client_id) NewClient(client_id);
- }
-
- // Reader thread body for client 'client_id'. Until shutdown_ is set, it drains scan
- // ranges from the client's current reader and checks every returned buffer against the
- // expected file contents. It then unregisters that reader and re-initializes the client
- // via NewClient(). A file is abandoned early once more than abort_at_byte bytes have
- // been read. On shutdown the last reader is unregistered and reset.
- void DiskIoMgrStress::ClientThread(int client_id) {
- Client* client = &clients_[client_id];
- // NOTE(review): this 'status' is never assigned after its declaration. The scan-range
- // loop below declares its own 'Status status', which shadows this one. As a result the
- // CHECK(status.ok()) after that loop only ever sees this default-constructed value.
- Status status;
- // Reassembled file contents; generated files are shorter than MAX_FILE_LEN.
- char read_buffer[MAX_FILE_LEN];
-
- while (!shutdown_) {
- bool eos = false;
- int bytes_read = 0;
-
- const string& expected = files_[client->file_idx].data;
-
- while (!eos) {
- DiskIoMgr::ScanRange* range;
- // Shadows the outer 'status' (see the NOTE at the top of this function).
- Status status = io_mgr_->GetNextRange(client->reader.get(), &range);
- CHECK(status.ok() || status.IsCancelled());
- if (range == NULL) break;
-
- while (true) {
- unique_ptr<DiskIoMgr::BufferDescriptor> buffer;
- status = range->GetNext(&buffer);
- CHECK(status.ok() || status.IsCancelled());
- if (buffer == NULL) break;
-
- int64_t scan_range_offset = buffer->scan_range_offset();
- int len = buffer->len();
- CHECK_GE(scan_range_offset, 0);
- CHECK_LT(scan_range_offset, expected.size());
- CHECK_GT(len, 0);
-
- // We get scan ranges back in arbitrary order, so convert the offset within the
- // scan range to a file offset.
- int64_t file_offset = scan_range_offset + range->offset();
-
- // Validate the bytes read
- CHECK_LE(file_offset + len, expected.size());
- CHECK_EQ(strncmp(reinterpret_cast<char*>(buffer->buffer()),
- &expected.c_str()[file_offset], len), 0);
-
- // Copy the bytes from this read into the result buffer.
- memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len());
- io_mgr_->ReturnBuffer(move(buffer));
- bytes_read += len;
-
- CHECK_GE(bytes_read, 0);
- CHECK_LE(bytes_read, expected.size());
-
- // Simulated early abort: stop consuming this file.
- if (bytes_read > client->abort_at_byte) {
- eos = true;
- break;
- }
- } // End of buffer
- } // End of scan range
-
- if (bytes_read == expected.size()) {
- // This entire file was read without being cancelled, validate the entire result
- CHECK(status.ok());
- CHECK_EQ(strncmp(read_buffer, expected.c_str(), bytes_read), 0);
- }
-
- // Unregister the old client and get a new one
- // (client->lock keeps CancelRandomReader()/Run() from using the reader meanwhile.)
- unique_lock<mutex> lock(client->lock);
- io_mgr_->UnregisterContext(client->reader.get());
- NewClient(client_id);
- }
-
- unique_lock<mutex> lock(client->lock);
- io_mgr_->UnregisterContext(client->reader.get());
- client->reader = NULL;
- }
-
- // Cancels the reader context of one randomly chosen client. This is a no-op when the
- // test was configured without cancellation.
- void DiskIoMgrStress::CancelRandomReader() {
-   if (includes_cancellation_) {
-     Client& victim = clients_[rand() % num_clients_];
-     // Hold the client's lock so its reader is not replaced while being cancelled.
-     unique_lock<mutex> guard(victim.lock);
-     io_mgr_->CancelContext(victim.reader.get());
-   }
- }
-
- // Starts one reader thread per client. For 'sec' seconds (forever if 0), it then
- // cancels a random reader every CANCEL_READER_PERIOD_MS. Finally it sets shutdown_,
- // cancels every client's remaining reader (presumably to unblock threads waiting in
- // the io mgr) and joins all reader threads.
- void DiskIoMgrStress::Run(int sec) {
-   shutdown_ = false;
-   for (int client_id = 0; client_id < num_clients_; ++client_id) {
-     readers_.add_thread(new thread(&DiskIoMgrStress::ClientThread, this, client_id));
-   }
-
-   // One pass of the outer loop sleeps for about one second in total.
-   const int cancels_per_sec = 1000 / CANCEL_READER_PERIOD_MS;
-   for (int elapsed_sec = 1; sec == 0 || elapsed_sec <= sec; ++elapsed_sec) {
-     for (int n = 0; n < cancels_per_sec; ++n) {
-       SleepForMs(CANCEL_READER_PERIOD_MS);
-       CancelRandomReader();
-     }
-     // Logged at ERROR, presumably so progress reaches stderr under glog's defaults.
-     LOG(ERROR) << "Finished iteration: " << elapsed_sec;
-   }
-
-   // Signal shutdown for the client threads
-   shutdown_ = true;
-   for (int client_id = 0; client_id < num_clients_; ++client_id) {
-     Client& client = clients_[client_id];
-     unique_lock<mutex> guard(client.lock);
-     if (client.reader != NULL) io_mgr_->CancelContext(client.reader.get());
-   }
-
-   readers_.join_all();
- }
-
- // Re-initializes client 'i' for a new round of reading. It picks a random file,
- // decides whether (and at which byte) this client will abort early, and replaces its
- // scan ranges with fresh, randomly sized ranges covering the whole file. It then
- // registers a new reader context, with its own MemTracker, and adds the ranges to it.
- void DiskIoMgrStress::NewClient(int i) {
-   Client& client = clients_[i];
-   ++client.files_processed;
-   client.file_idx = rand() % files_.size();
-   const File& file = files_[client.file_idx];
-   const int file_len = file.data.size();
-
-   // Read the whole file unless, with cancellation enabled, the roll below decides to
-   // abort at a random byte inside it.
-   client.abort_at_byte = file_len;
-   if (includes_cancellation_) {
-     const float roll = rand() / static_cast<float>(RAND_MAX);
-     if (roll < ABORT_CHANCE) client.abort_at_byte = rand() % file_len;
-   }
-
-   // Free the previous ranges. Callers are expected to have unregistered the old
-   // reader before calling this.
-   for (DiskIoMgr::ScanRange* old_range : client.scan_ranges) delete old_range;
-   client.scan_ranges.clear();
-
-   for (int assigned_len = 0; assigned_len < file_len;) {
-     const int range_len = min(rand() % (MAX_READ_LEN - MIN_READ_LEN) + MIN_READ_LEN,
-         file_len - assigned_len);
-     DiskIoMgr::ScanRange* range = new DiskIoMgr::ScanRange();
-     range->Reset(NULL, file.filename.c_str(), range_len, assigned_len, 0, false,
-         DiskIoMgr::BufferOpts::Uncached());
-     client.scan_ranges.push_back(range);
-     assigned_len += range_len;
-   }
-
-   client_mem_trackers_[i].reset(new MemTracker(-1, "", &mem_tracker_));
-   client.reader = io_mgr_->RegisterContext(client_mem_trackers_[i].get());
-   Status add_status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);
-   CHECK(add_status.ok());
- }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-stress.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.h b/be/src/runtime/disk-io-mgr-stress.h
deleted file mode 100644
index 0a66f2c..0000000
--- a/be/src/runtime/disk-io-mgr-stress.h
+++ /dev/null
@@ -1,94 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
- #ifndef IMPALA_RUNTIME_DISK_IO_MGR_STRESS_H
- #define IMPALA_RUNTIME_DISK_IO_MGR_STRESS_H
-
- #include <atomic>
- #include <memory>
- #include <string>
- #include <vector>
- #include <boost/scoped_ptr.hpp>
- #include <boost/thread/thread.hpp>
-
- #include "runtime/disk-io-mgr.h"
- #include "runtime/mem-tracker.h"
- #include "runtime/thread-resource-mgr.h"
-
- namespace impala {
-
- /// Test utility to stress the disk io mgr. It allows for a configurable
- /// number of clients. The clients continuously issue work to the io mgr and
- /// asynchronously get cancelled. The stress test can be run forever or for
- /// a fixed duration. The unit test runs this for a fixed duration.
- class DiskIoMgrStress {
-  public:
-   DiskIoMgrStress(int num_disks, int num_threads_per_disk, int num_clients,
-       bool includes_cancellation);
-
-   /// Run the test for 'sec' seconds. If 0, run forever.
-   void Run(int sec);
-
-  private:
-   struct Client;
-
-   struct File {
-     std::string filename;
-     std::string data; // the data in the file, used to validate
-   };
-
-   /// Files used for testing. These are created at startup and recycled
-   /// during the test
-   std::vector<File> files_;
-
-   /// Root mem tracker.
-   MemTracker mem_tracker_;
-
-   /// io manager
-   boost::scoped_ptr<DiskIoMgr> io_mgr_;
-
-   /// Thread group for reader threads
-   boost::thread_group readers_;
-
-   /// Array of clients.
-   /// NOTE(review): 'clients_' is allocated with new[] in the constructor and this class
-   /// declares no destructor, so the array is never freed.
-   int num_clients_;
-   Client* clients_;
-
-   /// Client MemTrackers, one per client.
-   std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_;
-
-   /// If true, tests cancelling readers
-   bool includes_cancellation_;
-
-   /// Flag to signal that client reader threads should exit. It is written by the
-   /// thread running Run() and polled by every reader thread. std::atomic makes that
-   /// cross-thread access well-defined; 'volatile' provides no such guarantee in C++.
-   std::atomic<bool> shutdown_{false};
-
-   /// Helper to initialize a new reader client, registering a new reader with the
-   /// io mgr and initializing the scan ranges
-   void NewClient(int i);
-
-   /// Thread running the reader. When the current reader is done (either normally
-   /// or cancelled), it picks up a new reader
-   void ClientThread(int client_id);
-
-   /// Possibly cancels a random reader.
-   void CancelRandomReader();
- };
-
- }
-
- #endif