You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/01/05 15:32:16 UTC

[3/5] incubator-impala git commit: IMPALA-3202, IMPALA-2079: rework scratch file I/O

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index dc35600..bf2b7ec 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -15,16 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "runtime/tmp-file-mgr.h"
+
 #include <boost/algorithm/string.hpp>
+#include <boost/filesystem.hpp>
 #include <boost/lexical_cast.hpp>
 #include <boost/thread/locks.hpp>
-#include <boost/uuid/uuid_io.hpp>
 #include <boost/uuid/random_generator.hpp>
-#include <boost/filesystem.hpp>
-#include <gutil/strings/substitute.h>
+#include <boost/uuid/uuid_io.hpp>
 #include <gutil/strings/join.h>
+#include <gutil/strings/substitute.h>
 
-#include "runtime/tmp-file-mgr.h"
+#include "runtime/runtime-state.h"
+#include "runtime/tmp-file-mgr-internal.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
 #include "util/filesystem-util.h"
@@ -32,9 +35,13 @@
 
 #include "common/names.h"
 
+DEFINE_bool(disk_spill_encryption, false,
+    "Set this to encrypt and perform an integrity "
+    "check on all data spilled to disk during a query");
 DEFINE_string(scratch_dirs, "/tmp", "Writable scratch directories");
-
-#include "common/names.h"
+DEFINE_bool(allow_multiple_scratch_dirs_per_device, false,
+    "If false and --scratch_dirs contains multiple directories on the same device, "
+    "then only the first writable directory is used");
 
 using boost::algorithm::is_any_of;
 using boost::algorithm::join;
@@ -55,8 +62,10 @@ const string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS = "tmp-file-mgr.active-scratch-dir
 const string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST =
     "tmp-file-mgr.active-scratch-dirs.list";
 
-TmpFileMgr::TmpFileMgr() : initialized_(false), dir_status_lock_(), tmp_dirs_(),
-  num_active_scratch_dirs_metric_(NULL), active_scratch_dirs_metric_(NULL) {}
+TmpFileMgr::TmpFileMgr()
+  : initialized_(false),
+    num_active_scratch_dirs_metric_(nullptr),
+    active_scratch_dirs_metric_(nullptr) {}
 
 Status TmpFileMgr::Init(MetricGroup* metrics) {
   string tmp_dirs_spec = FLAGS_scratch_dirs;
@@ -65,7 +74,7 @@ Status TmpFileMgr::Init(MetricGroup* metrics) {
   if (!tmp_dirs_spec.empty()) {
     split(all_tmp_dirs, tmp_dirs_spec, is_any_of(","), token_compress_on);
   }
-  return InitCustom(all_tmp_dirs, true, metrics);
+  return InitCustom(all_tmp_dirs, !FLAGS_allow_multiple_scratch_dirs_per_device, metrics);
 }
 
 Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_device,
@@ -108,7 +117,7 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
         if (disk_id >= 0) is_tmp_dir_on_disk[disk_id] = true;
         LOG(INFO) << "Using scratch directory " << scratch_subdir_path.string() << " on "
                   << "disk " << disk_id;
-        tmp_dirs_.push_back(Dir(scratch_subdir_path.string(), false));
+        tmp_dirs_.push_back(scratch_subdir_path.string());
       } else {
         LOG(WARNING) << "Could not remove and recreate directory "
                      << scratch_subdir_path.string() << ": cannot use it for scratch. "
@@ -117,14 +126,14 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
     }
   }
 
-  DCHECK(metrics != NULL);
+  DCHECK(metrics != nullptr);
   num_active_scratch_dirs_metric_ =
       metrics->AddGauge<int64_t>(TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS, 0);
-  active_scratch_dirs_metric_ = SetMetric<string>::CreateAndRegister(metrics,
-      TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>());
+  active_scratch_dirs_metric_ = SetMetric<string>::CreateAndRegister(
+      metrics, TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST, set<string>());
   num_active_scratch_dirs_metric_->set_value(tmp_dirs_.size());
   for (int i = 0; i < tmp_dirs_.size(); ++i) {
-    active_scratch_dirs_metric_->Add(tmp_dirs_[i].path());
+    active_scratch_dirs_metric_->Add(tmp_dirs_[i]);
   }
 
   initialized_ = true;
@@ -137,24 +146,20 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
   return Status::OK();
 }
 
