You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/11/18 00:31:51 UTC
[08/16] incubator-impala git commit: IMPALA-4835 (prep only): create io subfolder and namespace
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index f1e243c..24217de 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -26,7 +26,8 @@
#include <gutil/strings/join.h>
#include <gutil/strings/substitute.h>
-#include "runtime/disk-io-mgr-reader-context.h"
+#include "runtime/io/disk-io-mgr.h"
+#include "runtime/io/request-context.h"
#include "runtime/runtime-state.h"
#include "runtime/tmp-file-mgr-internal.h"
#include "util/bit-util.h"
@@ -52,6 +53,7 @@ using boost::algorithm::token_compress_on;
using boost::filesystem::absolute;
using boost::filesystem::path;
using boost::uuids::random_generator;
+using namespace impala::io;
using namespace strings;
namespace impala {
@@ -358,7 +360,7 @@ Status TmpFileMgr::FileGroup::Write(
unique_ptr<WriteHandle> tmp_handle(new WriteHandle(encryption_timer_, cb));
WriteHandle* tmp_handle_ptr = tmp_handle.get(); // Pass ptr by value into lambda.
- DiskIoMgr::WriteRange::WriteDoneCallback callback = [this, tmp_handle_ptr](
+ WriteRange::WriteDoneCallback callback = [this, tmp_handle_ptr](
const Status& write_status) { WriteComplete(tmp_handle_ptr, write_status); };
RETURN_IF_ERROR(
tmp_handle->Write(io_mgr_, io_ctx_.get(), tmp_file, file_offset, buffer, callback));
@@ -387,11 +389,11 @@ Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* handle, MemRange buffer) {
DCHECK(handle->write_range_ != nullptr);
// Don't grab handle->write_state_lock_, it is safe to touch all of handle's state
// since the write is not in flight.
- handle->read_range_ = scan_range_pool_.Add(new DiskIoMgr::ScanRange);
+ handle->read_range_ = scan_range_pool_.Add(new ScanRange);
handle->read_range_->Reset(nullptr, handle->write_range_->file(),
handle->write_range_->len(), handle->write_range_->offset(),
handle->write_range_->disk_id(), false,
- DiskIoMgr::BufferOpts::ReadInto(buffer.data(), buffer.len()));
+ BufferOpts::ReadInto(buffer.data(), buffer.len()));
read_counter_->Add(1);
bytes_read_counter_->Add(buffer.len());
RETURN_IF_ERROR(io_mgr_->AddScanRange(io_ctx_.get(), handle->read_range_, true));
@@ -403,7 +405,7 @@ Status TmpFileMgr::FileGroup::WaitForAsyncRead(WriteHandle* handle, MemRange buf
// Don't grab handle->write_state_lock_, it is safe to touch all of handle's state
// since the write is not in flight.
SCOPED_TIMER(disk_read_timer_);
- unique_ptr<DiskIoMgr::BufferDescriptor> io_mgr_buffer;
+ unique_ptr<BufferDescriptor> io_mgr_buffer;
Status status = handle->read_range_->GetNext(&io_mgr_buffer);
if (!status.ok()) goto exit;
DCHECK(io_mgr_buffer != NULL);
@@ -525,9 +527,9 @@ string TmpFileMgr::WriteHandle::TmpFilePath() const {
return file_->path();
}
-Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx,
+Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, RequestContext* io_ctx,
File* file, int64_t offset, MemRange buffer,
- DiskIoMgr::WriteRange::WriteDoneCallback callback) {
+ WriteRange::WriteDoneCallback callback) {
DCHECK(!write_in_flight_);
if (FLAGS_disk_spill_encryption) RETURN_IF_ERROR(EncryptAndHash(buffer));
@@ -536,7 +538,7 @@ Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, DiskIoRequestContext* i
// WriteComplete() may be called concurrently with the remainder of this function.
file_ = file;
write_range_.reset(
- new DiskIoMgr::WriteRange(file->path(), offset, file->AssignDiskQueue(), callback));
+ new WriteRange(file->path(), offset, file->AssignDiskQueue(), callback));
write_range_->SetData(buffer.data(), buffer.len());
write_in_flight_ = true;
Status status = io_mgr->AddWriteRange(io_ctx, write_range_.get());
@@ -553,7 +555,7 @@ Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, DiskIoRequestContext* i
}
Status TmpFileMgr::WriteHandle::RetryWrite(
- DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, int64_t offset) {
+ DiskIoMgr* io_mgr, RequestContext* io_ctx, File* file, int64_t offset) {
DCHECK(write_in_flight_);
file_ = file;
write_range_->SetRange(file->path(), offset, file->AssignDiskQueue());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index f550af2..95072ae 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -28,10 +28,10 @@
#include "common/object-pool.h"
#include "common/status.h"
#include "gen-cpp/Types_types.h" // for TUniqueId
-#include "runtime/disk-io-mgr.h"
-#include "util/mem-range.h"
+#include "runtime/io/request-ranges.h"
#include "util/collection-metrics.h"
#include "util/condition-variable.h"
+#include "util/mem-range.h"
#include "util/openssl-util.h"
#include "util/runtime-profile.h"
#include "util/spinlock.h"
@@ -100,7 +100,7 @@ class TmpFileMgr {
/// space used. 'unique_id' is a unique ID that is used to prefix any scratch file
/// names. It is an error to create multiple FileGroups with the same 'unique_id'.
/// 'bytes_limit' is the limit on the total file space to allocate.
- FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr, RuntimeProfile* profile,
+ FileGroup(TmpFileMgr* tmp_file_mgr, io::DiskIoMgr* io_mgr, RuntimeProfile* profile,
const TUniqueId& unique_id, int64_t bytes_limit = -1);
~FileGroup();
@@ -198,10 +198,10 @@ class TmpFileMgr {
TmpFileMgr* const tmp_file_mgr_;
/// DiskIoMgr used for all I/O to temporary files.
- DiskIoMgr* const io_mgr_;
+ io::DiskIoMgr* const io_mgr_;
/// I/O context used for all reads and writes. Registered in constructor.
- std::unique_ptr<DiskIoRequestContext> io_ctx_;
+ std::unique_ptr<io::RequestContext> io_ctx_;
/// Stores scan ranges allocated in Read(). Needed because ScanRange objects may be
/// touched by DiskIoMgr even after the scan is finished.
@@ -303,14 +303,14 @@ class TmpFileMgr {
/// Starts a write of 'buffer' to 'offset' of 'file'. 'write_in_flight_' must be false
/// before calling. After returning, 'write_in_flight_' is true on success or false on
/// failure and 'is_cancelled_' is set to true on failure.
- Status Write(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file,
+ Status Write(io::DiskIoMgr* io_mgr, io::RequestContext* io_ctx, File* file,
int64_t offset, MemRange buffer,
- DiskIoMgr::WriteRange::WriteDoneCallback callback) WARN_UNUSED_RESULT;
+ io::WriteRange::WriteDoneCallback callback) WARN_UNUSED_RESULT;
/// Retry the write after the initial write failed with an error, instead writing to
/// 'offset' of 'file'. 'write_in_flight_' must be true before calling.
/// After returning, 'write_in_flight_' is true on success or false on failure.
- Status RetryWrite(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file,
+ Status RetryWrite(io::DiskIoMgr* io_mgr, io::RequestContext* io_ctx, File* file,
int64_t offset) WARN_UNUSED_RESULT;
/// Called when the write has completed successfully or not. Sets 'write_in_flight_'
@@ -340,7 +340,7 @@ class TmpFileMgr {
RuntimeProfile::Counter* encryption_timer_;
/// The DiskIoMgr write range for this write.
- boost::scoped_ptr<DiskIoMgr::WriteRange> write_range_;
+ boost::scoped_ptr<io::WriteRange> write_range_;
/// The temporary file being written to.
File* file_;
@@ -355,7 +355,7 @@ class TmpFileMgr {
/// The scan range for the read that is currently in flight. NULL when no read is in
/// flight.
- DiskIoMgr::ScanRange* read_range_;
+ io::ScanRange* read_range_;
/// Protects all fields below while 'write_in_flight_' is true. At other times, it is
/// invalid to call WriteRange/FileGroup methods concurrently from multiple threads,