You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/01/05 15:32:16 UTC
[3/5] incubator-impala git commit: IMPALA-3202,
IMPALA-2079: rework scratch file I/O
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index dc35600..bf2b7ec 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -15,16 +15,19 @@
// specific language governing permissions and limitations
// under the License.
+#include "runtime/tmp-file-mgr.h"
+
#include <boost/algorithm/string.hpp>
+#include <boost/filesystem.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread/locks.hpp>
-#include <boost/uuid/uuid_io.hpp>
#include <boost/uuid/random_generator.hpp>
-#include <boost/filesystem.hpp>
-#include <gutil/strings/substitute.h>
+#include <boost/uuid/uuid_io.hpp>
#include <gutil/strings/join.h>
+#include <gutil/strings/substitute.h>
-#include "runtime/tmp-file-mgr.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tmp-file-mgr-internal.h"
#include "util/debug-util.h"
#include "util/disk-info.h"
#include "util/filesystem-util.h"
@@ -32,9 +35,13 @@
#include "common/names.h"
+DEFINE_bool(disk_spill_encryption, false,
+ "Set this to encrypt and perform an integrity "
+ "check on all data spilled to disk during a query");
DEFINE_string(scratch_dirs, "/tmp", "Writable scratch directories");
-
-#include "common/names.h"
+DEFINE_bool(allow_multiple_scratch_dirs_per_device, false,
+ "If false and --scratch_dirs contains multiple directories on the same device, "
+ "then only the first writable directory is used");
using boost::algorithm::is_any_of;
using boost::algorithm::join;
@@ -55,8 +62,10 @@ const string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS = "tmp-file-mgr.active-scratch-dir
const string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST =
"tmp-file-mgr.active-scratch-dirs.list";
-TmpFileMgr::TmpFileMgr() : initialized_(false), dir_status_lock_(), tmp_dirs_(),
- num_active_scratch_dirs_metric_(NULL), active_scratch_dirs_metric_(NULL) {}
+TmpFileMgr::TmpFileMgr()  // Leaves the manager unusable until InitCustom() has run.
+ : initialized_(false),  // Set to true at the end of InitCustom().
+ num_active_scratch_dirs_metric_(nullptr),  // Registered in InitCustom().
+ active_scratch_dirs_metric_(nullptr) {}  // Registered in InitCustom().
Status TmpFileMgr::Init(MetricGroup* metrics) {
string tmp_dirs_spec = FLAGS_scratch_dirs;
@@ -65,7 +74,7 @@ Status TmpFileMgr::Init(MetricGroup* metrics) {
if (!tmp_dirs_spec.empty()) {
split(all_tmp_dirs, tmp_dirs_spec, is_any_of(","), token_compress_on);
}
- return InitCustom(all_tmp_dirs, true, metrics);
+ return InitCustom(all_tmp_dirs, !FLAGS_allow_multiple_scratch_dirs_per_device, metrics);
}
Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_device,
@@ -108,7 +117,7 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
if (disk_id >= 0) is_tmp_dir_on_disk[disk_id] = true;
LOG(INFO) << "Using scratch directory " << scratch_subdir_path.string() << " on "
<< "disk " << disk_id;
- tmp_dirs_.push_back(Dir(scratch_subdir_path.string(), false));
+ tmp_dirs_.push_back(scratch_subdir_path.string());
} else {
LOG(WARNING) << "Could not remove and recreate directory "
<< scratch_subdir_path.string() << ": cannot use it for scratch. "
@@ -117,14 +126,14 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
}
}
- DCHECK(metrics != NULL);
+ DCHECK(metrics != nullptr);
num_active_scratch_dirs_metric_ =
metrics->AddGauge<int64_t>(TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS, 0);
- active_scratch_dirs_metric_ = SetMetric<string>::CreateAndRegister(metrics,
- TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>());
+ active_scratch_dirs_metric_ = SetMetric<string>::CreateAndRegister(
+ metrics, TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>());
num_active_scratch_dirs_metric_->set_value(tmp_dirs_.size());
for (int i = 0; i < tmp_dirs_.size(); ++i) {
- active_scratch_dirs_metric_->Add(tmp_dirs_[i].path());
+ active_scratch_dirs_metric_->Add(tmp_dirs_[i]);
}
initialized_ = true;
@@ -137,24 +146,20 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
return Status::OK();
}
-Status TmpFileMgr::NewFile(FileGroup* file_group, const DeviceId& device_id,
- const TUniqueId& query_id, unique_ptr<File>* new_file) {
+Status TmpFileMgr::NewFile(  // Builds a File handle with a unique path under the scratch dir of 'device_id'.
+ FileGroup* file_group, DeviceId device_id, unique_ptr<File>* new_file) {
DCHECK(initialized_);
DCHECK_GE(device_id, 0);
DCHECK_LT(device_id, tmp_dirs_.size());
- DCHECK(file_group != NULL);
- if (IsBlacklisted(device_id)) {
- return Status(TErrorCode::TMP_DEVICE_BLACKLISTED, tmp_dirs_[device_id].path());
- }
-
+ DCHECK(file_group != nullptr);  // Dereferenced below for unique_id().
// Generate the full file path.
string unique_name = lexical_cast<string>(random_generator()());
stringstream file_name;
- file_name << PrintId(query_id) << "_" << unique_name;
- path new_file_path(tmp_dirs_[device_id].path());
+ file_name << PrintId(file_group->unique_id()) << "_" << unique_name;  // "<group id>_<random uuid>".
+ path new_file_path(tmp_dirs_[device_id]);  // tmp_dirs_ now stores plain path strings.
new_file_path /= file_name.str();
- new_file->reset(new File(this, file_group, device_id, new_file_path.string()));
+ new_file->reset(new File(file_group, device_id, new_file_path.string()));  // '*new_file' takes ownership.
return Status::OK();
}
@@ -162,162 +167,133 @@ string TmpFileMgr::GetTmpDirPath(DeviceId device_id) const {
DCHECK(initialized_);
DCHECK_GE(device_id, 0);
DCHECK_LT(device_id, tmp_dirs_.size());
- return tmp_dirs_[device_id].path();
+ return tmp_dirs_[device_id];
}
-void TmpFileMgr::BlacklistDevice(DeviceId device_id) {
+int TmpFileMgr::NumActiveTmpDevices() {  // Returns the number of entries in tmp_dirs_.
DCHECK(initialized_);
- DCHECK(device_id >= 0 && device_id < tmp_dirs_.size());
- bool added;
- {
- lock_guard<SpinLock> l(dir_status_lock_);
- added = tmp_dirs_[device_id].blacklist();
- }
- if (added) {
- num_active_scratch_dirs_metric_->Increment(-1);
- active_scratch_dirs_metric_->Remove(tmp_dirs_[device_id].path());
- }
+ return tmp_dirs_.size();  // No per-device blacklist is consulted any more, so every dir counts.
}
-bool TmpFileMgr::IsBlacklisted(DeviceId device_id) {
- DCHECK(initialized_);
- DCHECK(device_id >= 0 && device_id < tmp_dirs_.size());
- lock_guard<SpinLock> l(dir_status_lock_);
- return tmp_dirs_[device_id].is_blacklisted();
-}
-
-int TmpFileMgr::num_active_tmp_devices() {
- DCHECK(initialized_);
- lock_guard<SpinLock> l(dir_status_lock_);
- int num_active = 0;
- for (int device_id = 0; device_id < tmp_dirs_.size(); ++device_id) {
- if (!tmp_dirs_[device_id].is_blacklisted()) ++num_active;
- }
- return num_active;
-}
-
-vector<TmpFileMgr::DeviceId> TmpFileMgr::active_tmp_devices() {
+vector<TmpFileMgr::DeviceId> TmpFileMgr::ActiveTmpDevices() {  // Returns the ids 0 .. tmp_dirs_.size() - 1.
vector<TmpFileMgr::DeviceId> devices;
- // Allocate vector before we grab lock
- devices.reserve(tmp_dirs_.size());
- {
- lock_guard<SpinLock> l(dir_status_lock_);
- for (DeviceId device_id = 0; device_id < tmp_dirs_.size(); ++device_id) {
- if (!tmp_dirs_[device_id].is_blacklisted()) {
- devices.push_back(device_id);
- }
- }
+ for (DeviceId device_id = 0; device_id < tmp_dirs_.size(); ++device_id) {
+ devices.push_back(device_id);  // A DeviceId is used as an index into tmp_dirs_.
}
return devices;
}
-TmpFileMgr::File::File(TmpFileMgr* mgr, FileGroup* file_group, DeviceId device_id,
- const string& path)
- : mgr_(mgr),
- file_group_(file_group),
+TmpFileMgr::File::File(FileGroup* file_group, DeviceId device_id, const string& path)  // Records where the file lives; allocates nothing.
+ : file_group_(file_group),  // Used by AssignDiskQueue() to reach the group's io_mgr_.
path_(path),
device_id_(device_id),
- current_size_(0),
+ disk_id_(DiskInfo::disk_id(path.c_str())),  // Resolved at construction; formerly set on the first AllocateSpace().
+ bytes_allocated_(0),  // Next offset handed out by AllocateSpace().
blacklisted_(false) {
- DCHECK(file_group != NULL);
+ DCHECK(file_group != nullptr);
}
Status TmpFileMgr::File::AllocateSpace(int64_t num_bytes, int64_t* offset) {
DCHECK_GT(num_bytes, 0);
- Status status;
- if (mgr_->IsBlacklisted(device_id_)) {
- blacklisted_ = true;
- return Status(TErrorCode::TMP_FILE_BLACKLISTED, path_);
- }
- if (current_size_ == 0) {
- // First call to AllocateSpace. Create the file.
- status = FileSystemUtil::CreateFile(path_);
- if (!status.ok()) {
- ReportIOError(status.msg());
- return status;
- }
- disk_id_ = DiskInfo::disk_id(path_.c_str());
- }
- int64_t new_size = current_size_ + num_bytes;
- status = FileSystemUtil::ResizeFile(path_, new_size);
- if (!status.ok()) {
- ReportIOError(status.msg());
- return status;
- }
- *offset = current_size_;
- current_size_ = new_size;
+ *offset = bytes_allocated_;  // The new range starts at the current end of the allocated space.
+ bytes_allocated_ += num_bytes;  // Bookkeeping only: the file is no longer created or resized here.
return Status::OK();
}
-void TmpFileMgr::File::ReportIOError(const ErrorMsg& msg) {
+int TmpFileMgr::File::AssignDiskQueue() const {  // Delegates to AssignQueue() on the owning group's io_mgr_.
+ return file_group_->io_mgr_->AssignQueue(path_.c_str(), disk_id_, false);  // NOTE(review): meaning of the 'false' argument is not visible here.
+}
+
+void TmpFileMgr::File::Blacklist(const ErrorMsg& msg) {  // Logs 'msg' and marks this file as blacklisted.
LOG(ERROR) << "Error for temporary file '" << path_ << "': " << msg.msg();
- // IMPALA-2305: avoid blacklisting to prevent test failures.
- // blacklisted_ = true;
- // mgr_->BlacklistDevice(device_id_);
+ blacklisted_ = true;  // FileGroup::AllocateSpace() skips files whose is_blacklisted() is true.
}
Status TmpFileMgr::File::Remove() {
- if (current_size_ > 0) FileSystemUtil::RemovePaths(vector<string>(1, path_));
+ // Remove the file if present (it may not be present if no writes completed).
+ FileSystemUtil::RemovePaths({path_});  // NOTE(review): any result of RemovePaths() is discarded, so OK is always returned below.
return Status::OK();
}
-TmpFileMgr::FileGroup::FileGroup(
- TmpFileMgr* tmp_file_mgr, RuntimeProfile* profile, int64_t bytes_limit)
+string TmpFileMgr::File::DebugString() {  // One-line summary of this file's bookkeeping state.
+ return Substitute("File $0 path '$1' device id $2 disk id $3 bytes allocated $4 "
+ "blacklisted $5", this, path_, device_id_, disk_id_, bytes_allocated_,
+ blacklisted_);
+}
+
+TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,  // Sets up counters on 'profile' and registers an I/O context; creates no files.
+ RuntimeProfile* profile, const TUniqueId& unique_id, int64_t block_size,
+ int64_t bytes_limit)
: tmp_file_mgr_(tmp_file_mgr),
- current_bytes_allocated_(0),
+ io_mgr_(io_mgr),  // NOTE(review): dereferenced in the body without a null DCHECK.
+ io_ctx_(nullptr),  // Filled in by RegisterContext() in the body.
+ unique_id_(unique_id),  // Prefix of scratch file names, see TmpFileMgr::NewFile().
+ block_size_(block_size),  // Size of every range taken from a file in AllocateSpace().
bytes_limit_(bytes_limit),
+ write_counter_(ADD_COUNTER(profile, "ScratchWrites", TUnit::UNIT)),
+ bytes_written_counter_(ADD_COUNTER(profile, "ScratchBytesWritten", TUnit::BYTES)),
+ read_counter_(ADD_COUNTER(profile, "ScratchReads", TUnit::UNIT)),
+ bytes_read_counter_(ADD_COUNTER(profile, "ScratchBytesRead", TUnit::BYTES)),
+ scratch_space_bytes_used_counter_(
+ ADD_COUNTER(profile, "ScratchFileUsedBytes", TUnit::BYTES)),
+ disk_read_timer_(ADD_TIMER(profile, "TotalReadBlockTime")),
+ encryption_timer_(ADD_TIMER(profile, "TotalEncryptionTime")),
+ current_bytes_allocated_(0),
next_allocation_index_(0) {
- DCHECK(tmp_file_mgr != NULL);
- scratch_space_bytes_used_counter_ =
- ADD_COUNTER(profile, "ScratchFileUsedBytes", TUnit::BYTES);
+ DCHECK_GT(block_size_, 0);
+ DCHECK(tmp_file_mgr != nullptr);
+ io_mgr_->RegisterContext(&io_ctx_, nullptr);  // Paired with UnregisterContext() in Close().
}
-Status TmpFileMgr::FileGroup::CreateFiles(const TUniqueId& query_id) {
+TmpFileMgr::FileGroup::~FileGroup() {  // Only checks that the group was closed first.
+ DCHECK_EQ(tmp_files_.size(), 0);  // Close() is what clears tmp_files_.
+}
+
+Status TmpFileMgr::FileGroup::CreateFiles() {  // Creates one File handle per tmp device and picks a random starting file.
+ lock_.DCheckLocked();  // Caller must hold lock_; AllocateSpace() calls this with it held.
DCHECK(tmp_files_.empty());
- vector<Status> errs;
- vector<DeviceId> tmp_devices = tmp_file_mgr_->active_tmp_devices();
+ vector<DeviceId> tmp_devices = tmp_file_mgr_->ActiveTmpDevices();
int files_allocated = 0;
// Initialize the tmp files and the initial file to use.
for (int i = 0; i < tmp_devices.size(); ++i) {
- TmpFileMgr::DeviceId tmp_device_id = tmp_devices[i];
+ TmpFileMgr::DeviceId device_id = tmp_devices[i];
// It is possible for a device to be blacklisted after it was returned by
- // active_tmp_devices(), handle this gracefully by skipping devices if NewFile()
+ // ActiveTmpDevices(), handle this gracefully by skipping devices if NewFile()
// fails.
- Status status = NewFile(tmp_device_id, query_id);
+ unique_ptr<TmpFileMgr::File> tmp_file;
+ Status status = tmp_file_mgr_->NewFile(this, device_id, &tmp_file);  // NOTE(review): NewFile() as shown in this file always returns OK.
if (status.ok()) {
+ tmp_files_.emplace_back(std::move(tmp_file));  // The group owns its File objects.
++files_allocated;
} else {
- errs.push_back(std::move(status));
+ scratch_errors_.push_back(std::move(status));  // Kept so that later failures can report the full history.
}
}
DCHECK_EQ(tmp_files_.size(), files_allocated);
if (tmp_files_.size() == 0) {
- Status err_status("Could not create files in any configured scratch directories "
- "(--scratch_dirs).");
- for (Status& err : errs) err_status.MergeStatus(err);
+ // TODO: IMPALA-4697: the merged errors do not show up in the query error log,
+ // so we must point users to the impalad error log.
+ Status err_status(
+ "Could not create files in any configured scratch directories (--scratch_dirs). "
+ "See logs for previous errors that may have caused this.");
+ for (Status& err : scratch_errors_) err_status.MergeStatus(err);
return err_status;
}
-
// Start allocating on a random device to avoid overloading the first device.
next_allocation_index_ = rand() % tmp_files_.size();
return Status::OK();
}
-Status TmpFileMgr::FileGroup::NewFile(const DeviceId& device_id,
- const TUniqueId& query_id, File** new_file) {
- unique_ptr<TmpFileMgr::File> tmp_file;
- RETURN_IF_ERROR(tmp_file_mgr_->NewFile(this, device_id, query_id, &tmp_file));
- if (new_file != NULL) *new_file = tmp_file.get();
- tmp_files_.emplace_back(std::move(tmp_file));
- return Status::OK();
-}
-
void TmpFileMgr::FileGroup::Close() {
- for (std::unique_ptr<TmpFileMgr::File>& file: tmp_files_) {
+ // Cancel writes before deleting the files, since in-flight writes could re-create
+ // deleted files.
+ if (io_ctx_ != nullptr) io_mgr_->UnregisterContext(io_ctx_);
+ io_ctx_ = nullptr;
+ for (std::unique_ptr<TmpFileMgr::File>& file : tmp_files_) {
Status status = file->Remove();
if (!status.ok()) {
- LOG(WARNING) << "Error removing scratch file '" << file->path() << "': "
- << status.msg().msg();
+ LOG(WARNING) << "Error removing scratch file '" << file->path()
+ << "': " << status.msg().msg();
}
}
tmp_files_.clear();
@@ -325,18 +301,31 @@ void TmpFileMgr::FileGroup::Close() {
Status TmpFileMgr::FileGroup::AllocateSpace(
int64_t num_bytes, File** tmp_file, int64_t* file_offset) {
- if (bytes_limit_ != -1 && current_bytes_allocated_ + num_bytes > bytes_limit_) {
+ DCHECK_LE(num_bytes, block_size_);
+ lock_guard<SpinLock> lock(lock_);
+
+ if (!free_ranges_.empty()) {
+ *tmp_file = free_ranges_.back().first;
+ *file_offset = free_ranges_.back().second;
+ free_ranges_.pop_back();
+ return Status::OK();
+ }
+
+ if (bytes_limit_ != -1 && current_bytes_allocated_ + block_size_ > bytes_limit_) {
return Status(TErrorCode::SCRATCH_LIMIT_EXCEEDED, bytes_limit_);
}
- vector<Status> errs;
+
+ // Lazily create the files on the first write.
+ if (tmp_files_.empty()) RETURN_IF_ERROR(CreateFiles());
+
// Find the next physical file in round-robin order and allocate a range from it.
for (int attempt = 0; attempt < tmp_files_.size(); ++attempt) {
*tmp_file = tmp_files_[next_allocation_index_].get();
next_allocation_index_ = (next_allocation_index_ + 1) % tmp_files_.size();
if ((*tmp_file)->is_blacklisted()) continue;
- Status status = (*tmp_file)->AllocateSpace(num_bytes, file_offset);
+ Status status = (*tmp_file)->AllocateSpace(block_size_, file_offset);
if (status.ok()) {
- scratch_space_bytes_used_counter_->Add(num_bytes);
+ scratch_space_bytes_used_counter_->Add(block_size_);
current_bytes_allocated_ += num_bytes;
return Status::OK();
}
@@ -345,12 +334,261 @@ Status TmpFileMgr::FileGroup::AllocateSpace(
LOG(WARNING) << "Error while allocating range in scratch file '"
<< (*tmp_file)->path() << "': " << status.msg().msg()
<< ". Will try another scratch file.";
- errs.push_back(status);
+ scratch_errors_.push_back(status);
}
- Status err_status("No usable scratch files: space could not be allocated in any "
- "of the configured scratch directories (--scratch_dirs).");
- for (Status& err : errs) err_status.MergeStatus(err);
+ // TODO: IMPALA-4697: the merged errors do not show up in the query error log,
+ // so we must point users to the impalad error log.
+ Status err_status(
+ "No usable scratch files: space could not be allocated in any of "
+ "the configured scratch directories (--scratch_dirs). See logs for previous "
+ "errors that may have caused this.");
+ // Include all previous errors that may have caused the failure.
+ for (Status& err : scratch_errors_) err_status.MergeStatus(err);
return err_status;
}
+void TmpFileMgr::FileGroup::AddFreeRange(File* file, int64_t offset) {  // Makes the range at 'offset' in 'file' reusable.
+ lock_guard<SpinLock> lock(lock_);  // free_ranges_ is also read under lock_ in AllocateSpace().
+ free_ranges_.emplace_back(file, offset);  // AllocateSpace() pops from the back, so the newest range is reused first.
+}
+
+// Starts an asynchronous write of 'buffer' to scratch. A file range is obtained from
+// AllocateSpace() and the write is queued through a newly created WriteHandle. The
+// I/O completion is routed to FileGroup::WriteComplete(), which either retries the
+// write on another file or forwards the final status to 'cb'.
+// On success '*handle' is set to the new handle. On failure '*handle' is left
+// untouched and the error status is returned.
+Status TmpFileMgr::FileGroup::Write(
+    MemRange buffer, WriteDoneCallback cb, unique_ptr<TmpFileMgr::WriteHandle>* handle) {
+  DCHECK_GE(buffer.len(), 0);
+
+  File* tmp_file;
+  int64_t file_offset;
+  RETURN_IF_ERROR(AllocateSpace(buffer.len(), &tmp_file, &file_offset));
+
+  unique_ptr<WriteHandle> tmp_handle(new WriteHandle(encryption_timer_, cb));
+  WriteHandle* tmp_handle_ptr = tmp_handle.get(); // Pass ptr by value into lambda.
+  DiskIoMgr::WriteRange::WriteDoneCallback callback = [this, tmp_handle_ptr](
+      const Status& write_status) { WriteComplete(tmp_handle_ptr, write_status); };
+  Status status =
+      tmp_handle->Write(io_mgr_, io_ctx_, tmp_file, file_offset, buffer, callback);
+  if (!status.ok()) {
+    // No WriteHandle will ever own this range, so put it on the free list (as
+    // DestroyWriteHandle() does) instead of losing the space for the lifetime of the
+    // group. NOTE(review): assumes a failed WriteHandle::Write() leaves no write
+    // queued against the range - confirm against DiskIoMgr::AddWriteRange().
+    AddFreeRange(tmp_file, file_offset);
+    return status;
+  }
+  write_counter_->Add(1);
+  bytes_written_counter_->Add(buffer.len());
+  *handle = move(tmp_handle);
+  return Status::OK();
+}
+
+// Synchronously reads the data previously written through 'handle' back into
+// 'buffer', which must have the same length as the original write. The write must
+// have completed and must not have been cancelled. If --disk_spill_encryption is
+// set, the data is verified against the stored hash and decrypted in place.
+// Returns the I/O error, the verification/decryption error, or OK.
+Status TmpFileMgr::FileGroup::Read(WriteHandle* handle, MemRange buffer) {
+  DCHECK(handle->write_range_ != nullptr);
+  DCHECK(!handle->is_cancelled_);
+  DCHECK_EQ(buffer.len(), handle->len());
+
+  // Don't grab 'lock_' in this method - it is not necessary because we don't touch
+  // any members that it protects and could block other threads for the duration of
+  // the synchronous read.
+  DCHECK(!handle->write_in_flight_);
+  // Don't grab handle->lock_, it is safe to touch all of handle's state since the
+  // write is not in flight.
+  DiskIoMgr::ScanRange* scan_range = scan_range_pool_.Add(new DiskIoMgr::ScanRange);
+  scan_range->Reset(nullptr, handle->write_range_->file(), handle->write_range_->len(),
+      handle->write_range_->offset(), handle->write_range_->disk_id(), false,
+      DiskIoMgr::BufferOpts::ReadInto(buffer.data(), buffer.len()));
+  DiskIoMgr::BufferDescriptor* io_mgr_buffer;
+  {
+    SCOPED_TIMER(disk_read_timer_);
+    read_counter_->Add(1);
+    bytes_read_counter_->Add(buffer.len());
+    RETURN_IF_ERROR(io_mgr_->Read(io_ctx_, scan_range, &io_mgr_buffer));
+  }
+
+  // Capture the verification result instead of returning early: 'io_mgr_buffer' must
+  // be handed back on every path once the read has succeeded, otherwise a failed
+  // hash check would leak it.
+  Status status;
+  if (FLAGS_disk_spill_encryption) {
+    status = handle->CheckHashAndDecrypt(buffer);
+  }
+
+  DCHECK_EQ(io_mgr_buffer->buffer(), buffer.data());
+  DCHECK_EQ(io_mgr_buffer->len(), buffer.len());
+  DCHECK(io_mgr_buffer->eosr());
+  io_mgr_buffer->Return();
+  return status;
+}
+
+Status TmpFileMgr::FileGroup::CancelWriteAndRestoreData(  // Cancels the write, restores 'buffer' in place and recycles the file range.
+ unique_ptr<WriteHandle> handle, MemRange buffer) {
+ DCHECK_EQ(handle->write_range_->data(), buffer.data());  // 'buffer' must be the memory that was handed to Write().
+ DCHECK_EQ(handle->len(), buffer.len());
+ handle->Cancel();  // Sets is_cancelled_, which stops RecoverWriteError() from retrying.
+
+ // Decrypt regardless of whether the write is still in flight or not. An in-flight
+ // write may write bogus data to disk but this lets us get some work done while the
+ // write is being cancelled.
+ Status status;
+ if (FLAGS_disk_spill_encryption) {
+ status = handle->CheckHashAndDecrypt(buffer);
+ }
+ handle->WaitForWrite();  // Blocks until write_in_flight_ is false.
+ AddFreeRange(handle->file_, handle->write_range_->offset());  // The range may now be reused by AllocateSpace().
+ handle.reset();  // Destroys the WriteHandle.
+ return status;  // OK, or the verification/decryption failure when encryption is enabled.
+}
+
+void TmpFileMgr::FileGroup::DestroyWriteHandle(unique_ptr<WriteHandle> handle) {  // Cancels the write, waits for it to finish and recycles its range.
+ handle->Cancel();  // Sets is_cancelled_, which stops RecoverWriteError() from retrying.
+ handle->WaitForWrite();  // Blocks until write_in_flight_ is false.
+ AddFreeRange(handle->file_, handle->write_range_->offset());  // The range may now be reused by AllocateSpace().
+ handle.reset();  // Destroys the WriteHandle.
+}
+
+// Completion hook for writes queued by this group. A successful write is reported to
+// 'handle' as is. A failed write is first passed to RecoverWriteError(); if that
+// manages to re-queue the write, nothing is reported yet because this hook will run
+// again for the retry. Otherwise the status from the recovery attempt is reported.
+void TmpFileMgr::FileGroup::WriteComplete(
+    WriteHandle* handle, const Status& write_status) {
+  const bool write_failed = !write_status.ok();
+  const Status final_status =
+      write_failed ? RecoverWriteError(handle, write_status) : write_status;
+  // Recovery succeeded: the write is in flight again, so it is not complete.
+  if (write_failed && final_status.ok()) return;
+  handle->WriteComplete(final_status);
+}
+
+Status TmpFileMgr::FileGroup::RecoverWriteError(  // Blacklists the failed file and tries to re-queue the write on another range.
+ WriteHandle* handle, const Status& write_status) {
+ DCHECK(!write_status.ok());
+ DCHECK(handle->file_ != nullptr);
+
+ // We can't recover from cancellation or memory limit exceeded.
+ if (write_status.IsCancelled() || write_status.IsMemLimitExceeded()) {
+ return write_status;
+ }
+
+ // Save and report the error before retrying so that the failure isn't silent.
+ {
+ lock_guard<SpinLock> lock(lock_);  // Scope ends before AllocateSpace() below, which takes lock_ itself.
+ scratch_errors_.push_back(write_status);
+ }
+ handle->file_->Blacklist(write_status.msg());
+
+ // Do not retry cancelled writes or propagate the error, simply return CANCELLED.
+ if (handle->is_cancelled_) return Status::CANCELLED;
+
+ TmpFileMgr::File* tmp_file;
+ int64_t file_offset;
+ // Discard the scratch file range - we will not reuse ranges from a bad file.
+ // Choose another file to try. Blacklisting ensures we don't retry the same file.
+ // If this fails, the status will include all the errors in 'scratch_errors_'.
+ RETURN_IF_ERROR(AllocateSpace(handle->len(), &tmp_file, &file_offset));  // NOTE(review): AllocateSpace() serves free_ranges_ first without an is_blacklisted() check.
+ return handle->RetryWrite(io_mgr_, io_ctx_, tmp_file, file_offset);  // OK means the write was queued again.
+}
+
+// Returns a multi-line, human-readable dump of the group's limits, allocation state
+// and profile counters, followed by one line per scratch file. Takes 'lock_' so that
+// the allocation state and the file list are read consistently.
+string TmpFileMgr::FileGroup::DebugString() {
+  lock_guard<SpinLock> lock(lock_);
+  stringstream ss;
+  // Every counter is printed through value(): streaming the counter pointer itself
+  // would print an address instead of the number of bytes used.
+  ss << "FileGroup " << this << " block size " << block_size_
+     << " bytes limit " << bytes_limit_
+     << " current bytes allocated " << current_bytes_allocated_
+     << " next allocation index " << next_allocation_index_
+     << " writes " << write_counter_->value()
+     << " bytes written " << bytes_written_counter_->value()
+     << " reads " << read_counter_->value()
+     << " bytes read " << bytes_read_counter_->value()
+     << " scratch bytes used " << scratch_space_bytes_used_counter_->value()
+     << " disk read timer " << disk_read_timer_->value()
+     << " encryption timer " << encryption_timer_->value() << '\n'
+     << " " << tmp_files_.size() << " files:" << '\n';
+  for (const unique_ptr<File>& file : tmp_files_) {
+    ss << " " << file->DebugString() << '\n';
+  }
+  return ss.str();
+}
+
+TmpFileMgr::WriteHandle::WriteHandle(  // Creates an idle handle; no write is queued until Write() is called.
+ RuntimeProfile::Counter* encryption_timer, WriteDoneCallback cb)
+ : cb_(cb),  // Invoked once from WriteComplete().
+ encryption_timer_(encryption_timer),  // Charged by EncryptAndHash() and CheckHashAndDecrypt().
+ file_(nullptr),  // Assigned by Write() and RetryWrite().
+ is_cancelled_(false),
+ write_in_flight_(false) {}
+
+// Path of the scratch file currently backing this handle, or "" while no file has
+// been assigned ('file_' is only set by Write() and RetryWrite()).
+string TmpFileMgr::WriteHandle::TmpFilePath() const {
+  if (file_ != nullptr) return file_->path();
+  return "";
+}
+
+// Queues an asynchronous write of 'buffer' at 'offset' in 'file' via 'io_mgr' and
+// 'io_ctx'; 'callback' is attached to the write range. If --disk_spill_encryption is
+// set, 'buffer' is encrypted in place and hashed first. Must not be called while a
+// write is in flight. Returns the encryption error or the status of
+// AddWriteRange(); on any error the handle is left with no write in flight.
+Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx,
+    File* file, int64_t offset, MemRange buffer,
+    DiskIoMgr::WriteRange::WriteDoneCallback callback) {
+  DCHECK(!write_in_flight_);
+
+  if (FLAGS_disk_spill_encryption) RETURN_IF_ERROR(EncryptAndHash(buffer));
+
+  // All state is set up before the range is queued - presumably because the
+  // completion callback may run as soon as AddWriteRange() accepts the range.
+  file_ = file;
+  write_in_flight_ = true;
+  write_range_.reset(
+      new DiskIoMgr::WriteRange(file->path(), offset, file->AssignDiskQueue(), callback));
+  write_range_->SetData(buffer.data(), buffer.len());
+  Status status = io_mgr->AddWriteRange(io_ctx, write_range_.get());
+  // A write that could not be queued is not in flight: clear the flag so that the
+  // handle does not claim a pending write and WaitForWrite() cannot block on it.
+  // NOTE(review): assumes a failed AddWriteRange() never invokes 'callback' - confirm.
+  if (!status.ok()) write_in_flight_ = false;
+  return status;
+}
+
+Status TmpFileMgr::WriteHandle::RetryWrite(  // Re-queues the existing write range against a new file and offset.
+ DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, int64_t offset) {
+ DCHECK(write_in_flight_);  // Called from the failure path, before WriteComplete() has cleared the flag.
+ file_ = file;
+ write_in_flight_ = true;  // Redundant when the DCHECK above holds.
+ write_range_->SetRange(file->path(), offset, file->AssignDiskQueue());  // Data set by Write() via SetData() is not touched here.
+ return io_mgr->AddWriteRange(io_ctx, write_range_.get());
+}
+
+void TmpFileMgr::WriteHandle::WriteComplete(const Status& write_status) {  // Marks the write finished, wakes waiters, then runs the client callback.
+ WriteDoneCallback cb;
+ {
+ lock_guard<mutex> lock(write_state_lock_);
+ DCHECK(write_in_flight_);
+ write_in_flight_ = false;
+ // Need to extract 'cb_' because once 'write_in_flight_' is false, the WriteHandle
+ // may be destroyed.
+ cb = move(cb_);
+ }
+ write_complete_cv_.NotifyAll();  // Wakes threads blocked in WaitForWrite().
+ // Call 'cb' once we've updated the state. We must do this last because once 'cb' is
+ // called, it is valid to call Read() on the handle.
+ cb(write_status);  // Uses the local copy, not the member 'cb_'.
+}
+
+void TmpFileMgr::WriteHandle::Cancel() {  // Flags the handle as cancelled; does not wait for or abort an in-flight write.
+ unique_lock<mutex> lock(write_state_lock_);
+ is_cancelled_ = true;  // Checked by FileGroup::RecoverWriteError() to suppress retries.
+ // TODO: in future, if DiskIoMgr supported cancellation, we could cancel it here.
+}
+
+void TmpFileMgr::WriteHandle::WaitForWrite() {  // Blocks the caller until no write is in flight for this handle.
+ unique_lock<mutex> lock(write_state_lock_);
+ while (write_in_flight_) write_complete_cv_.Wait(lock);  // Signalled by NotifyAll() in WriteComplete(); the loop re-checks the flag after each wakeup.
+}
+
+Status TmpFileMgr::WriteHandle::EncryptAndHash(MemRange buffer) {  // Encrypts 'buffer' in place, then hashes the result.
+ DCHECK(FLAGS_disk_spill_encryption);
+ SCOPED_TIMER(encryption_timer_);
+ // Since we're using AES-CFB mode, we must take care not to reuse a key/IV pair.
+ // Regenerate a new key and IV for every data buffer we write.
+ key_.InitializeRandom();
+ RETURN_IF_ERROR(key_.Encrypt(buffer.data(), buffer.len(), buffer.data()));  // Input and output are the same memory.
+ hash_.Compute(buffer.data(), buffer.len());  // Computed over the encrypted bytes; CheckHashAndDecrypt() verifies before decrypting.
+ return Status::OK();
+}
+
+// Verifies 'buffer' against the hash stored by EncryptAndHash() and, if it matches,
+// decrypts 'buffer' in place. Returns a "Block verification failure" status on a
+// mismatch, otherwise the status of the decryption. The time spent is charged to
+// 'encryption_timer_'. Only valid when --disk_spill_encryption is set.
+Status TmpFileMgr::WriteHandle::CheckHashAndDecrypt(MemRange buffer) {
+  DCHECK(FLAGS_disk_spill_encryption);
+  SCOPED_TIMER(encryption_timer_);
+  const bool hash_matches = hash_.Verify(buffer.data(), buffer.len());
+  if (hash_matches) return key_.Decrypt(buffer.data(), buffer.len(), buffer.data());
+  return Status("Block verification failure");
+}
+
+// Returns a one-line, human-readable description of the handle: the backing file
+// path, the cancellation and in-flight flags and, once a write range exists, the
+// range's data pointer, length, file offset and disk id. Safe to call on a handle
+// that has not been written yet, in which case the file path is printed as ''.
+string TmpFileMgr::WriteHandle::DebugString() {
+  unique_lock<mutex> lock(write_state_lock_);
+  stringstream ss;
+  // 'file_' stays nullptr until Write() assigns it, so go through TmpFilePath(),
+  // which handles that case, rather than dereferencing 'file_' directly.
+  ss << "Write handle " << this << " file '" << TmpFilePath() << "'"
+     << " is cancelled " << is_cancelled_ << " write in flight " << write_in_flight_;
+  if (write_range_ != nullptr) {
+    ss << " data " << write_range_->data() << " len " << write_range_->len()
+       << " file offset " << write_range_->offset()
+       << " disk id " << write_range_->disk_id();
+  }
+  return ss.str();
+}
} // namespace impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 3c489b2..0c3e974 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -18,146 +18,211 @@
#ifndef IMPALA_RUNTIME_TMP_FILE_MGR_H
#define IMPALA_RUNTIME_TMP_FILE_MGR_H
+#include <functional>
+#include <utility>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "common/object-pool.h"
#include "common/status.h"
#include "gen-cpp/Types_types.h" // for TUniqueId
+#include "runtime/disk-io-mgr.h"
+#include "util/mem-range.h"
#include "util/collection-metrics.h"
+#include "util/condition-variable.h"
+#include "util/openssl-util.h"
#include "util/runtime-profile.h"
#include "util/spinlock.h"
namespace impala {
-/// TmpFileMgr creates and manages temporary files and directories on the local
-/// filesystem. It can manage multiple temporary directories across multiple devices.
-/// TmpFileMgr ensures that at most one directory per device is used unless overridden
-/// for testing.
+/// TmpFileMgr provides an abstraction for management of temporary (a.k.a. scratch) files
+/// on the filesystem and I/O to and from them. TmpFileMgr manages multiple scratch
+/// directories across multiple devices, configured via the --scratch_dirs option.
+/// TmpFileMgr manages I/O to scratch files in order to abstract away details of which
+/// files are allocated and recovery from certain I/O errors. I/O is done via DiskIoMgr.
+/// TmpFileMgr encrypts data written to disk if enabled by the --disk_spill_encryption
+/// command-line flag.
+///
+/// FileGroups manage scratch space across multiple devices. To write to scratch space,
+/// first a FileGroup is created, then FileGroup::Write() is called to asynchronously
+/// write a memory buffer to one of the scratch files. FileGroup::Write() returns a
+/// WriteHandle, which is used by the caller to identify that write operation. The
+/// caller is notified when the asynchronous write completes via a callback, after which
+/// the caller can use the WriteHandle to read back the data.
///
-/// Every temporary File belongs to a FileGroup: to allocate temporary files, first a
-/// FileGroup is created, then FileGroup::NewFile() is called to create a new File with
-/// a unique filename on the specified temporary device. The client can use the File
-/// handle to allocate space in the file. FileGroups can be created with a limit on
-/// the total number of bytes allocated across all files in the group.
+/// Each WriteHandle is backed by a range of data in a scratch file. The first call to
+/// Write() will create files for the FileGroup with unique filenames on the configured
+/// temporary devices. At most one directory per device is used (unless overridden for
+/// testing). Free space is managed within a FileGroup: once a WriteHandle is destroyed,
+/// the file range backing it can be recycled for a different WriteHandle. The file range
+/// of a WriteHandle can be replaced with a different one if a write error is encountered
+/// and the data instead needs to be written to a different disk.
///
-/// TODO: we could notify block managers about the failure so they can more take
-/// proactive action to avoid using the device.
+/// Resource Management:
+/// TmpFileMgr provides some basic support for managing local disk space consumption.
+/// A FileGroup can be created with a limit on the total number of bytes allocated across
+/// all files. Writes that would exceed the limit fail with an error status.
+///
+/// TODO: each FileGroup can manage only fixed length scratch file ranges of 'block_size',
+/// to simplify the recycling logic. BufferPool will require variable length ranges.
+/// TODO: IMPALA-4683: we could implement smarter handling of failures, e.g. to
+/// temporarily blacklist devices that show I/O errors.
class TmpFileMgr {
public:
- class FileGroup;
+ class File; // Needs to be public for TmpFileMgrTest.
+ class WriteHandle;
- /// DeviceId is a unique identifier for a temporary device managed by TmpFileMgr.
- /// It is used as a handle for external classes to identify devices.
+ /// DeviceId is an internal unique identifier for a temporary device managed by
+ /// TmpFileMgr. DeviceIds in the range [0, num tmp devices) are allocated arbitrarily.
+ /// Needs to be public for TmpFileMgrTest.
typedef int DeviceId;
- /// File is a handle to a physical file in a temporary directory. Clients
- /// can allocate file space and remove files using AllocateSpace() and Remove().
- /// Creation of the file is deferred until the first call to AllocateSpace().
- class File {
+ typedef std::function<void(const Status&)> WriteDoneCallback;
+
+ /// Represents a group of temporary files - one per disk with a scratch directory. The
+ /// total allocated bytes of the group can be bound by setting the space allocation
+ /// limit. The owner of the FileGroup object is responsible for calling the Close()
+ /// method to delete all the files in the group.
+ ///
+ /// Public methods of FileGroup and WriteHandle are safe to call concurrently from
+ /// multiple threads as long as different WriteHandle arguments are provided.
+ class FileGroup {
public:
- /// Called to notify TmpFileMgr that an IO error was encountered for this file
- void ReportIOError(const ErrorMsg& msg);
+ /// Initialize a new file group, which will create files using 'tmp_file_mgr'
+ /// and perform I/O using 'io_mgr'. Adds counters to 'profile' to track scratch
+ /// space used. 'unique_id' is a unique ID that is used to prefix any scratch file
+ /// names. It is an error to create multiple FileGroups with the same 'unique_id'.
+ /// 'block_size' is the size of blocks in bytes that space will be allocated in.
+ /// 'bytes_limit' is the limit on the total file space to allocate.
+ FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr, RuntimeProfile* profile,
+ const TUniqueId& unique_id, int64_t block_size, int64_t bytes_limit = -1);
+
+ ~FileGroup();
+
+ /// Asynchronously writes 'buffer' to a temporary file of this file group. If there
+ /// are multiple scratch files, this can write to any of them, and will attempt to
+ /// recover from I/O errors on one file by writing to a different file. The memory
+ /// referenced by 'buffer' must remain valid until the write completes. The callee
+ /// may rewrite the data in 'buffer' in-place (e.g. to do in-place encryption or
+ /// compression). The caller should not modify the data in 'buffer' until the write
+ /// completes or is cancelled, otherwise invalid data may be written to disk.
+ ///
+ /// TODO: buffer.len() must be <= 'block_size' until FileGroup supports allocating
+ /// variable-length scratch file ranges.
+ ///
+ /// Returns an error if the scratch space cannot be allocated or the write cannot
+ /// be started. Otherwise 'handle' is set and 'cb' will be called asynchronously from
+ /// a different thread when the write completes successfully or unsuccessfully or is
+ /// cancelled.
+ ///
+ /// 'handle' must be destroyed by passing it to DestroyWriteHandle() or
+ /// CancelWriteAndRestoreData().
+ Status Write(
+ MemRange buffer, WriteDoneCallback cb, std::unique_ptr<WriteHandle>* handle);
+
+ /// Synchronously read the data referenced by 'handle' from the temporary file into
+ /// 'buffer'. buffer.len() must be the same as handle->len(). Can only be called
+ /// after a write successfully completes.
+ Status Read(WriteHandle* handle, MemRange buffer);
+
+ /// Cancels the write referenced by 'handle' and destroys associated resources. Also
+ /// restores the original data in the 'buffer' passed to Write(), decrypting or
+ /// decompressing as necessary. The cancellation always succeeds, but an error
+ /// is returned if restoring the data fails.
+ Status CancelWriteAndRestoreData(
+ std::unique_ptr<WriteHandle> handle, MemRange buffer);
+
+ /// Wait for the in-flight I/Os to complete and destroy resources associated with
+ /// 'handle'.
+ void DestroyWriteHandle(std::unique_ptr<WriteHandle> handle);
+
+ /// Calls Remove() on all the files in the group and deletes them.
+ void Close();
+
+ std::string DebugString();
- const std::string& path() const { return path_; }
- int disk_id() const { return disk_id_; }
- bool is_blacklisted() const { return blacklisted_; }
+ const TUniqueId& unique_id() const { return unique_id_; }
private:
- friend class FileGroup;
- friend class TmpFileMgr;
+ friend class File;
friend class TmpFileMgrTest;
- /// Allocates 'num_bytes' bytes in this file for a new block of data.
- /// The file size is increased by a call to truncate() if necessary.
- /// The physical file is created on the first call to AllocateSpace().
- /// Returns Status::OK() and sets offset on success.
- /// Returns an error status if an unexpected error occurs, e.g. the file could not
- /// be created.
- Status AllocateSpace(int64_t num_bytes, int64_t* offset);
-
- /// Delete the physical file on disk, if one was created.
- /// It is not valid to read or write to a file after calling Remove().
- Status Remove();
+ /// Initializes the file group with one temporary file per disk with a scratch
+ /// directory. Returns OK if at least one temporary file could be created.
+ /// Returns an error if no temporary files were successfully created. Must only be
+ /// called once. Must be called with 'lock_' held.
+ Status CreateFiles();
- /// The name of the sub-directory that Impala created within each configured scratch
- /// directory.
- const static std::string TMP_SUB_DIR_NAME;
+ /// Allocate 'num_bytes' bytes in a temporary file. Try multiple disks if error
+ /// occurs. Returns an error only if no temporary files are usable or the scratch
+ /// limit is exceeded. Must be called without 'lock_' held.
+ Status AllocateSpace(int64_t num_bytes, File** tmp_file, int64_t* file_offset);
- /// Space (in MB) that must ideally be available for writing on a scratch
- /// directory. A warning is issued if available space is less than this threshold.
- const static uint64_t AVAILABLE_SPACE_THRESHOLD_MB;
+ /// Add a free scratch range to 'free_ranges_'. Must be called without 'lock_' held.
+ void AddFreeRange(File* file, int64_t offset);
- File(TmpFileMgr* mgr, FileGroup* file_group, DeviceId device_id,
- const std::string& path);
+ /// Called when the DiskIoMgr write completes for 'handle'. On error, will attempt
+ /// to retry the write. On success or if the write can't be retried, calls
+ /// handle->WriteComplete().
+ void WriteComplete(WriteHandle* handle, const Status& write_status);
- /// TmpFileMgr this belongs to.
- TmpFileMgr* mgr_;
+ /// Handles a write error. Logs the write error and blacklists the device for this
+ /// file group if the cause was an I/O error. Blacklisting limits the number of times
+ /// a write is retried because each device will only be tried once. Returns OK if it
+ /// successfully reissued the write. Returns an error status if the original error
+ /// was unrecoverable or an unrecoverable error is encountered when reissuing the
+ /// write. The error status will include all previous I/O errors in its details.
+ Status RecoverWriteError(WriteHandle* handle, const Status& write_status);
- /// The FileGroup this belongs to. Cannot be null.
- FileGroup* file_group_;
+ /// The TmpFileMgr it is associated with.
+ TmpFileMgr* const tmp_file_mgr_;
- /// Path of the physical file in the filesystem.
- std::string path_;
+ /// DiskIoMgr used for all I/O to temporary files.
+ DiskIoMgr* const io_mgr_;
- /// The temporary device this file is stored on.
- DeviceId device_id_;
+ /// I/O context used for all reads and writes. Registered in constructor.
+ DiskIoRequestContext* io_ctx_;
- /// The id of the disk on which the physical file lies.
- int disk_id_;
+ /// Stores scan ranges allocated in Read(). Needed because ScanRange objects may be
+ /// touched by DiskIoMgr even after the scan is finished.
+ /// TODO: IMPALA-4249: remove once lifetime of ScanRange objects is better defined.
+ ObjectPool scan_range_pool_;
- /// Current file size. Modified by AllocateSpace(). Size is 0 before file creation.
- int64_t current_size_;
+ /// Unique across all FileGroups. Used to prefix file names.
+ const TUniqueId unique_id_;
- /// Set to true to indicate that file can't be expanded. This is useful to keep here
- /// even though it is redundant with the global per-device blacklisting in TmpFileMgr
- /// because it can be checked without acquiring a global lock. If a file is
- /// blacklisted, the corresponding device will always be blacklisted.
- bool blacklisted_;
- };
+ /// Size of the blocks in bytes that scratch space is managed in.
+ /// TODO: support variable-length scratch file ranges.
+ const int64_t block_size_;
- /// Represents a group of temporary files - one per disk with a scratch directory. The
- /// total allocated bytes of the group can be bound by setting the space allocation
- /// limit. The owner of the FileGroup object is responsible for calling the Close()
- /// method to delete all the files in the group.
- class FileGroup {
- public:
- /// Initialize a new file group, which will create files using 'tmp_file_mgr'.
- /// Adds counters to 'profile' to track scratch space used. 'bytes_limit' is
- /// the limit on the total file space to allocate.
- FileGroup(
- TmpFileMgr* tmp_file_mgr, RuntimeProfile* profile, int64_t bytes_limit = -1);
+ /// Max write space allowed (-1 means no limit).
+ const int64_t bytes_limit_;
- ~FileGroup() { DCHECK_EQ(NumFiles(), 0); }
+ /// Number of write operations (includes writes started but not yet complete).
+ RuntimeProfile::Counter* const write_counter_;
- /// Initializes the file group with one temporary file per disk with a scratch
- /// directory. 'unique_id' is a unique ID that should be used to prefix any
- /// scratch file names. It is an error to create multiple FileGroups with the
- /// same 'unique_id'. Returns OK if at least one temporary file could be created.
- /// Returns an error if no temporary files were successfully created. Must only be
- /// called once.
- Status CreateFiles(const TUniqueId& unique_id);
+ /// Number of bytes written to disk (includes writes started but not yet complete).
+ RuntimeProfile::Counter* const bytes_written_counter_;
- /// Allocate num_bytes bytes in a temporary file. Try multiple disks if error occurs.
- /// Returns an error only if no temporary files are usable or the scratch limit is
- /// exceeded.
- Status AllocateSpace(int64_t num_bytes, File** tmp_file, int64_t* file_offset);
+ /// Number of read operations (includes reads started but not yet complete).
+ RuntimeProfile::Counter* const read_counter_;
- /// Calls Remove() on all the files in the group and deletes them.
- void Close();
+ /// Number of bytes read from disk (includes reads started but not yet complete).
+ RuntimeProfile::Counter* const bytes_read_counter_;
- /// Returns the number of files that are a part of the group.
- int NumFiles() { return tmp_files_.size(); }
+ /// Amount of scratch space allocated in bytes.
+ RuntimeProfile::Counter* const scratch_space_bytes_used_counter_;
- private:
- friend class TmpFileMgrTest;
+ /// Time taken for disk reads.
+ RuntimeProfile::Counter* const disk_read_timer_;
- /// Creates a new File with a unique path for a query instance, adds it to the
- /// group and returns a handle for that file. The file path is within the (single)
- /// tmp directory on the specified device id.
- /// If an error is encountered, e.g. the device is blacklisted, the file is not
- /// added to this group and a non-ok status is returned.
- Status NewFile(
- const DeviceId& device_id, const TUniqueId& unique_id, File** new_file = NULL);
+ /// Time spent in disk spill encryption, decryption, and integrity checking.
+ RuntimeProfile::Counter* encryption_timer_;
- /// The TmpFileMgr it is associated with.
- TmpFileMgr* tmp_file_mgr_;
+ /// Protects below members.
+ SpinLock lock_;
/// List of files representing the FileGroup.
std::vector<std::unique_ptr<File>> tmp_files_;
@@ -165,16 +230,117 @@ class TmpFileMgr {
/// Total space allocated in this group's files.
int64_t current_bytes_allocated_;
- /// Max write space allowed (-1 means no limit).
- const int64_t bytes_limit_;
-
/// Index into 'tmp_files' denoting the file to which the next temporary file range
/// should be allocated from. Used to implement round-robin allocation from temporary
/// files.
int next_allocation_index_;
- /// Amount of scratch space allocated in bytes.
- RuntimeProfile::Counter* scratch_space_bytes_used_counter_;
+ /// List of File/offset pairs for free scratch ranges of size 'block_size_' bytes.
+ std::vector<std::pair<File*, int64_t>> free_ranges_;
+
+ /// Errors encountered when creating/writing scratch files. We store the history so
+ /// that we can report the original cause of the scratch errors if we run out of
+ /// devices to write to.
+ std::vector<Status> scratch_errors_;
+ };
+
+ /// A handle to a write operation, backed by a range of a temporary file. The operation
+ /// is either in-flight or has completed. If it completed with no error and wasn't
+ /// cancelled then the data is in the file and can be read back.
+ ///
+ /// WriteHandle is returned from FileGroup::Write(). After the write completes, the
+ /// handle can be passed to FileGroup::Read() to read back the data zero or more times.
+ /// FileGroup::DestroyWriteHandle() can be called at any time to destroy the handle and
+ /// allow reuse of the scratch file range written to. Alternatively,
+ /// FileGroup::CancelWriteAndRestoreData() can be called to reverse the effects of
+ /// FileGroup::Write() by destroying the handle and restoring the original data to the
+ /// buffer, so long as the data in the buffer was not modified by the caller.
+ ///
+ /// Public methods of WriteHandle are safe to call concurrently from multiple threads.
+ class WriteHandle {
+ public:
+ // The write must be destroyed by FileGroup::DestroyWriteHandle().
+ ~WriteHandle() {
+ DCHECK(!write_in_flight_);
+ DCHECK(is_cancelled_);
+ }
+
+ /// Path of temporary file backing the block. Intended for use in testing.
+ /// Returns empty string if no backing file allocated.
+ std::string TmpFilePath() const;
+
+ /// The length of the write range in bytes.
+ int64_t len() const { return write_range_->len(); }
+
+ std::string DebugString();
+
+ private:
+ friend class FileGroup;
+
+ WriteHandle(RuntimeProfile::Counter* encryption_timer, WriteDoneCallback cb);
+
+ /// Starts a write of 'buffer' to 'offset' of 'file'.
+ Status Write(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file,
+ int64_t offset, MemRange buffer,
+ DiskIoMgr::WriteRange::WriteDoneCallback callback);
+
+ /// Retry the write after the initial write failed with an error, instead writing to
+ /// 'offset' of 'file'.
+ Status RetryWrite(
+ DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, int64_t offset);
+
+ /// Cancels the write asynchronously. After Cancel() is called, writes are not
+ /// retried.
+ void Cancel();
+
+ /// Blocks until the write completes either successfully or unsuccessfully.
+ void WaitForWrite();
+
+ /// Called when the write has completed successfully or not. Sets 'write_in_flight_'
+ /// to false, then calls 'cb_'.
+ void WriteComplete(const Status& write_status);
+
+ /// Encrypts the data in 'buffer' in-place and computes 'hash_'.
+ Status EncryptAndHash(MemRange buffer);
+
+ /// Verifies the integrity hash and decrypts the contents of 'buffer' in place.
+ Status CheckHashAndDecrypt(MemRange buffer);
+
+ /// Callback to be called when the write completes.
+ WriteDoneCallback cb_;
+
+ /// Reference to the FileGroup's 'encryption_timer_'.
+ RuntimeProfile::Counter* encryption_timer_;
+
+ /// The DiskIoMgr write range for this write.
+ boost::scoped_ptr<DiskIoMgr::WriteRange> write_range_;
+
+ /// The temporary file being written to.
+ File* file_;
+
+ /// If --disk_spill_encryption is on, an AES 256-bit key and initialization vector.
+ /// Regenerated for each write.
+ EncryptionKey key_;
+
+ /// If --disk_spill_encryption is on, our hash of the data being written. Filled in
+ /// on writes; verified on reads. This is calculated _after_ encryption.
+ IntegrityHash hash_;
+
+ /// Protects all fields below while 'write_in_flight_' is true. At other times, it is
+ /// invalid to call WriteRange/FileGroup methods concurrently from multiple threads,
+ /// so no locking is required. This is a terminal lock and should not be held while
+ /// acquiring other locks or invoking 'cb_'.
+ boost::mutex write_state_lock_;
+
+ // True if the write has been cancelled (but is not necessarily complete).
+ bool is_cancelled_;
+
+ // True if a write is in flight.
+ bool write_in_flight_;
+
+ /// Signalled when the write completes and 'write_in_flight_' becomes false, before
+ /// 'cb_' is invoked.
+ ConditionVariable write_complete_cv_;
};
TmpFileMgr();
@@ -194,59 +360,27 @@ class TmpFileMgr {
/// Total number of devices with tmp directories that are active. There is one tmp
/// directory per device.
- int num_active_tmp_devices();
+ int NumActiveTmpDevices();
/// Return vector with device ids of all tmp devices being actively used.
/// I.e. those that haven't been blacklisted.
- std::vector<DeviceId> active_tmp_devices();
+ std::vector<DeviceId> ActiveTmpDevices();
private:
- /// Return a new File handle with a unique path for a query instance. The file is
- /// associated with the file_group and the file path is within the (single) tmp
+ friend class TmpFileMgrTest;
+
+ /// Return a new File handle with a path based on file_group->unique_id. The file is
+ /// associated with the 'file_group' and the file path is within the (single) scratch
/// directory on the specified device id. The caller owns the returned handle and is
/// responsible for deleting it. The file is not created - creation is deferred until
- /// the first call to File::AllocateSpace().
- Status NewFile(FileGroup* file_group, const DeviceId& device_id,
- const TUniqueId& unique_id, std::unique_ptr<File>* new_file);
-
- /// Dir stores information about a temporary directory.
- class Dir {
- public:
- const std::string& path() const { return path_; }
-
- // Return true if it was newly added to blacklist.
- bool blacklist() {
- bool was_blacklisted = blacklisted_;
- blacklisted_ = true;
- return !was_blacklisted;
- }
- bool is_blacklisted() const { return blacklisted_; }
-
- private:
- friend class TmpFileMgr;
-
- /// path should be a absolute path to a writable scratch directory.
- Dir(const std::string& path, bool blacklisted)
- : path_(path), blacklisted_(blacklisted) {}
-
- std::string path_;
-
- bool blacklisted_;
- };
-
- /// Remove a device from the rotation. Subsequent attempts to allocate a file on that
- /// device will fail and the device will not be included in active tmp devices.
- void BlacklistDevice(DeviceId device_id);
-
- bool IsBlacklisted(DeviceId device_id);
+ /// the file is written.
+ Status NewFile(
+ FileGroup* file_group, DeviceId device_id, std::unique_ptr<File>* new_file);
bool initialized_;
- /// Protects the status of tmp dirs (i.e. whether they're blacklisted).
- SpinLock dir_status_lock_;
-
- /// The created tmp directories.
- std::vector<Dir> tmp_dirs_;
+ /// The paths of the created tmp directories.
+ std::vector<std::string> tmp_dirs_;
/// Metrics to track active scratch directories.
IntGauge* num_active_scratch_dirs_metric_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/disk-info.cc
----------------------------------------------------------------------
diff --git a/be/src/util/disk-info.cc b/be/src/util/disk-info.cc
index d3eeb56..eba4f26 100644
--- a/be/src/util/disk-info.cc
+++ b/be/src/util/disk-info.cc
@@ -48,7 +48,6 @@ bool DiskInfo::initialized_;
vector<DiskInfo::Disk> DiskInfo::disks_;
map<dev_t, int> DiskInfo::device_id_to_disk_id_;
map<string, int> DiskInfo::disk_name_to_disk_id_;
-int DiskInfo::num_datanode_dirs_;
// Parses /proc/partitions to get the number of disks. A bit of looking around
// seems to indicate this as the best way to do this.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/disk-info.h
----------------------------------------------------------------------
diff --git a/be/src/util/disk-info.h b/be/src/util/disk-info.h
index 4853511..323a265 100644
--- a/be/src/util/disk-info.h
+++ b/be/src/util/disk-info.h
@@ -43,24 +43,8 @@ class DiskInfo {
return disks_.size();
}
-#if 0
- /// Returns the number of (logical) disks the data node is using.
- /// It is possible for this to be more than num_disks since the datanode
- /// can be configured to have multiple data directories on the same physical
- /// disk.
- static int num_datanode_dirs() {
- DCHECK(initialized_);
- return num_datanode_dirs_;
- }
-
- /// Returns a 0-based disk index for the data node dirs index.
- static int disk_id(int datanode_dir_idx) {
- return 0;
- }
-#endif
-
/// Returns the 0-based disk index for 'path' (path must be a FS path, not
- /// hdfs path).
+ /// hdfs path). Returns -1 if the disk index is unknown.
static int disk_id(const char* path);
/// Returns the device name (e.g. sda) for disk_id
@@ -100,15 +84,11 @@ class DiskInfo {
/// mapping of dev_ts to disk ids
static std::map<dev_t, int> device_id_to_disk_id_;
-
+
/// mapping of devices names to disk ids
static std::map<std::string, int> disk_name_to_disk_id_;
- static int num_datanode_dirs_;
-
static void GetDeviceNames();
};
-
-
}
#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/filesystem-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/filesystem-util.cc b/be/src/util/filesystem-util.cc
index 01cd927..a0cacdf 100644
--- a/be/src/util/filesystem-util.cc
+++ b/be/src/util/filesystem-util.cc
@@ -114,17 +114,6 @@ Status FileSystemUtil::CreateFile(const string& file_path) {
return Status::OK();
}
-Status FileSystemUtil::ResizeFile(const string& file_path, int64_t trunc_len) {
- int success = truncate(file_path.c_str(), trunc_len);
- if (success != 0) {
- return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR, Substitute(
- "Truncate file $0 to length $1 failed with errno $2 ($3)",
- file_path, trunc_len, errno, GetStrErrMsg())));
- }
-
- return Status::OK();
-}
-
Status FileSystemUtil::VerifyIsDirectory(const string& directory_path) {
error_code errcode;
bool exists = filesystem::exists(directory_path, errcode);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/filesystem-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/filesystem-util.h b/be/src/util/filesystem-util.h
index 887dc4b..3e824b8 100644
--- a/be/src/util/filesystem-util.h
+++ b/be/src/util/filesystem-util.h
@@ -36,9 +36,6 @@ class FileSystemUtil {
/// Create a file at the specified path.
static Status CreateFile(const std::string& file_path);
- /// Resize a file to a specified length - uses unistd truncate().
- static Status ResizeFile(const std::string& file_path, int64_t trunc_len);
-
/// Remove the specified paths and their enclosing files/directories.
static Status RemovePaths(const std::vector<std::string>& directories);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/mem-range.h
----------------------------------------------------------------------
diff --git a/be/src/util/mem-range.h b/be/src/util/mem-range.h
new file mode 100644
index 0000000..c55caaf
--- /dev/null
+++ b/be/src/util/mem-range.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_UTIL_MEM_RANGE_H
+#define IMPALA_UTIL_MEM_RANGE_H
+
+#include <cstdint>
+
+#include "common/logging.h"
+
+namespace impala {
+
+/// Represents a range of memory. This is a convenient alternative to passing around
+/// a separate pointer and length.
+class MemRange {
+ public:
+ MemRange(uint8_t* data, int64_t len) : data_(data), len_(len) {
+ DCHECK_GE(len, 0);
+ DCHECK(len == 0 || data != nullptr);
+ }
+
+ uint8_t* data() const { return data_; }
+ int64_t len() const { return len_; }
+
+ static MemRange null() { return MemRange(nullptr, 0); }
+
+ private:
+ uint8_t* data_;
+ int64_t len_;
+};
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index bb4251b..5088a1b 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -64,7 +64,7 @@ enum TParquetFallbackSchemaResolution {
// metadata which overrides everything else.
struct TQueryOptions {
1: optional bool abort_on_error = 0
- 2: optional i32 max_errors = 0
+ 2: optional i32 max_errors = 100
3: optional bool disable_codegen = 0
4: optional i32 batch_size = 0
5: optional i32 num_nodes = NUM_NODES_ALL
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/testdata/workloads/functional-query/queries/QueryTest/set.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test b/testdata/workloads/functional-query/queries/QueryTest/set.test
index e405caf..1dd1396 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/set.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/set.test
@@ -14,7 +14,7 @@ set
'EXPLAIN_LEVEL','1'
'HBASE_CACHE_BLOCKS','0'
'HBASE_CACHING','0'
-'MAX_ERRORS','0'
+'MAX_ERRORS','100'
'MAX_IO_BUFFERS','0'
'MAX_SCAN_RANGE_LENGTH','0'
'MEM_LIMIT','0'
@@ -46,7 +46,7 @@ set;
'EXPLAIN_LEVEL','3'
'HBASE_CACHE_BLOCKS','0'
'HBASE_CACHING','0'
-'MAX_ERRORS','0'
+'MAX_ERRORS','100'
'MAX_IO_BUFFERS','0'
'MAX_SCAN_RANGE_LENGTH','0'
'MEM_LIMIT','0'
@@ -78,7 +78,7 @@ set;
'EXPLAIN_LEVEL','0'
'HBASE_CACHE_BLOCKS','0'
'HBASE_CACHING','0'
-'MAX_ERRORS','0'
+'MAX_ERRORS','100'
'MAX_IO_BUFFERS','0'
'MAX_SCAN_RANGE_LENGTH','0'
'MEM_LIMIT','0'
@@ -111,7 +111,7 @@ set;
'EXPLAIN_LEVEL','1'
'HBASE_CACHE_BLOCKS','0'
'HBASE_CACHING','0'
-'MAX_ERRORS','0'
+'MAX_ERRORS','100'
'MAX_IO_BUFFERS','0'
'MAX_SCAN_RANGE_LENGTH','0'
'MEM_LIMIT','0'
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/tests/custom_cluster/test_scratch_disk.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index 1c02b56..f523dbe 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -52,22 +52,7 @@ class TestScratchDir(CustomClusterTestSuite):
def get_dirs(dirs):
return ','.join(dirs)
- @classmethod
- def setup_class(cls):
- super(TestScratchDir, cls).setup_class()
- cls.normal_dirs = cls.generate_dirs(5)
- cls.non_writable_dirs = cls.generate_dirs(5, writable=False)
- cls.non_existing_dirs = cls.generate_dirs(5, non_existing=True)
-
- @classmethod
- def teardown_class(cls):
- for dir_path in cls.normal_dirs:
- shutil.rmtree(dir_path)
- for dir_path in cls.non_writable_dirs:
- shutil.rmtree(dir_path)
-
- @classmethod
- def generate_dirs(cls, num, writable=True, non_existing=False):
+ def generate_dirs(self, num, writable=True, non_existing=False):
result = []
for i in xrange(num):
dir_path = tempfile.mkdtemp()
@@ -75,27 +60,30 @@ class TestScratchDir(CustomClusterTestSuite):
shutil.rmtree(dir_path)
elif not writable:
os.chmod(dir_path, stat.S_IREAD)
+ if not non_existing:
+ self.created_dirs.append(dir_path)
result.append(dir_path)
+ print "Generated dir" + dir_path
return result
def setup_method(self, method):
- # We are overriding this method to prevent starting Impala before each test. In this
- # file, each test is responsible for doing that because we want to use class
- # variables like cls.normal_dirs to generate the parameter string to
- # start-impala-cluster, which are generated in setup_class (so using the with_args
- # decorator does not work).
- pass
+ # Don't call the superclass method to prevent starting Impala before each test. In
+ # this file, each test is responsible for doing that because we want to generate
+ # the parameter string to start-impala-cluster in each test method.
+ self.created_dirs = []
def teardown_method(self, method):
- pass
+ for dir_path in self.created_dirs:
+ shutil.rmtree(dir_path, ignore_errors=True)
@pytest.mark.execute_serially
def test_multiple_dirs(self, vector):
""" 5 empty directories are created in the /tmp directory and we verify that only
one of those directories is used as scratch disk. Only one should be used as
scratch because all directories are on same disk."""
+ normal_dirs = self.generate_dirs(5)
self._start_impala_cluster([
- '--impalad_args="-scratch_dirs={0}"'.format(','.join(self.normal_dirs))])
+ '--impalad_args="-scratch_dirs={0}"'.format(','.join(normal_dirs))])
self.assert_impalad_log_contains("INFO", "Using scratch directory ",
expected_count=1)
exec_option = vector.get_value('exec_option')
@@ -103,10 +91,9 @@ class TestScratchDir(CustomClusterTestSuite):
impalad = self.cluster.get_any_impalad()
client = impalad.service.create_beeswax_client()
self.execute_query_expect_success(client, self.spill_query, exec_option)
- assert self.count_nonempty_dirs(self.normal_dirs) == 1
+ assert self.count_nonempty_dirs(normal_dirs) == 1
@pytest.mark.execute_serially
- @CustomClusterTestSuite.with_args("-scratch_dirs=")
def test_no_dirs(self, vector):
""" Test we can execute a query with no scratch dirs """
self._start_impala_cluster(['--impalad_args="-scratch_dirs="'])
@@ -124,8 +111,9 @@ class TestScratchDir(CustomClusterTestSuite):
@pytest.mark.execute_serially
def test_non_writable_dirs(self, vector):
""" Test we can execute a query with only bad non-writable scratch """
+ non_writable_dirs = self.generate_dirs(5, writable=False)
self._start_impala_cluster([
- '--impalad_args="-scratch_dirs={0}"'.format(','.join(self.non_writable_dirs))])
+ '--impalad_args="-scratch_dirs={0}"'.format(','.join(non_writable_dirs))])
self.assert_impalad_log_contains("ERROR", "Running without spill to disk: could "
+ "not use any scratch directories in list:.*. See previous "
+ "warnings for information on causes.")
@@ -139,13 +127,14 @@ class TestScratchDir(CustomClusterTestSuite):
self.execute_query_expect_failure(client, self.spill_query, exec_option)
# Should be able to execute in-memory query
self.execute_query_expect_success(client, self.in_mem_query, exec_option)
- assert self.count_nonempty_dirs(self.non_writable_dirs) == 0
+ assert self.count_nonempty_dirs(non_writable_dirs) == 0
@pytest.mark.execute_serially
def test_non_existing_dirs(self, vector):
""" Test that non-existing directories are not created or used """
+ non_existing_dirs = self.generate_dirs(5, non_existing=True)
self._start_impala_cluster([
- '--impalad_args="-scratch_dirs={0}"'.format(','.join(self.non_existing_dirs))])
+ '--impalad_args="-scratch_dirs={0}"'.format(','.join(non_existing_dirs))])
self.assert_impalad_log_contains("ERROR", "Running without spill to disk: could "
+ "not use any scratch directories in list:.*. See previous "
+ "warnings for information on causes.")
@@ -160,4 +149,27 @@ class TestScratchDir(CustomClusterTestSuite):
self.execute_query_expect_failure(client, self.spill_query, exec_option)
# Should be able to execute in-memory query
self.execute_query_expect_success(client, self.in_mem_query, exec_option)
- assert self.count_nonempty_dirs(self.non_existing_dirs) == 0
+ assert self.count_nonempty_dirs(non_existing_dirs) == 0
+
+ @pytest.mark.execute_serially
+ def test_write_error_failover(self, vector):
+ """ Test that we can fail-over to writable directories if other directories
+ have permissions changed or are removed after impalad startup."""
+ dirs = self.generate_dirs(3);
+ self._start_impala_cluster([
+ '--impalad_args="-scratch_dirs={0}"'.format(','.join(dirs)),
+ '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'])
+ self.assert_impalad_log_contains("INFO", "Using scratch directory ",
+ expected_count=len(dirs))
+ exec_option = vector.get_value('exec_option')
+ exec_option['max_block_mgr_memory'] = self.max_block_mgr_memory
+ # Trigger errors when writing the first two directories.
+ shutil.rmtree(dirs[0]) # Remove the first directory.
+ # Make all subdirectories in the second directory non-writable.
+ for dirpath, dirnames, filenames in os.walk(dirs[1]):
+ os.chmod(dirpath, stat.S_IREAD)
+
+ # Should still be able to spill to the third directory.
+ impalad = self.cluster.get_any_impalad()
+ client = impalad.service.create_beeswax_client()
+ self.execute_query_expect_success(client, self.spill_query, exec_option)