-Status TmpFileMgr::NewFile(FileGroup* file_group, const DeviceId& device_id,
-    const TUniqueId& query_id, unique_ptr<File>* new_file) {
+Status TmpFileMgr::NewFile(
+    FileGroup* file_group, DeviceId device_id, unique_ptr<File>* new_file) {
   DCHECK(initialized_);
   DCHECK_GE(device_id, 0);
   DCHECK_LT(device_id, tmp_dirs_.size());
-  DCHECK(file_group != NULL);
-  if (IsBlacklisted(device_id)) {
-    return Status(TErrorCode::TMP_DEVICE_BLACKLISTED, tmp_dirs_[device_id].path());
-  }
-
+  DCHECK(file_group != nullptr);
   // Generate the full file path.
   string unique_name = lexical_cast<string>(random_generator()());
   stringstream file_name;
-  file_name << PrintId(query_id) << "_" << unique_name;
-  path new_file_path(tmp_dirs_[device_id].path());
+  file_name << PrintId(file_group->unique_id()) << "_" << unique_name;
+  path new_file_path(tmp_dirs_[device_id]);
   new_file_path /= file_name.str();
 
-  new_file->reset(new File(this, file_group, device_id, new_file_path.string()));
+  new_file->reset(new File(file_group, device_id, new_file_path.string()));
   return Status::OK();
 }
 
@@ -162,162 +167,133 @@ string TmpFileMgr::GetTmpDirPath(DeviceId device_id) const {
   DCHECK(initialized_);
   DCHECK_GE(device_id, 0);
   DCHECK_LT(device_id, tmp_dirs_.size());
-  return tmp_dirs_[device_id].path();
+  return tmp_dirs_[device_id];
 }
 
-void TmpFileMgr::BlacklistDevice(DeviceId device_id) {
+int TmpFileMgr::NumActiveTmpDevices() {
   DCHECK(initialized_);
-  DCHECK(device_id >= 0 && device_id < tmp_dirs_.size());
-  bool added;
-  {
-    lock_guard<SpinLock> l(dir_status_lock_);
-    added = tmp_dirs_[device_id].blacklist();
-  }
-  if (added) {
-    num_active_scratch_dirs_metric_->Increment(-1);
-    active_scratch_dirs_metric_->Remove(tmp_dirs_[device_id].path());
-  }
+  return tmp_dirs_.size();
 }
 
-bool TmpFileMgr::IsBlacklisted(DeviceId device_id) {
-  DCHECK(initialized_);
-  DCHECK(device_id >= 0 && device_id < tmp_dirs_.size());
-  lock_guard<SpinLock> l(dir_status_lock_);
-  return tmp_dirs_[device_id].is_blacklisted();
-}
-
-int TmpFileMgr::num_active_tmp_devices() {
-  DCHECK(initialized_);
-  lock_guard<SpinLock> l(dir_status_lock_);
-  int num_active = 0;
-  for (int device_id = 0; device_id < tmp_dirs_.size(); ++device_id) {
-    if (!tmp_dirs_[device_id].is_blacklisted()) ++num_active;
-  }
-  return num_active;
-}
-
-vector<TmpFileMgr::DeviceId> TmpFileMgr::active_tmp_devices() {
+vector<TmpFileMgr::DeviceId> TmpFileMgr::ActiveTmpDevices() {
   vector<TmpFileMgr::DeviceId> devices;
-  // Allocate vector before we grab lock
-  devices.reserve(tmp_dirs_.size());
-  {
-    lock_guard<SpinLock> l(dir_status_lock_);
-    for (DeviceId device_id = 0; device_id < tmp_dirs_.size(); ++device_id) {
-      if (!tmp_dirs_[device_id].is_blacklisted()) {
-        devices.push_back(device_id);
-      }
-    }
+  for (DeviceId device_id = 0; device_id < tmp_dirs_.size(); ++device_id) {
+    devices.push_back(device_id);
   }
   return devices;
 }
 
-TmpFileMgr::File::File(TmpFileMgr* mgr, FileGroup* file_group, DeviceId device_id,
-    const string& path)
-  : mgr_(mgr),
-    file_group_(file_group),
+TmpFileMgr::File::File(FileGroup* file_group, DeviceId device_id, const string& path)
+  : file_group_(file_group),
     path_(path),
     device_id_(device_id),
-    current_size_(0),
+    disk_id_(DiskInfo::disk_id(path.c_str())),
+    bytes_allocated_(0),
     blacklisted_(false) {
-  DCHECK(file_group != NULL);
+  DCHECK(file_group != nullptr);
 }
 
 Status TmpFileMgr::File::AllocateSpace(int64_t num_bytes, int64_t* offset) {
   DCHECK_GT(num_bytes, 0);
-  Status status;
-  if (mgr_->IsBlacklisted(device_id_)) {
-    blacklisted_ = true;
-    return Status(TErrorCode::TMP_FILE_BLACKLISTED, path_);
-  }
-  if (current_size_ == 0) {
-    // First call to AllocateSpace. Create the file.
-    status = FileSystemUtil::CreateFile(path_);
-    if (!status.ok()) {
-      ReportIOError(status.msg());
-      return status;
-    }
-    disk_id_ = DiskInfo::disk_id(path_.c_str());
-  }
-  int64_t new_size = current_size_ + num_bytes;
-  status = FileSystemUtil::ResizeFile(path_, new_size);
-  if (!status.ok()) {
-    ReportIOError(status.msg());
-    return status;
-  }
-  *offset = current_size_;
-  current_size_ = new_size;
+  *offset = bytes_allocated_;
+  bytes_allocated_ += num_bytes;
   return Status::OK();
 }
 
-void TmpFileMgr::File::ReportIOError(const ErrorMsg& msg) {
+int TmpFileMgr::File::AssignDiskQueue() const {
+  return file_group_->io_mgr_->AssignQueue(path_.c_str(), disk_id_, false);
+}
+
+void TmpFileMgr::File::Blacklist(const ErrorMsg& msg) {
   LOG(ERROR) << "Error for temporary file '" << path_ << "': " << msg.msg();
-  // IMPALA-2305: avoid blacklisting to prevent test failures.
-  // blacklisted_ = true;
-  // mgr_->BlacklistDevice(device_id_);
+  blacklisted_ = true;
 }
 
 Status TmpFileMgr::File::Remove() {
-  if (current_size_ > 0) FileSystemUtil::RemovePaths(vector<string>(1, path_));
+  // Remove the file if present (it may not be present if no writes completed).
+  FileSystemUtil::RemovePaths({path_});
   return Status::OK();
 }
 
-TmpFileMgr::FileGroup::FileGroup(
-    TmpFileMgr* tmp_file_mgr, RuntimeProfile* profile, int64_t bytes_limit)
+string TmpFileMgr::File::DebugString() {
+  return Substitute("File $0 path '$1' device id $2 disk id $3 bytes allocated $4 "
+      "blacklisted $5", this, path_, device_id_, disk_id_, bytes_allocated_,
+      blacklisted_);
+}
+
+TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
+    RuntimeProfile* profile, const TUniqueId& unique_id, int64_t block_size,
+    int64_t bytes_limit)
   : tmp_file_mgr_(tmp_file_mgr),
-    current_bytes_allocated_(0),
+    io_mgr_(io_mgr),
+    io_ctx_(nullptr),
+    unique_id_(unique_id),
+    block_size_(block_size),
     bytes_limit_(bytes_limit),
+    write_counter_(ADD_COUNTER(profile, "ScratchWrites", TUnit::UNIT)),
+    bytes_written_counter_(ADD_COUNTER(profile, "ScratchBytesWritten", TUnit::BYTES)),
+    read_counter_(ADD_COUNTER(profile, "ScratchReads", TUnit::UNIT)),
+    bytes_read_counter_(ADD_COUNTER(profile, "ScratchBytesRead", TUnit::BYTES)),
+    scratch_space_bytes_used_counter_(
+        ADD_COUNTER(profile, "ScratchFileUsedBytes", TUnit::BYTES)),
+    disk_read_timer_(ADD_TIMER(profile, "TotalReadBlockTime")),
+    encryption_timer_(ADD_TIMER(profile, "TotalEncryptionTime")),
+    current_bytes_allocated_(0),
     next_allocation_index_(0) {
-  DCHECK(tmp_file_mgr != NULL);
-  scratch_space_bytes_used_counter_ =
-      ADD_COUNTER(profile, "ScratchFileUsedBytes", TUnit::BYTES);
+  DCHECK_GT(block_size_, 0);
+  DCHECK(tmp_file_mgr != nullptr);
+  io_mgr_->RegisterContext(&io_ctx_, nullptr);
 }
 
-Status TmpFileMgr::FileGroup::CreateFiles(const TUniqueId& query_id) {
+TmpFileMgr::FileGroup::~FileGroup() {
+  DCHECK_EQ(tmp_files_.size(), 0);
+}
+
+Status TmpFileMgr::FileGroup::CreateFiles() {
+  lock_.DCheckLocked();
   DCHECK(tmp_files_.empty());
-  vector<Status> errs;
-  vector<DeviceId> tmp_devices = tmp_file_mgr_->active_tmp_devices();
+  vector<DeviceId> tmp_devices = tmp_file_mgr_->ActiveTmpDevices();
   int files_allocated = 0;
   // Initialize the tmp files and the initial file to use.
   for (int i = 0; i < tmp_devices.size(); ++i) {
-    TmpFileMgr::DeviceId tmp_device_id = tmp_devices[i];
+    TmpFileMgr::DeviceId device_id = tmp_devices[i];
     // It is possible for a device to be blacklisted after it was returned by
-    // active_tmp_devices(), handle this gracefully by skipping devices if NewFile()
+    // ActiveTmpDevices(), handle this gracefully by skipping devices if NewFile()
     // fails.
-    Status status = NewFile(tmp_device_id, query_id);
+    unique_ptr<TmpFileMgr::File> tmp_file;
+    Status status = tmp_file_mgr_->NewFile(this, device_id, &tmp_file);
     if (status.ok()) {
+      tmp_files_.emplace_back(std::move(tmp_file));
       ++files_allocated;
     } else {
-      errs.push_back(std::move(status));
+      scratch_errors_.push_back(std::move(status));
     }
   }
   DCHECK_EQ(tmp_files_.size(), files_allocated);
   if (tmp_files_.size() == 0) {
-    Status err_status("Could not create files in any configured scratch directories "
-        "(--scratch_dirs).");
-    for (Status& err : errs) err_status.MergeStatus(err);
+    // TODO: IMPALA-4697: the merged errors do not show up in the query error log,
+    // so we must point users to the impalad error log.
+    Status err_status(
+        "Could not create files in any configured scratch directories (--scratch_dirs). "
+        "See logs for previous errors that may have caused this.");
+    for (Status& err : scratch_errors_) err_status.MergeStatus(err);
     return err_status;
   }
-
   // Start allocating on a random device to avoid overloading the first device.
   next_allocation_index_ = rand() % tmp_files_.size();
   return Status::OK();
 }
 
-Status TmpFileMgr::FileGroup::NewFile(const DeviceId& device_id,
-    const TUniqueId& query_id, File** new_file) {
-  unique_ptr<TmpFileMgr::File> tmp_file;
-  RETURN_IF_ERROR(tmp_file_mgr_->NewFile(this, device_id, query_id, &tmp_file));
-  if (new_file != NULL) *new_file = tmp_file.get();
-  tmp_files_.emplace_back(std::move(tmp_file));
-  return Status::OK();
-}
-
 void TmpFileMgr::FileGroup::Close() {
-  for (std::unique_ptr<TmpFileMgr::File>& file: tmp_files_) {
+  // Cancel writes before deleting the files, since in-flight writes could re-create
+  // deleted files.
+  if (io_ctx_ != nullptr) io_mgr_->UnregisterContext(io_ctx_);
+  io_ctx_ = nullptr;
+  for (std::unique_ptr<TmpFileMgr::File>& file : tmp_files_) {
     Status status = file->Remove();
     if (!status.ok()) {
-      LOG(WARNING) << "Error removing scratch file '" << file->path() << "': "
-                   << status.msg().msg();
+      LOG(WARNING) << "Error removing scratch file '" << file->path()
+                   << "': " << status.msg().msg();
     }
   }
   tmp_files_.clear();
@@ -325,18 +301,31 @@ void TmpFileMgr::FileGroup::Close() {
 
 Status TmpFileMgr::FileGroup::AllocateSpace(
     int64_t num_bytes, File** tmp_file, int64_t* file_offset) {
-  if (bytes_limit_ != -1 && current_bytes_allocated_ + num_bytes > bytes_limit_) {
+  DCHECK_LE(num_bytes, block_size_);
+  lock_guard<SpinLock> lock(lock_);
+
+  if (!free_ranges_.empty()) {
+    *tmp_file = free_ranges_.back().first;
+    *file_offset = free_ranges_.back().second;
+    free_ranges_.pop_back();
+    return Status::OK();
+  }
+
+  if (bytes_limit_ != -1 && current_bytes_allocated_ + block_size_ > bytes_limit_) {
     return Status(TErrorCode::SCRATCH_LIMIT_EXCEEDED, bytes_limit_);
   }
-  vector<Status> errs;
+
+  // Lazily create the files on the first write.
+  if (tmp_files_.empty()) RETURN_IF_ERROR(CreateFiles());
+
   // Find the next physical file in round-robin order and allocate a range from it.
   for (int attempt = 0; attempt < tmp_files_.size(); ++attempt) {
     *tmp_file = tmp_files_[next_allocation_index_].get();
     next_allocation_index_ = (next_allocation_index_ + 1) % tmp_files_.size();
     if ((*tmp_file)->is_blacklisted()) continue;
-    Status status = (*tmp_file)->AllocateSpace(num_bytes, file_offset);
+    Status status = (*tmp_file)->AllocateSpace(block_size_, file_offset);
     if (status.ok()) {
-      scratch_space_bytes_used_counter_->Add(num_bytes);
+      scratch_space_bytes_used_counter_->Add(block_size_);
       current_bytes_allocated_ += num_bytes;
       return Status::OK();
     }
@@ -345,12 +334,261 @@ Status TmpFileMgr::FileGroup::AllocateSpace(
     LOG(WARNING) << "Error while allocating range in scratch file '"
                  << (*tmp_file)->path() << "': " << status.msg().msg()
                  << ". Will try another scratch file.";
-    errs.push_back(status);
+    scratch_errors_.push_back(status);
   }
-  Status err_status("No usable scratch files: space could not be allocated in any "
-                    "of the configured scratch directories (--scratch_dirs).");
-  for (Status& err : errs) err_status.MergeStatus(err);
+  // TODO: IMPALA-4697: the merged errors do not show up in the query error log,
+  // so we must point users to the impalad error log.
+  Status err_status(
+      "No usable scratch files: space could not be allocated in any of "
+      "the configured scratch directories (--scratch_dirs). See logs for previous "
+      "errors that may have caused this.");
+  // Include all previous errors that may have caused the failure.
+  for (Status& err : scratch_errors_) err_status.MergeStatus(err);
   return err_status;
 }
 
+void TmpFileMgr::FileGroup::AddFreeRange(File* file, int64_t offset) {
+  lock_guard<SpinLock> lock(lock_);
+  free_ranges_.emplace_back(file, offset);
+}
+
+Status TmpFileMgr::FileGroup::Write(
+    MemRange buffer, WriteDoneCallback cb, unique_ptr<TmpFileMgr::WriteHandle>* handle) {
+  DCHECK_GE(buffer.len(), 0);
+
+  File* tmp_file;
+  int64_t file_offset;
+  RETURN_IF_ERROR(AllocateSpace(buffer.len(), &tmp_file, &file_offset));
+
+  unique_ptr<WriteHandle> tmp_handle(new WriteHandle(encryption_timer_, cb));
+  WriteHandle* tmp_handle_ptr = tmp_handle.get(); // Pass ptr by value into lambda.
+  DiskIoMgr::WriteRange::WriteDoneCallback callback = [this, tmp_handle_ptr](
+      const Status& write_status) { WriteComplete(tmp_handle_ptr, write_status); };
+  RETURN_IF_ERROR(
+      tmp_handle->Write(io_mgr_, io_ctx_, tmp_file, file_offset, buffer, callback));
+  write_counter_->Add(1);
+  bytes_written_counter_->Add(buffer.len());
+  *handle = move(tmp_handle);
+  return Status::OK();
+}
+
+// Synchronously reads the data previously written through 'handle' back into 'buffer'.
+Status TmpFileMgr::FileGroup::Read(WriteHandle* handle, MemRange buffer) {
+  DCHECK(handle->write_range_ != nullptr);
+  DCHECK(!handle->is_cancelled_);
+  DCHECK_EQ(buffer.len(), handle->len());
+
+  // Don't grab 'lock_' in this method - it is not necessary because we don't touch
+  // any members that it protects and could block other threads for the duration of
+  // the synchronous read.
+  DCHECK(!handle->write_in_flight_);
+  // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state
+  // since the write is not in flight.
+  DiskIoMgr::ScanRange* scan_range = scan_range_pool_.Add(new DiskIoMgr::ScanRange);
+  scan_range->Reset(nullptr, handle->write_range_->file(), handle->write_range_->len(),
+      handle->write_range_->offset(), handle->write_range_->disk_id(), false,
+      DiskIoMgr::BufferOpts::ReadInto(buffer.data(), buffer.len()));
+  DiskIoMgr::BufferDescriptor* io_mgr_buffer;
+  {
+    SCOPED_TIMER(disk_read_timer_);
+    read_counter_->Add(1);
+    bytes_read_counter_->Add(buffer.len());
+    RETURN_IF_ERROR(io_mgr_->Read(io_ctx_, scan_range, &io_mgr_buffer));
+  }
+
+  Status status;
+  if (FLAGS_disk_spill_encryption) status = handle->CheckHashAndDecrypt(buffer);
+
+  // Always hand the descriptor back, even if verification failed, so it is not leaked.
+  DCHECK_EQ(io_mgr_buffer->buffer(), buffer.data());
+  DCHECK_EQ(io_mgr_buffer->len(), buffer.len());
+  DCHECK(io_mgr_buffer->eosr());
+  io_mgr_buffer->Return();
+  return status;
+}
+
+Status TmpFileMgr::FileGroup::CancelWriteAndRestoreData(
+    unique_ptr<WriteHandle> handle, MemRange buffer) {
+  DCHECK_EQ(handle->write_range_->data(), buffer.data());
+  DCHECK_EQ(handle->len(), buffer.len());
+  handle->Cancel();
+
+  // Decrypt regardless of whether the write is still in flight or not. An in-flight
+  // write may write bogus data to disk but this lets us get some work done while the
+  // write is being cancelled.
+  Status status;
+  if (FLAGS_disk_spill_encryption) {
+    status = handle->CheckHashAndDecrypt(buffer);
+  }
+  handle->WaitForWrite();
+  AddFreeRange(handle->file_, handle->write_range_->offset());
+  handle.reset();
+  return status;
+}
+
+void TmpFileMgr::FileGroup::DestroyWriteHandle(unique_ptr<WriteHandle> handle) {
+  handle->Cancel();
+  handle->WaitForWrite();
+  AddFreeRange(handle->file_, handle->write_range_->offset());
+  handle.reset();
+}
+
+void TmpFileMgr::FileGroup::WriteComplete(
+    WriteHandle* handle, const Status& write_status) {
+  Status status;
+  if (!write_status.ok()) {
+    status = RecoverWriteError(handle, write_status);
+    if (status.ok()) return;
+  } else {
+    status = write_status;
+  }
+  handle->WriteComplete(status);
+}
+
+Status TmpFileMgr::FileGroup::RecoverWriteError(
+    WriteHandle* handle, const Status& write_status) {
+  DCHECK(!write_status.ok());
+  DCHECK(handle->file_ != nullptr);
+
+  // We can't recover from cancellation or memory limit exceeded.
+  if (write_status.IsCancelled() || write_status.IsMemLimitExceeded()) {
+    return write_status;
+  }
+
+  // Save and report the error before retrying so that the failure isn't silent.
+  {
+    lock_guard<SpinLock> lock(lock_);
+    scratch_errors_.push_back(write_status);
+  }
+  handle->file_->Blacklist(write_status.msg());
+
+  // Do not retry cancelled writes or propagate the error, simply return CANCELLED.
+  if (handle->is_cancelled_) return Status::CANCELLED;
+
+  TmpFileMgr::File* tmp_file;
+  int64_t file_offset;
+  // Discard the scratch file range - we will not reuse ranges from a bad file.
+  // Choose another file to try. Blacklisting ensures we don't retry the same file.
+  // If this fails, the status will include all the errors in 'scratch_errors_'.
+  RETURN_IF_ERROR(AllocateSpace(handle->len(), &tmp_file, &file_offset));
+  return handle->RetryWrite(io_mgr_, io_ctx_, tmp_file, file_offset);
+}
+
+// Returns a multi-line dump of this group's configuration, counters and scratch files.
+string TmpFileMgr::FileGroup::DebugString() {
+  lock_guard<SpinLock> lock(lock_);
+  stringstream ss;
+  ss << "FileGroup " << this << " block size " << block_size_
+     << " bytes limit " << bytes_limit_
+     << " current bytes allocated " << current_bytes_allocated_
+     << " next allocation index " << next_allocation_index_
+     << " writes " << write_counter_->value()
+     << " bytes written " << bytes_written_counter_->value()
+     << " reads " << read_counter_->value()
+     << " bytes read " << bytes_read_counter_->value()
+     // Print the counter's value; streaming the pointer itself would print an address.
+     << " scratch bytes used " << scratch_space_bytes_used_counter_->value()
+     << " disk read timer " << disk_read_timer_->value()
+     << " encryption timer " << encryption_timer_->value() << endl
+     << "  " << tmp_files_.size() << " files:" << endl;
+  for (unique_ptr<File>& file : tmp_files_) ss << "    " << file->DebugString() << endl;
+  return ss.str();
+}
+
+TmpFileMgr::WriteHandle::WriteHandle(
+    RuntimeProfile::Counter* encryption_timer, WriteDoneCallback cb)
+  : cb_(cb),
+    encryption_timer_(encryption_timer),
+    file_(nullptr),
+    is_cancelled_(false),
+    write_in_flight_(false) {}
+
+string TmpFileMgr::WriteHandle::TmpFilePath() const {
+  if (file_ == nullptr) return "";
+  return file_->path();
+}
+
+Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx,
+    File* file, int64_t offset, MemRange buffer,
+    DiskIoMgr::WriteRange::WriteDoneCallback callback) {
+  DCHECK(!write_in_flight_);
+
+  if (FLAGS_disk_spill_encryption) RETURN_IF_ERROR(EncryptAndHash(buffer));
+
+  file_ = file;
+  write_in_flight_ = true;
+  write_range_.reset(
+      new DiskIoMgr::WriteRange(file->path(), offset, file->AssignDiskQueue(), callback));
+  write_range_->SetData(buffer.data(), buffer.len());
+  return io_mgr->AddWriteRange(io_ctx, write_range_.get());
+}
+
+Status TmpFileMgr::WriteHandle::RetryWrite(
+    DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, int64_t offset) {
+  DCHECK(write_in_flight_);
+  file_ = file;
+  write_in_flight_ = true;
+  write_range_->SetRange(file->path(), offset, file->AssignDiskQueue());
+  return io_mgr->AddWriteRange(io_ctx, write_range_.get());
+}
+
+void TmpFileMgr::WriteHandle::WriteComplete(const Status& write_status) {
+  WriteDoneCallback cb;
+  {
+    lock_guard<mutex> lock(write_state_lock_);
+    DCHECK(write_in_flight_);
+    write_in_flight_ = false;
+    // Need to extract 'cb_' because once 'write_in_flight_' is false, the WriteHandle
+    // may be destroyed.
+    cb = move(cb_);
+  }
+  write_complete_cv_.NotifyAll();
+  // Call 'cb' once we've updated the state. We must do this last because once 'cb' is
+  // called, it is valid to call Read() on the handle.
+  cb(write_status);
+}
+
+void TmpFileMgr::WriteHandle::Cancel() {
+  unique_lock<mutex> lock(write_state_lock_);
+  is_cancelled_ = true;
+  // TODO: in future, if DiskIoMgr supported cancellation, we could cancel it here.
+}
+
+void TmpFileMgr::WriteHandle::WaitForWrite() {
+  unique_lock<mutex> lock(write_state_lock_);
+  while (write_in_flight_) write_complete_cv_.Wait(lock);
+}
+
+Status TmpFileMgr::WriteHandle::EncryptAndHash(MemRange buffer) {
+  DCHECK(FLAGS_disk_spill_encryption);
+  SCOPED_TIMER(encryption_timer_);
+  // Since we're using AES-CFB mode, we must take care not to reuse a key/IV pair.
+  // Regenerate a new key and IV for every data buffer we write.
+  key_.InitializeRandom();
+  RETURN_IF_ERROR(key_.Encrypt(buffer.data(), buffer.len(), buffer.data()));
+  hash_.Compute(buffer.data(), buffer.len());
+  return Status::OK();
+}
+
+Status TmpFileMgr::WriteHandle::CheckHashAndDecrypt(MemRange buffer) {
+  DCHECK(FLAGS_disk_spill_encryption);
+  SCOPED_TIMER(encryption_timer_);
+  if (!hash_.Verify(buffer.data(), buffer.len())) {
+    return Status("Block verification failure");
+  }
+  return key_.Decrypt(buffer.data(), buffer.len(), buffer.data());
+}
+
+string TmpFileMgr::WriteHandle::DebugString() {
+  unique_lock<mutex> lock(write_state_lock_); // Same lock as Cancel()/WriteComplete().
+  stringstream ss;
+  ss << "Write handle " << this << " file '" << TmpFilePath() << "'" // file_ may be null
+     << " is cancelled " << is_cancelled_ << " write in flight " << write_in_flight_;
+  if (write_range_ != nullptr) {
+    ss << " data " << write_range_->data() << " len " << write_range_->len()
+       << " file offset " << write_range_->offset()
+       << " disk id " << write_range_->disk_id();
+  }
+  return ss.str();
+}
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 3c489b2..0c3e974 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -18,146 +18,211 @@
 #ifndef IMPALA_RUNTIME_TMP_FILE_MGR_H
 #define IMPALA_RUNTIME_TMP_FILE_MGR_H
 
+#include <functional>
+#include <utility>
+
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "common/object-pool.h"
 #include "common/status.h"
 #include "gen-cpp/Types_types.h" // for TUniqueId
+#include "runtime/disk-io-mgr.h"
+#include "util/mem-range.h"
 #include "util/collection-metrics.h"
+#include "util/condition-variable.h"
+#include "util/openssl-util.h"
 #include "util/runtime-profile.h"
 #include "util/spinlock.h"
 
 namespace impala {
 
-/// TmpFileMgr creates and manages temporary files and directories on the local
-/// filesystem. It can manage multiple temporary directories across multiple devices.
-/// TmpFileMgr ensures that at most one directory per device is used unless overridden
-/// for testing.
+/// TmpFileMgr provides an abstraction for management of temporary (a.k.a. scratch) files
+/// on the filesystem and I/O to and from them. TmpFileMgr manages multiple scratch
+/// directories across multiple devices, configured via the --scratch_dirs option.
+/// TmpFileMgr manages I/O to scratch files in order to abstract away details of which
+/// files are allocated and recovery from certain I/O errors. I/O is done via DiskIoMgr.
+/// TmpFileMgr encrypts data written to disk if enabled by the --disk_spill_encryption
+/// command-line flag.
+///
+/// FileGroups manage scratch space across multiple devices. To write to scratch space,
+/// first a FileGroup is created, then FileGroup::Write() is called to asynchronously
+/// write a memory buffer to one of the scratch files. FileGroup::Write() returns a
+/// WriteHandle, which is used by the caller to identify that write operation. The
+/// caller is notified when the asynchronous write completes via a callback, after which
+/// the caller can use the WriteHandle to read back the data.
 ///
-/// Every temporary File belongs to a FileGroup: to allocate temporary files, first a
-/// FileGroup is created, then FileGroup::NewFile() is called to create a new File with
-/// a unique filename on the specified temporary device. The client can use the File
-/// handle to allocate space in the file. FileGroups can be created with a limit on
-/// the total number of bytes allocated across all files in the group.
+/// Each WriteHandle is backed by a range of data in a scratch file. The first call to
+/// Write() will create files for the FileGroup with unique filenames on the configured
+/// temporary devices. At most one directory per device is used (unless overridden for
+/// testing). Free space is managed within a FileGroup: once a WriteHandle is destroyed,
+/// the file range backing it can be recycled for a different WriteHandle. The file range
+/// of a WriteHandle can be replaced with a different one if a write error is encountered
+/// and the data instead needs to be written to a different disk.
 ///
-/// TODO: we could notify block managers about the failure so they can more take
-/// proactive action to avoid using the device.
+/// Resource Management:
+/// TmpFileMgr provides some basic support for managing local disk space consumption.
+/// A FileGroup can be created with a limit on the total number of bytes allocated across
+/// all files. Writes that would exceed the limit fail with an error status.
+///
+/// TODO: each FileGroup can manage only fixed length scratch file ranges of 'block_size',
+/// to simplify the recycling logic. BufferPool will require variable length ranges.
+/// TODO: IMPALA-4683: we could implement smarter handling of failures, e.g. to
+/// temporarily blacklist devices that show I/O errors.
 class TmpFileMgr {
  public:
-  class FileGroup;
+  class File; // Needs to be public for TmpFileMgrTest.
+  class WriteHandle;
 
-  /// DeviceId is a unique identifier for a temporary device managed by TmpFileMgr.
-  /// It is used as a handle for external classes to identify devices.
+  /// DeviceId is an internal unique identifier for a temporary device managed by
+  /// TmpFileMgr. DeviceIds in the range [0, num tmp devices) are allocated arbitrarily.
+  /// Needs to be public for TmpFileMgrTest.
   typedef int DeviceId;
 
-  /// File is a handle to a physical file in a temporary directory. Clients
-  /// can allocate file space and remove files using AllocateSpace() and Remove().
-  /// Creation of the file is deferred until the first call to AllocateSpace().
-  class File {
+  typedef std::function<void(const Status&)> WriteDoneCallback;
+
+  /// Represents a group of temporary files - one per disk with a scratch directory. The
+  /// total allocated bytes of the group can be bound by setting the space allocation
+  /// limit. The owner of the FileGroup object is responsible for calling the Close()
+  /// method to delete all the files in the group.
+  ///
+  /// Public methods of FileGroup and WriteHandle are safe to call concurrently from
+  /// multiple threads as long as different WriteHandle arguments are provided.
+  class FileGroup {
    public:
-    /// Called to notify TmpFileMgr that an IO error was encountered for this file
-    void ReportIOError(const ErrorMsg& msg);
+    /// Initialize a new file group, which will create files using 'tmp_file_mgr'
+    /// and perform I/O using 'io_mgr'. Adds counters to 'profile' to track scratch
+    /// space used. 'unique_id' is a unique ID that is used to prefix any scratch file
+    /// names. It is an error to create multiple FileGroups with the same 'unique_id'.
+    /// 'block_size' is the size of blocks in bytes that space will be allocated in.
+    /// 'bytes_limit' is the limit on the total file space to allocate.
+    FileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr, RuntimeProfile* profile,
+        const TUniqueId& unique_id, int64_t block_size, int64_t bytes_limit = -1);
+
+    ~FileGroup();
+
+    /// Asynchronously writes 'buffer' to a temporary file of this file group. If there
+    /// are multiple scratch files, this can write to any of them, and will attempt to
+    /// recover from I/O errors on one file by writing to a different file. The memory
+    /// referenced by 'buffer' must remain valid until the write completes. The callee
+    /// may rewrite the data in 'buffer' in-place (e.g. to do in-place encryption or
+    /// compression). The caller should not modify the data in 'buffer' until the write
+    /// completes or is cancelled, otherwise invalid data may be written to disk.
+    ///
+    /// TODO: buffer.len() must be <= 'block_size' until FileGroup supports allocating
+    /// variable-length scratch file ranges.
+    ///
+    /// Returns an error if the scratch space cannot be allocated or the write cannot
+    /// be started. Otherwise 'handle' is set and 'cb' will be called asynchronously from
+    /// a different thread when the write completes successfully or unsuccessfully or is
+    /// cancelled.
+    ///
+    /// 'handle' must be destroyed by passing it to DestroyWriteHandle() or
+    /// CancelWriteAndRestoreData().
+    Status Write(
+        MemRange buffer, WriteDoneCallback cb, std::unique_ptr<WriteHandle>* handle);
+
+    /// Synchronously read the data referenced by 'handle' from the temporary file into
+    /// 'buffer'. buffer.len() must be the same as handle->len(). Can only be called
+    /// after a write successfully completes.
+    Status Read(WriteHandle* handle, MemRange buffer);
+
+    /// Cancels the write referenced by 'handle' and destroys associated resources. Also
+    /// restores the original data in the 'buffer' passed to Write(), decrypting or
+    /// decompressing as necessary. The cancellation always succeeds, but an error
+    /// is returned if restoring the data fails.
+    Status CancelWriteAndRestoreData(
+        std::unique_ptr<WriteHandle> handle, MemRange buffer);
+
+    /// Wait for the in-flight I/Os to complete and destroy resources associated with
+    /// 'handle'.
+    void DestroyWriteHandle(std::unique_ptr<WriteHandle> handle);
+
+    /// Calls Remove() on all the files in the group and deletes them.
+    void Close();
+
+    std::string DebugString();
 
-    const std::string& path() const { return path_; }
-    int disk_id() const { return disk_id_; }
-    bool is_blacklisted() const { return blacklisted_; }
+    const TUniqueId& unique_id() const { return unique_id_; }
 
    private:
-    friend class FileGroup;
-    friend class TmpFileMgr;
+    friend class File;
     friend class TmpFileMgrTest;
 
-    /// Allocates 'num_bytes' bytes in this file for a new block of data.
-    /// The file size is increased by a call to truncate() if necessary.
-    /// The physical file is created on the first call to AllocateSpace().
-    /// Returns Status::OK() and sets offset on success.
-    /// Returns an error status if an unexpected error occurs, e.g. the file could not
-    /// be created.
-    Status AllocateSpace(int64_t num_bytes, int64_t* offset);
-
-    /// Delete the physical file on disk, if one was created.
-    /// It is not valid to read or write to a file after calling Remove().
-    Status Remove();
+    /// Initializes the file group with one temporary file per disk with a scratch
+    /// directory. Returns OK if at least one temporary file could be created.
+    /// Returns an error if no temporary files were successfully created. Must only be
+    /// called once. Must be called with 'lock_' held.
+    Status CreateFiles();
 
-    /// The name of the sub-directory that Impala created within each configured scratch
-    /// directory.
-    const static std::string TMP_SUB_DIR_NAME;
+    /// Allocate 'num_bytes' bytes in a temporary file. Try multiple disks if error
+    /// occurs. Returns an error only if no temporary files are usable or the scratch
+    /// limit is exceeded. Must be called without 'lock_' held.
+    Status AllocateSpace(int64_t num_bytes, File** tmp_file, int64_t* file_offset);
 
-    /// Space (in MB) that must ideally be available for writing on a scratch
-    /// directory. A warning is issued if available space is less than this threshold.
-    const static uint64_t AVAILABLE_SPACE_THRESHOLD_MB;
+    /// Add a free scratch range to 'free_ranges_'. Must be called without 'lock_' held.
+    void AddFreeRange(File* file, int64_t offset);
 
-    File(TmpFileMgr* mgr, FileGroup* file_group, DeviceId device_id,
-        const std::string& path);
+    /// Called when the DiskIoMgr write completes for 'handle'. On error, will attempt
+    /// to retry the write. On success or if the write can't be retried, calls
+    /// handle->WriteComplete().
+    void WriteComplete(WriteHandle* handle, const Status& write_status);
 
-    /// TmpFileMgr this belongs to.
-    TmpFileMgr* mgr_;
+    /// Handles a write error. Logs the write error and blacklists the device for this
+    /// file group if the cause was an I/O error. Blacklisting limits the number of times
+    /// a write is retried because each device will only be tried once. Returns OK if it
+    /// successfully reissued the write. Returns an error status if the original error
+    /// was unrecoverable or an unrecoverable error is encountered when reissuing the
+    /// write. The error status will include all previous I/O errors in its details.
+    Status RecoverWriteError(WriteHandle* handle, const Status& write_status);
 
-    /// The FileGroup this belongs to. Cannot be null.
-    FileGroup* file_group_;
+    /// The TmpFileMgr it is associated with.
+    TmpFileMgr* const tmp_file_mgr_;
 
-    /// Path of the physical file in the filesystem.
-    std::string path_;
+    /// DiskIoMgr used for all I/O to temporary files.
+    DiskIoMgr* const io_mgr_;
 
-    /// The temporary device this file is stored on.
-    DeviceId device_id_;
+    /// I/O context used for all reads and writes. Registered in constructor.
+    DiskIoRequestContext* io_ctx_;
 
-    /// The id of the disk on which the physical file lies.
-    int disk_id_;
+    /// Stores scan ranges allocated in Read(). Needed because ScanRange objects may be
+    /// touched by DiskIoMgr even after the scan is finished.
+    /// TODO: IMPALA-4249: remove once lifetime of ScanRange objects is better defined.
+    ObjectPool scan_range_pool_;
 
-    /// Current file size. Modified by AllocateSpace(). Size is 0 before file creation.
-    int64_t current_size_;
+    /// Unique across all FileGroups. Used to prefix file names.
+    const TUniqueId unique_id_;
 
-    /// Set to true to indicate that file can't be expanded. This is useful to keep here
-    /// even though it is redundant with the global per-device blacklisting in TmpFileMgr
-    /// because it can be checked without acquiring a global lock. If a file is
-    /// blacklisted, the corresponding device will always be blacklisted.
-    bool blacklisted_;
-  };
+    /// Size of the blocks in bytes that scratch space is managed in.
+    /// TODO: support variable-length scratch file ranges.
+    const int64_t block_size_;
 
-  /// Represents a group of temporary files - one per disk with a scratch directory. The
-  /// total allocated bytes of the group can be bound by setting the space allocation
-  /// limit. The owner of the FileGroup object is responsible for calling the Close()
-  /// method to delete all the files in the group.
-  class FileGroup {
-   public:
-    /// Initialize a new file group, which will create files using 'tmp_file_mgr'.
-    /// Adds counters to 'profile' to track scratch space used. 'bytes_limit' is
-    /// the limit on the total file space to allocate.
-    FileGroup(
-        TmpFileMgr* tmp_file_mgr, RuntimeProfile* profile, int64_t bytes_limit = -1);
+    /// Max write space allowed (-1 means no limit).
+    const int64_t bytes_limit_;
 
-    ~FileGroup() { DCHECK_EQ(NumFiles(), 0); }
+    /// Number of write operations (includes writes started but not yet complete).
+    RuntimeProfile::Counter* const write_counter_;
 
-    /// Initializes the file group with one temporary file per disk with a scratch
-    /// directory. 'unique_id' is a unique ID that should be used to prefix any
-    /// scratch file names. It is an error to create multiple FileGroups with the
-    /// same 'unique_id'. Returns OK if at least one temporary file could be created.
-    /// Returns an error if no temporary files were successfully created. Must only be
-    /// called once.
-    Status CreateFiles(const TUniqueId& unique_id);
+    /// Number of bytes written to disk (includes writes started but not yet complete).
+    RuntimeProfile::Counter* const bytes_written_counter_;
 
-    /// Allocate num_bytes bytes in a temporary file. Try multiple disks if error occurs.
-    /// Returns an error only if no temporary files are usable or the scratch limit is
-    /// exceeded.
-    Status AllocateSpace(int64_t num_bytes, File** tmp_file, int64_t* file_offset);
+    /// Number of read operations (includes reads started but not yet complete).
+    RuntimeProfile::Counter* const read_counter_;
 
-    /// Calls Remove() on all the files in the group and deletes them.
-    void Close();
+    /// Number of bytes read from disk (includes reads started but not yet complete).
+    RuntimeProfile::Counter* const bytes_read_counter_;
 
-    /// Returns the number of files that are a part of the group.
-    int NumFiles() { return tmp_files_.size(); }
+    /// Amount of scratch space allocated in bytes.
+    RuntimeProfile::Counter* const scratch_space_bytes_used_counter_;
 
-   private:
-    friend class TmpFileMgrTest;
+    /// Time taken for disk reads.
+    RuntimeProfile::Counter* const disk_read_timer_;
 
-    /// Creates a new File with a unique path for a query instance, adds it to the
-    /// group and returns a handle for that file. The file path is within the (single)
-    /// tmp directory on the specified device id.
-    /// If an error is encountered, e.g. the device is blacklisted, the file is not
-    /// added to this group and a non-ok status is returned.
-    Status NewFile(
-        const DeviceId& device_id, const TUniqueId& unique_id, File** new_file = NULL);
+    /// Time spent in disk spill encryption, decryption, and integrity checking.
+    RuntimeProfile::Counter* encryption_timer_;
 
-    /// The TmpFileMgr it is associated with.
-    TmpFileMgr* tmp_file_mgr_;
+    /// Protects below members.
+    SpinLock lock_;
 
     /// List of files representing the FileGroup.
     std::vector<std::unique_ptr<File>> tmp_files_;
@@ -165,16 +230,117 @@ class TmpFileMgr {
     /// Total space allocated in this group's files.
     int64_t current_bytes_allocated_;
 
-    /// Max write space allowed (-1 means no limit).
-    const int64_t bytes_limit_;
-
     /// Index into 'tmp_files' denoting the file to which the next temporary file range
     /// should be allocated from. Used to implement round-robin allocation from temporary
     /// files.
     int next_allocation_index_;
 
-    /// Amount of scratch space allocated in bytes.
-    RuntimeProfile::Counter* scratch_space_bytes_used_counter_;
+    /// List of File/offset pairs for free scratch ranges of size 'block_size_' bytes.
+    std::vector<std::pair<File*, int64_t>> free_ranges_;
+
+    /// Errors encountered when creating/writing scratch files. We store the history so
+    /// that we can report the original cause of the scratch errors if we run out of
+    /// devices to write to.
+    std::vector<Status> scratch_errors_;
+  };
+
+  /// A handle to a write operation, backed by a range of a temporary file. The operation
+  /// is either in-flight or has completed. If it completed with no error and wasn't
+  /// cancelled then the data is in the file and can be read back.
+  ///
+  /// WriteHandle is returned from FileGroup::Write(). After the write completes, the
+  /// handle can be passed to FileGroup::Read() to read back the data zero or more times.
+  /// FileGroup::DestroyWriteHandle() can be called at any time to destroy the handle and
+  /// allow reuse of the scratch file range written to. Alternatively,
+  /// FileGroup::CancelWriteAndRestoreData() can be called to reverse the effects of
+  /// FileGroup::Write() by destroying the handle and restoring the original data to the
+  /// buffer, so long as the data in the buffer was not modified by the caller.
+  ///
+  /// Public methods of WriteHandle are safe to call concurrently from multiple threads.
+  class WriteHandle {
+   public:
+    // The write must be destroyed by FileGroup::DestroyWriteHandle().
+    ~WriteHandle() {
+      DCHECK(!write_in_flight_);
+      DCHECK(is_cancelled_);
+    }
+
+    /// Path of temporary file backing the block. Intended for use in testing.
+    /// Returns empty string if no backing file allocated.
+    std::string TmpFilePath() const;
+
+    /// The length of the write range in bytes.
+    int64_t len() const { return write_range_->len(); }
+
+    std::string DebugString();
+
+   private:
+    friend class FileGroup;
+
+    WriteHandle(RuntimeProfile::Counter* encryption_timer, WriteDoneCallback cb);
+
+    /// Starts a write of 'buffer' to 'offset' of 'file'.
+    Status Write(DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file,
+        int64_t offset, MemRange buffer,
+        DiskIoMgr::WriteRange::WriteDoneCallback callback);
+
+    /// Retry the write after the initial write failed with an error, instead writing to
+    /// 'offset' of 'file'.
+    Status RetryWrite(
+        DiskIoMgr* io_mgr, DiskIoRequestContext* io_ctx, File* file, int64_t offset);
+
+    /// Cancels the write asynchronously. After Cancel() is called, writes are not
+    /// retried.
+    void Cancel();
+
+    /// Blocks until the write completes either successfully or unsuccessfully.
+    void WaitForWrite();
+
+    /// Called when the write has completed, successfully or not. Sets 'write_in_flight_'
+    /// to false, then calls 'cb_'.
+    void WriteComplete(const Status& write_status);
+
+    /// Encrypts the data in 'buffer' in-place and computes 'hash_'.
+    Status EncryptAndHash(MemRange buffer);
+
+    /// Verifies the integrity hash and decrypts the contents of 'buffer' in place.
+    Status CheckHashAndDecrypt(MemRange buffer);
+
+    /// Callback to be called when the write completes.
+    WriteDoneCallback cb_;
+
+    /// Reference to the FileGroup's 'encryption_timer_'.
+    RuntimeProfile::Counter* encryption_timer_;
+
+    /// The DiskIoMgr write range for this write.
+    boost::scoped_ptr<DiskIoMgr::WriteRange> write_range_;
+
+    /// The temporary file being written to.
+    File* file_;
+
+    /// If --disk_spill_encryption is on, an AES 256-bit key and initialization vector.
+    /// Regenerated for each write.
+    EncryptionKey key_;
+
+    /// If --disk_spill_encryption is on, our hash of the data being written. Filled in
+    /// on writes; verified on reads. This is calculated _after_ encryption.
+    IntegrityHash hash_;
+
+    /// Protects all fields below while 'write_in_flight_' is true. At other times, it is
+    /// invalid to call WriteRange/FileGroup methods concurrently from multiple threads,
+    /// so no locking is required. This is a terminal lock and should not be held while
+    /// acquiring other locks or invoking 'cb_'.
+    boost::mutex write_state_lock_;
+
+    // True if the write has been cancelled (but is not necessarily complete).
+    bool is_cancelled_;
+
+    // True if a write is in flight.
+    bool write_in_flight_;
+
+    /// Signalled when the write completes and 'write_in_flight_' becomes false, before
+    /// 'cb_' is invoked.
+    ConditionVariable write_complete_cv_;
   };
 
   TmpFileMgr();
@@ -194,59 +360,27 @@ class TmpFileMgr {
 
   /// Total number of devices with tmp directories that are active. There is one tmp
   /// directory per device.
-  int num_active_tmp_devices();
+  int NumActiveTmpDevices();
 
   /// Return vector with device ids of all tmp devices being actively used.
   /// I.e. those that haven't been blacklisted.
-  std::vector<DeviceId> active_tmp_devices();
+  std::vector<DeviceId> ActiveTmpDevices();
 
  private:
-  /// Return a new File handle with a unique path for a query instance. The file is
-  /// associated with the file_group and the file path is within the (single) tmp
+  friend class TmpFileMgrTest;
+
+  /// Return a new File handle with a path based on file_group->unique_id. The file is
+  /// associated with the 'file_group' and the file path is within the (single) scratch
   /// directory on the specified device id. The caller owns the returned handle and is
   /// responsible for deleting it. The file is not created - creation is deferred until
-  /// the first call to File::AllocateSpace().
-  Status NewFile(FileGroup* file_group, const DeviceId& device_id,
-      const TUniqueId& unique_id, std::unique_ptr<File>* new_file);
-
-  /// Dir stores information about a temporary directory.
-  class Dir {
-   public:
-    const std::string& path() const { return path_; }
-
-    // Return true if it was newly added to blacklist.
-    bool blacklist() {
-      bool was_blacklisted = blacklisted_;
-      blacklisted_ = true;
-      return !was_blacklisted;
-    }
-    bool is_blacklisted() const { return blacklisted_; }
-
-   private:
-    friend class TmpFileMgr;
-
-    /// path should be a absolute path to a writable scratch directory.
-    Dir(const std::string& path, bool blacklisted)
-        : path_(path), blacklisted_(blacklisted) {}
-
-    std::string path_;
-
-    bool blacklisted_;
-  };
-
-  /// Remove a device from the rotation. Subsequent attempts to allocate a file on that
-  /// device will fail and the device will not be included in active tmp devices.
-  void BlacklistDevice(DeviceId device_id);
-
-  bool IsBlacklisted(DeviceId device_id);
+  /// the file is written.
+  Status NewFile(
+      FileGroup* file_group, DeviceId device_id, std::unique_ptr<File>* new_file);
 
   bool initialized_;
 
-  /// Protects the status of tmp dirs (i.e. whether they're blacklisted).
-  SpinLock dir_status_lock_;
-
-  /// The created tmp directories.
-  std::vector<Dir> tmp_dirs_;
+  /// The paths of the created tmp directories.
+  std::vector<std::string> tmp_dirs_;
 
   /// Metrics to track active scratch directories.
   IntGauge* num_active_scratch_dirs_metric_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/disk-info.cc
----------------------------------------------------------------------
diff --git a/be/src/util/disk-info.cc b/be/src/util/disk-info.cc
index d3eeb56..eba4f26 100644
--- a/be/src/util/disk-info.cc
+++ b/be/src/util/disk-info.cc
@@ -48,7 +48,6 @@ bool DiskInfo::initialized_;
 vector<DiskInfo::Disk> DiskInfo::disks_;
 map<dev_t, int> DiskInfo::device_id_to_disk_id_;
 map<string, int> DiskInfo::disk_name_to_disk_id_;
-int DiskInfo::num_datanode_dirs_;
 
 // Parses /proc/partitions to get the number of disks.  A bit of looking around
 // seems to indicate this as the best way to do this.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/disk-info.h
----------------------------------------------------------------------
diff --git a/be/src/util/disk-info.h b/be/src/util/disk-info.h
index 4853511..323a265 100644
--- a/be/src/util/disk-info.h
+++ b/be/src/util/disk-info.h
@@ -43,24 +43,8 @@ class DiskInfo {
     return disks_.size();
   }
 
-#if 0
-  /// Returns the number of (logical) disks the data node is using.
-  /// It is possible for this to be more than num_disks since the datanode
-  /// can be configured to have multiple data directories on the same physical
-  /// disk.
-  static int num_datanode_dirs() {
-    DCHECK(initialized_);
-    return num_datanode_dirs_;
-  }
-
-  /// Returns a 0-based disk index for the data node dirs index.
-  static int disk_id(int datanode_dir_idx) {
-    return 0;
-  }
-#endif
-
   /// Returns the 0-based disk index for 'path' (path must be a FS path, not
-  /// hdfs path).
+  /// hdfs path). Returns -1 if the disk index is unknown.
   static int disk_id(const char* path);
 
   /// Returns the device name (e.g. sda) for disk_id
@@ -100,15 +84,11 @@ class DiskInfo {
 
   /// mapping of dev_ts to disk ids
   static std::map<dev_t, int> device_id_to_disk_id_;
-  
+
   /// mapping of devices names to disk ids
   static std::map<std::string, int> disk_name_to_disk_id_;
 
-  static int num_datanode_dirs_;
-
   static void GetDeviceNames();
 };
-
-
 }
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/filesystem-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/filesystem-util.cc b/be/src/util/filesystem-util.cc
index 01cd927..a0cacdf 100644
--- a/be/src/util/filesystem-util.cc
+++ b/be/src/util/filesystem-util.cc
@@ -114,17 +114,6 @@ Status FileSystemUtil::CreateFile(const string& file_path) {
   return Status::OK();
 }
 
-Status FileSystemUtil::ResizeFile(const string& file_path, int64_t trunc_len) {
-  int success = truncate(file_path.c_str(), trunc_len);
-  if (success != 0) {
-    return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR, Substitute(
-        "Truncate file $0 to length $1 failed with errno $2 ($3)",
-        file_path, trunc_len, errno, GetStrErrMsg())));
-  }
-
-  return Status::OK();
-}
-
 Status FileSystemUtil::VerifyIsDirectory(const string& directory_path) {
   error_code errcode;
   bool exists = filesystem::exists(directory_path, errcode);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/filesystem-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/filesystem-util.h b/be/src/util/filesystem-util.h
index 887dc4b..3e824b8 100644
--- a/be/src/util/filesystem-util.h
+++ b/be/src/util/filesystem-util.h
@@ -36,9 +36,6 @@ class FileSystemUtil {
   /// Create a file at the specified path.
   static Status CreateFile(const std::string& file_path);
 
-  /// Resize a file to a specified length - uses unistd truncate().
-  static Status ResizeFile(const std::string& file_path, int64_t trunc_len);
-
   /// Remove the specified paths and their enclosing files/directories.
   static Status RemovePaths(const std::vector<std::string>& directories);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/be/src/util/mem-range.h
----------------------------------------------------------------------
diff --git a/be/src/util/mem-range.h b/be/src/util/mem-range.h
new file mode 100644
index 0000000..c55caaf
--- /dev/null
+++ b/be/src/util/mem-range.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_UTIL_MEM_RANGE_H
+#define IMPALA_UTIL_MEM_RANGE_H
+
+#include <cstdint>
+
+#include "common/logging.h"
+
+namespace impala {
+
+/// Represents a range of memory. This is a convenient alternative to passing around
+/// a separate pointer and length.
+class MemRange {
+ public:
+  MemRange(uint8_t* data, int64_t len) : data_(data), len_(len) {
+    DCHECK_GE(len, 0);
+    DCHECK(len == 0 || data != nullptr);
+  }
+
+  uint8_t* data() const { return data_; }
+  int64_t len() const { return len_; }
+
+  static MemRange null() { return MemRange(nullptr, 0); }
+
+ private:
+  uint8_t* data_;
+  int64_t len_;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index bb4251b..5088a1b 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -64,7 +64,7 @@ enum TParquetFallbackSchemaResolution {
 // metadata which overrides everything else.
 struct TQueryOptions {
   1: optional bool abort_on_error = 0
-  2: optional i32 max_errors = 0
+  2: optional i32 max_errors = 100
   3: optional bool disable_codegen = 0
   4: optional i32 batch_size = 0
   5: optional i32 num_nodes = NUM_NODES_ALL

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/testdata/workloads/functional-query/queries/QueryTest/set.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test b/testdata/workloads/functional-query/queries/QueryTest/set.test
index e405caf..1dd1396 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/set.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/set.test
@@ -14,7 +14,7 @@ set
 'EXPLAIN_LEVEL','1'
 'HBASE_CACHE_BLOCKS','0'
 'HBASE_CACHING','0'
-'MAX_ERRORS','0'
+'MAX_ERRORS','100'
 'MAX_IO_BUFFERS','0'
 'MAX_SCAN_RANGE_LENGTH','0'
 'MEM_LIMIT','0'
@@ -46,7 +46,7 @@ set;
 'EXPLAIN_LEVEL','3'
 'HBASE_CACHE_BLOCKS','0'
 'HBASE_CACHING','0'
-'MAX_ERRORS','0'
+'MAX_ERRORS','100'
 'MAX_IO_BUFFERS','0'
 'MAX_SCAN_RANGE_LENGTH','0'
 'MEM_LIMIT','0'
@@ -78,7 +78,7 @@ set;
 'EXPLAIN_LEVEL','0'
 'HBASE_CACHE_BLOCKS','0'
 'HBASE_CACHING','0'
-'MAX_ERRORS','0'
+'MAX_ERRORS','100'
 'MAX_IO_BUFFERS','0'
 'MAX_SCAN_RANGE_LENGTH','0'
 'MEM_LIMIT','0'
@@ -111,7 +111,7 @@ set;
 'EXPLAIN_LEVEL','1'
 'HBASE_CACHE_BLOCKS','0'
 'HBASE_CACHING','0'
-'MAX_ERRORS','0'
+'MAX_ERRORS','100'
 'MAX_IO_BUFFERS','0'
 'MAX_SCAN_RANGE_LENGTH','0'
 'MEM_LIMIT','0'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/95ed4434/tests/custom_cluster/test_scratch_disk.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index 1c02b56..f523dbe 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -52,22 +52,7 @@ class TestScratchDir(CustomClusterTestSuite):
   def get_dirs(dirs):
     return ','.join(dirs)
 
-  @classmethod
-  def setup_class(cls):
-    super(TestScratchDir, cls).setup_class()
-    cls.normal_dirs = cls.generate_dirs(5)
-    cls.non_writable_dirs = cls.generate_dirs(5, writable=False)
-    cls.non_existing_dirs = cls.generate_dirs(5, non_existing=True)
-
-  @classmethod
-  def teardown_class(cls):
-    for dir_path in cls.normal_dirs:
-      shutil.rmtree(dir_path)
-    for dir_path in cls.non_writable_dirs:
-      shutil.rmtree(dir_path)
-
-  @classmethod
-  def generate_dirs(cls, num, writable=True, non_existing=False):
+  def generate_dirs(self, num, writable=True, non_existing=False):
     result = []
     for i in xrange(num):
       dir_path = tempfile.mkdtemp()
@@ -75,27 +60,30 @@ class TestScratchDir(CustomClusterTestSuite):
         shutil.rmtree(dir_path)
       elif not writable:
         os.chmod(dir_path, stat.S_IREAD)
+      if not non_existing:
+        self.created_dirs.append(dir_path)
       result.append(dir_path)
+      print "Generated dir" + dir_path
     return result
 
   def setup_method(self, method):
-    # We are overriding this method to prevent starting Impala before each test. In this
-    # file, each test is responsible for doing that because we want to use class
-    # variables like cls.normal_dirs to generate the parameter string to
-    # start-impala-cluster, which are generated in setup_class (so using the with_args
-    # decorator does not work).
-    pass
+    # Don't call the superclass method to prevent starting Impala before each test. In
+    # this file, each test is responsible for doing that because we want to generate
+    # the parameter string to start-impala-cluster in each test method.
+    self.created_dirs = []
 
   def teardown_method(self, method):
-    pass
+    for dir_path in self.created_dirs:
+      shutil.rmtree(dir_path, ignore_errors=True)
 
   @pytest.mark.execute_serially
   def test_multiple_dirs(self, vector):
     """ 5 empty directories are created in the /tmp directory and we verify that only
         one of those directories is used as scratch disk. Only one should be used as
         scratch because all directories are on same disk."""
+    normal_dirs = self.generate_dirs(5)
     self._start_impala_cluster([
-      '--impalad_args="-scratch_dirs={0}"'.format(','.join(self.normal_dirs))])
+      '--impalad_args="-scratch_dirs={0}"'.format(','.join(normal_dirs))])
     self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                     expected_count=1)
     exec_option = vector.get_value('exec_option')
@@ -103,10 +91,9 @@ class TestScratchDir(CustomClusterTestSuite):
     impalad = self.cluster.get_any_impalad()
     client = impalad.service.create_beeswax_client()
     self.execute_query_expect_success(client, self.spill_query, exec_option)
-    assert self.count_nonempty_dirs(self.normal_dirs) == 1
+    assert self.count_nonempty_dirs(normal_dirs) == 1
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args("-scratch_dirs=")
   def test_no_dirs(self, vector):
     """ Test we can execute a query with no scratch dirs """
     self._start_impala_cluster(['--impalad_args="-scratch_dirs="'])
@@ -124,8 +111,9 @@ class TestScratchDir(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   def test_non_writable_dirs(self, vector):
     """ Test we can execute a query with only bad non-writable scratch """
+    non_writable_dirs = self.generate_dirs(5, writable=False)
     self._start_impala_cluster([
-      '--impalad_args="-scratch_dirs={0}"'.format(','.join(self.non_writable_dirs))])
+      '--impalad_args="-scratch_dirs={0}"'.format(','.join(non_writable_dirs))])
     self.assert_impalad_log_contains("ERROR", "Running without spill to disk: could "
         + "not use any scratch directories in list:.*. See previous "
         + "warnings for information on causes.")
@@ -139,13 +127,14 @@ class TestScratchDir(CustomClusterTestSuite):
     self.execute_query_expect_failure(client, self.spill_query, exec_option)
     # Should be able to execute in-memory query
     self.execute_query_expect_success(client, self.in_mem_query, exec_option)
-    assert self.count_nonempty_dirs(self.non_writable_dirs) == 0
+    assert self.count_nonempty_dirs(non_writable_dirs) == 0
 
   @pytest.mark.execute_serially
   def test_non_existing_dirs(self, vector):
     """ Test that non-existing directories are not created or used """
+    non_existing_dirs = self.generate_dirs(5, non_existing=True)
     self._start_impala_cluster([
-      '--impalad_args="-scratch_dirs={0}"'.format(','.join(self.non_existing_dirs))])
+      '--impalad_args="-scratch_dirs={0}"'.format(','.join(non_existing_dirs))])
     self.assert_impalad_log_contains("ERROR", "Running without spill to disk: could "
         + "not use any scratch directories in list:.*. See previous "
         + "warnings for information on causes.")
@@ -160,4 +149,27 @@ class TestScratchDir(CustomClusterTestSuite):
     self.execute_query_expect_failure(client, self.spill_query, exec_option)
     # Should be able to execute in-memory query
     self.execute_query_expect_success(client, self.in_mem_query, exec_option)
-    assert self.count_nonempty_dirs(self.non_existing_dirs) == 0
+    assert self.count_nonempty_dirs(non_existing_dirs) == 0
+
+  @pytest.mark.execute_serially
+  def test_write_error_failover(self, vector):
+    """ Test that we can fail-over to writable directories if other directories
+        have permissions changed or are removed after impalad startup."""
+    dirs = self.generate_dirs(3)
+    self._start_impala_cluster([
+      '--impalad_args="-scratch_dirs={0}"'.format(','.join(dirs)),
+      '--impalad_args=--allow_multiple_scratch_dirs_per_device=true'])
+    self.assert_impalad_log_contains("INFO", "Using scratch directory ",
+                                    expected_count=len(dirs))
+    exec_option = vector.get_value('exec_option')
+    exec_option['max_block_mgr_memory'] = self.max_block_mgr_memory
+    # Trigger write errors in the first two directories now that impalad has started.
+    shutil.rmtree(dirs[0]) # Remove the first directory.
+    # Make the second directory tree read-only (bottom-up so every level is reached).
+    for dirpath, _, _ in os.walk(dirs[1], topdown=False):
+      os.chmod(dirpath, stat.S_IREAD)
+
+    # Should still be able to spill to the third directory.
+    impalad = self.cluster.get_any_impalad()
+    client = impalad.service.create_beeswax_client()
+    self.execute_query_expect_success(client, self.spill_query, exec_option)