You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by la...@apache.org on 2021/02/04 18:26:57 UTC

[impala] 01/03: IMPALA-9224: Blacklist nodes with faulty disk for spilling

This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b5e2a0ce2ed34dc12a47da23ec2adf65a2f60c0a
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Tue Jan 12 14:00:21 2021 -0800

    IMPALA-9224: Blacklist nodes with faulty disk for spilling
    
    This patch extends blacklist functionality by adding executor node to
    blacklist if a query fails caused by disk failure during spill-to-disk.
    Also classifies disk error codes and defines a blacklistable error set
    for non-transient disk errors. Coordinator blacklists executor only if
    the executor hitted blacklistable error during spill-to-disk.
    
    Adds a new debug action to simulate disk write error during spill-to-
    disk. To use, specify in query options as:
      'debug_action': 'IMPALA_TMP_FILE_WRITE:<hostname>:<port>:<action>'
    
      where <hostname> and <port> represent the impalad which execute the
      fragment instances, <port> is the BE krpc port (default 27000).
    
    Adds new test cases for blacklist and query-retry to cover the code
    changes.
    
    Testing:
     - Passed new test cases.
     - Passed exhaustive test.
     - Manually simulated disk failures in scratch directories on nodes
       of a cluster, verified that the nodes were blacklisted as
       expected.
    
    Change-Id: I04bfcb7f2e0b1ef24a5b4350f270feecd8c47437
    Reviewed-on: http://gerrit.cloudera.org:8080/16949
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator-backend-state.cc |   3 +
 be/src/runtime/coordinator-backend-state.h  |   9 ++
 be/src/runtime/coordinator.cc               |  24 ++++
 be/src/runtime/coordinator.h                |   9 ++
 be/src/runtime/io/error-converter.cc        |  61 ++++++++++
 be/src/runtime/io/error-converter.h         |   9 +-
 be/src/runtime/query-state.cc               |   7 ++
 be/src/runtime/tmp-file-mgr-internal.h      |   4 +-
 be/src/runtime/tmp-file-mgr.cc              |  49 +++++++-
 be/src/runtime/tmp-file-mgr.h               |  16 +++
 common/protobuf/control_service.proto       |   4 +
 common/thrift/generate_error_codes.py       |   3 +
 tests/custom_cluster/test_blacklist.py      | 180 +++++++++++++++++++++++++++-
 tests/custom_cluster/test_query_retries.py  | 143 +++++++++++++++++++++-
 14 files changed, 510 insertions(+), 11 deletions(-)

diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 58fd87c..f24f73a 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -539,6 +539,9 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
       is_fragment_failure_ = true;
     }
   }
+  if (!overall_status.ok()) {
+    local_disk_faulty_ = backend_exec_status.local_disk_faulty();
+  }
 
   // TODO: keep backend-wide stopwatch?
   return IsDoneLocked(lock);
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 9c6fc55..a465d53 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -186,6 +186,11 @@ class Coordinator::BackendState {
   /// instances for this backend.
   ResourceUtilization GetResourceUtilization();
 
+  bool IsLocalDiskFaulty() {
+    std::lock_guard<std::mutex> l(lock_);
+    return local_disk_faulty_;
+  }
+
   /// Merge the accumulated error log into 'merged'.
   void MergeErrorLog(ErrorLogMap* merged);
 
@@ -394,6 +399,10 @@ class Coordinator::BackendState {
   /// Invalid if no fragment instance has reported an error status.
   TUniqueId failed_instance_id_;
 
+  /// If true, the backend reported that the query failure was caused by disk IO error
+  /// on its local disk.
+  bool local_disk_faulty_ = false;
+
   /// Errors reported by this fragment instance.
   ErrorLogMap error_log_;
 
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 3fd0d9d..5d72026 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -978,6 +978,11 @@ Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& req
     // any "faulty" nodes.
     Status retryable_status = UpdateBlacklistWithAuxErrorInfo(
         &aux_error_info, status, backend_state);
+    // Check if the backend node should be blacklisted based on the reported backend
+    // error. Note that only blacklist one node per report.
+    if (!status.ok() && retryable_status.ok()) {
+      retryable_status = UpdateBlacklistWithBackendState(status, backend_state);
+    }
 
     // If any nodes were blacklisted, retry the query. This needs to be done before
     // UpdateExecState is called with the error status to avoid exposing the error to any
@@ -1120,6 +1125,25 @@ Status Coordinator::UpdateBlacklistWithAuxErrorInfo(
   return Status::OK();
 }
 
+Status Coordinator::UpdateBlacklistWithBackendState(
+    const Status& status, BackendState* backend_state) {
+  DCHECK(!status.ok());
+  // If the Backend failed due to its local faulty disk, blacklist the backend node.
+  if (backend_state->IsLocalDiskFaulty()) {
+    Status retryable_status(TErrorCode::LOCAL_DISK_FAULTY,
+        NetworkAddressPBToString(backend_state->impalad_address()));
+    retryable_status.MergeStatus(status);
+
+    ExecEnv::GetInstance()->cluster_membership_mgr()->BlacklistExecutor(
+        backend_state->exec_params().backend_id(), retryable_status);
+    parent_request_state_->AddBlacklistedExecutorAddress(
+        backend_state->krpc_impalad_address());
+
+    return retryable_status;
+  }
+  return Status::OK();
+}
+
 void Coordinator::HandleFailedExecRpcs(vector<BackendState*> failed_backend_states) {
   DCHECK(!failed_backend_states.empty());
 
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 36168c2..2ba58eb 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -625,6 +625,15 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   Status UpdateBlacklistWithAuxErrorInfo(std::vector<AuxErrorInfoPB>* aux_error_info,
       const Status& status, BackendState* backend_state) WARN_UNUSED_RESULT;
 
+  /// Helper function for UpdateBackendExecStatus that check if the backend node should
+  /// be blacklisted based on the reported backend error.
+  /// 'status' is the Status of the given BackendState. 'backend_state' is the
+  /// BackendState that reported an error.
+  /// Returns the Status object used when blacklisting a backend, or Status::OK if no
+  /// backends were blacklisted.
+  Status UpdateBlacklistWithBackendState(
+      const Status& status, BackendState* backend_state) WARN_UNUSED_RESULT;
+
   /// Called if the Exec RPC to the given vector of BackendStates failed. Currently, just
   /// triggers a retry of the query.
   void HandleFailedExecRpcs(std::vector<BackendState*> failed_backend_states);
diff --git a/be/src/runtime/io/error-converter.cc b/be/src/runtime/io/error-converter.cc
index 547f086..6e2d0bf 100644
--- a/be/src/runtime/io/error-converter.cc
+++ b/be/src/runtime/io/error-converter.cc
@@ -20,6 +20,7 @@
 #include "gutil/strings/substitute.h"
 #include "util/debug-util.h"
 #include "util/error-util.h"
+#include "util/string-parser.h"
 
 #include "common/names.h"
 
@@ -53,6 +54,66 @@ Status ErrorConverter::GetErrorStatusFromErrno(const string& function_name,
       GetErrorText(function_name, file_path, err_no, params)));
 }
 
+bool ErrorConverter::IsBlacklistableError(int err_no) {
+  // A set of disk IO related non-transient error codes that should cause failure for
+  // reading/writing file on local disk. These errors could be caused by incorrect
+  // configurations or file system errors, but are not caused by temporarily unavailable
+  // resource, like EAGAIN, ENOMEM, EMFILE, ENFILE and ENOSPC. Since these errors are not
+  // likely disappear soon, the node which frequently hits such disk errors should be
+  // blacklisted.
+  static const set<int32_t> blacklistable_disk_error_codes = {
+      EPERM, // Operation not permitted.
+      ENOENT, // No such file or directory.
+      ESRCH, // No such process.
+      EINTR, // Interrupted system call.
+      EIO, // Disk level I/O error.
+      ENXIO, // No such device or address.
+      E2BIG, // Argument list too long.
+      ENOEXEC, // Exec format error.
+      EBADF, // The given file descriptor is invalid.
+      EACCES, // Permission denied.
+      EFAULT, // Bad address.
+      ENODEV, // No such device
+      ENOTDIR, // It is not a directory.
+      EINVAL, // Invalid argument.
+      EFBIG, // Maximum file size reached.
+      ESPIPE, // Illegal seek.
+      EROFS, // The file system is read only.
+      ENAMETOOLONG, // Either the path length or a path component exceeds the max length.
+      EOVERFLOW}; // File size can't be represented.
+
+  // Return true if the err_no matches any of the 'blacklistable' error code.
+  return (blacklistable_disk_error_codes.find(err_no)
+      != blacklistable_disk_error_codes.end());
+}
+
+bool ErrorConverter::IsBlacklistableError(const Status& status) {
+  // Return true if the error is generated by Debug Action.
+  // Return false if error code is not set as DISK_IO_ERROR, or there is no 'err_no' in
+  // error text, or 'err_no' is set in wrong format.
+  if (status.IsInternalError()) {
+    return (status.msg().msg().find("Debug Action") != string::npos);
+  } else if (!status.IsDiskIoError()) {
+    return false;
+  }
+
+  size_t found = status.msg().msg().find("errno=");
+  if (found == string::npos) return false;
+  size_t start_pos = found + 6;
+  size_t end_pos = status.msg().msg().find(",", start_pos);
+  string value = (end_pos != string::npos) ?
+      status.msg().msg().substr(start_pos, end_pos - start_pos) :
+      status.msg().msg().substr(start_pos);
+  if (value.empty()) return false;
+  StringParser::ParseResult result;
+  int err_no = StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
+  if (result != StringParser::PARSE_SUCCESS) {
+    return false;
+  } else {
+    return IsBlacklistableError(err_no);
+  }
+}
+
 string ErrorConverter::GetErrorText(const string& function_name,
     const string& file_path, int err_no, Params params) {
   const string* error_text_body = GetErrorTextBody(err_no);
diff --git a/be/src/runtime/io/error-converter.h b/be/src/runtime/io/error-converter.h
index 52f88af..28b6251 100644
--- a/be/src/runtime/io/error-converter.h
+++ b/be/src/runtime/io/error-converter.h
@@ -47,7 +47,14 @@ public:
   static Status GetErrorStatusFromErrno(const string& function_name,
       const std::string& file_path, int err_no, const Params& params = Params());
 
-private:
+  /// Return true if the 'err_no' matches any of the 'blacklistable' error code.
+  static bool IsBlacklistableError(int err_no);
+  /// Parse error text to get 'err_no' for thr given status with error code as
+  /// DISK_IO_ERROR. Return true if the 'err_no' matches any of the 'blacklistable'
+  /// error code.
+  static bool IsBlacklistableError(const Status& status);
+
+ private:
   /// Maps errno to error text
   static std::unordered_map<int, std::string> errno_to_error_text_map_;
 
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 222e394..1b36fe9 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -344,6 +344,9 @@ Status QueryState::InitBufferPoolState() {
     file_group_ = obj_pool_.Add(
         new TmpFileGroup(exec_env->tmp_file_mgr(), exec_env->disk_io_mgr(),
             host_profile_, query_id(), query_options().scratch_limit));
+    if (!query_options().debug_action.empty()) {
+      file_group_->SetDebugAction(query_options().debug_action);
+    }
   }
   return Status::OK();
 }
@@ -521,6 +524,10 @@ void QueryState::ConstructReport(bool instances_started,
       TUniqueIdToUniqueIdPB(failed_finstance_id_, report->mutable_fragment_instance_id());
     }
   }
+  if (!report_overall_status.ok() && query_spilled_.Load() == 1 && file_group_ != nullptr
+      && file_group_->IsSpillingDiskFaulty()) {
+    report->set_local_disk_faulty(true);
+  }
 
   // Add profile to report
   host_profile_->ToThrift(&profiles_forest->host_profile);
diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h
index 90d3026..7412841 100644
--- a/be/src/runtime/tmp-file-mgr-internal.h
+++ b/be/src/runtime/tmp-file-mgr-internal.h
@@ -47,8 +47,8 @@ class TmpFile {
   bool AllocateSpace(int64_t num_bytes, int64_t* offset);
 
   /// Called when an IO error is encountered for this file. Logs the error and blacklists
-  /// the file.
-  void Blacklist(const ErrorMsg& msg);
+  /// the file. Returns true if the file just became blacklisted.
+  bool Blacklist(const ErrorMsg& msg);
 
   /// Delete the physical file on disk, if one was created.
   /// It is not valid to read or write to a file after calling Remove().
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 24b51e9..49bc40e 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -34,6 +34,7 @@
 #include "runtime/bufferpool/buffer-pool-counters.h"
 #include "runtime/exec-env.h"
 #include "runtime/io/disk-io-mgr.h"
+#include "runtime/io/error-converter.h"
 #include "runtime/io/request-context.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
@@ -355,9 +356,14 @@ int TmpFile::AssignDiskQueue() const {
   return file_group_->io_mgr_->AssignQueue(path_.c_str(), disk_id_, false);
 }
 
-void TmpFile::Blacklist(const ErrorMsg& msg) {
+bool TmpFile::Blacklist(const ErrorMsg& msg) {
   LOG(ERROR) << "Error for temporary file '" << path_ << "': " << msg.msg();
-  blacklisted_ = true;
+  if (!blacklisted_) {
+    blacklisted_ = true;
+    return true;
+  } else {
+    return false;
+  }
 }
 
 Status TmpFile::Remove() {
@@ -416,6 +422,8 @@ TmpFileGroup::TmpFileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
     compression_timer_(tmp_file_mgr->compression_enabled() ?
             ADD_TIMER(profile, "TotalCompressionTime") :
             nullptr),
+    num_blacklisted_files_(0),
+    spilling_disk_faulty_(false),
     current_bytes_allocated_(0),
     next_allocation_index_(0),
     free_ranges_(64) {
@@ -731,11 +739,21 @@ void TmpFileGroup::DestroyWriteHandle(unique_ptr<TmpWriteHandle> handle) {
 void TmpFileGroup::WriteComplete(
     TmpWriteHandle* handle, const Status& write_status) {
   Status status;
-  if (!write_status.ok()) {
-    status = RecoverWriteError(handle, write_status);
+  // Debug action for simulating disk write error. To use, specify in query options as:
+  // 'debug_action': 'IMPALA_TMP_FILE_WRITE:<hostname>:<port>:<action>'
+  // where <hostname> and <port> represent the impalad which execute the fragment
+  // instances, <port> is the BE krpc port (default 27000).
+  const Status* p_write_status = &write_status;
+  Status debug_status = DebugAction(debug_action_, "IMPALA_TMP_FILE_WRITE",
+      {ExecEnv::GetInstance()->krpc_address().hostname,
+          SimpleItoa(ExecEnv::GetInstance()->krpc_address().port)});
+  if (UNLIKELY(!debug_status.ok())) p_write_status = &debug_status;
+
+  if (!p_write_status->ok()) {
+    status = RecoverWriteError(handle, *p_write_status);
     if (status.ok()) return;
   } else {
-    status = write_status;
+    status = *p_write_status;
   }
   handle->WriteComplete(status);
 }
@@ -754,7 +772,21 @@ Status TmpFileGroup::RecoverWriteError(
   {
     lock_guard<SpinLock> lock(lock_);
     scratch_errors_.push_back(write_status);
-    handle->file_->Blacklist(write_status.msg());
+    if (handle->file_->Blacklist(write_status.msg())) {
+      DCHECK_LT(num_blacklisted_files_, tmp_files_.size());
+      ++num_blacklisted_files_;
+      if (num_blacklisted_files_ == tmp_files_.size()) {
+        // Check if all errors are 'blacklistable'.
+        bool are_all_blacklistable_errors = true;
+        for (Status& err : scratch_errors_) {
+          if (!ErrorConverter::IsBlacklistableError(err)) {
+            are_all_blacklistable_errors = false;
+            break;
+          }
+        }
+        if (are_all_blacklistable_errors) spilling_disk_faulty_ = true;
+      }
+    }
   }
 
   // Do not retry cancelled writes or propagate the error, simply return CANCELLED.
@@ -790,6 +822,11 @@ Status TmpFileGroup::ScratchAllocationFailedStatus(
   return status;
 }
 
+bool TmpFileGroup::IsSpillingDiskFaulty() {
+  lock_guard<SpinLock> lock(lock_);
+  return spilling_disk_faulty_;
+}
+
 string TmpFileGroup::DebugString() {
   lock_guard<SpinLock> lock(lock_);
   stringstream ss;
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 5f81497..e088dd9 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -287,6 +287,11 @@ class TmpFileGroup {
 
   TmpFileMgr* tmp_file_mgr() const { return tmp_file_mgr_; }
 
+  void SetDebugAction(const std::string& debug_action) { debug_action_ = debug_action; }
+
+  /// Return true if spill-to-disk failed due to local faulty disk.
+  bool IsSpillingDiskFaulty();
+
  private:
   friend class TmpFile;
   friend class TmpFileMgrTest;
@@ -332,6 +337,9 @@ class TmpFileGroup {
   /// 'lock_' must be held by caller.
   Status ScratchAllocationFailedStatus(const std::vector<int>& at_capacity_dirs);
 
+  /// Debug action of the query.
+  std::string debug_action_;
+
   /// The TmpFileMgr it is associated with.
   TmpFileMgr* const tmp_file_mgr_;
 
@@ -388,6 +396,14 @@ class TmpFileGroup {
   /// the related TmpDir.
   std::vector<std::unique_ptr<TmpFile>> tmp_files_;
 
+  /// Number of files in TmpFileGroup which have been blacklisted.
+  int num_blacklisted_files_;
+
+  /// Set to true to indicate spill-to-disk failure caused by faulty disks. It is set as
+  /// true if all temporary (a.k.a. scratch) files in this TmpFileGroup are blacklisted,
+  /// or getting disk error when reading temporary file back.
+  bool spilling_disk_faulty_;
+
   /// Index Range in the 'tmp_files'. Used to keep track of index range
   /// corresponding to a given priority.
   struct TmpFileIndexRange {
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index 14beea3..378d91c 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -259,6 +259,10 @@ message ReportExecStatusRequestPB {
 
   // For each join node, sum of RowsReturned counters on this backend.
   map<int32, int64> per_join_rows_produced = 16;
+
+  // If true, the executor failed to execute query fragments due to local disk IO
+  // fatal error, like local storage devices for spilling are corrupted.
+  optional bool local_disk_faulty = 17 [default = false];
 }
 
 message ReportExecStatusResponsePB {
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 5c38a40..4c90674 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -468,6 +468,9 @@ error_codes = (
    "Query $0 terminated due to join rows produced exceeds the limit of $1 "
    "at node with id $2. Unset or increase JOIN_ROWS_PRODUCED_LIMIT query option "
    "to produce more rows."),
+
+  ("LOCAL_DISK_FAULTY", 152,
+   "Query execution failure caused by local disk IO fatal error on backend: $0."),
 )
 
 import sys
diff --git a/tests/custom_cluster/test_blacklist.py b/tests/custom_cluster/test_blacklist.py
index 8d24161..7fd61d8 100644
--- a/tests/custom_cluster/test_blacklist.py
+++ b/tests/custom_cluster/test_blacklist.py
@@ -19,13 +19,24 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 
 import pytest
 import re
+import shutil
+import tempfile
 
 from beeswaxd.BeeswaxService import QueryState
 from tests.common.skip import SkipIfNotHdfsMinicluster
+from tests.common.skip import SkipIfBuildType
 from time import sleep
 
+# The BE krpc port of the impalad to simulate disk errors in tests.
+FAILED_KRPC_PORT = 27001
 
-# Tests that verify the behavior of the executor blacklist.
+
+def _get_disk_write_fail_action(port):
+  return "IMPALA_TMP_FILE_WRITE:127.0.0.1:{port}:FAIL".format(port=port)
+
+# Tests that verify the behavior of the executor blacklist caused by RPC failure.
+# Coordinator adds an executor node to its blacklist if the RPC to that node failed.
+# Note: query-retry is not enabled by default.
 @SkipIfNotHdfsMinicluster.tuned_for_minicluster
 class TestBlacklist(CustomClusterTestSuite):
   @classmethod
@@ -169,3 +180,170 @@ class TestBlacklist(CustomClusterTestSuite):
     assert match is not None and match.group(1) == "%s:%s" % \
       (killed_impalad.hostname, killed_impalad.service.krpc_port), \
       result.runtime_profile
+
+
+# Tests that verify the behavior of the executor blacklist caused by disk IO failure.
+# Coordinator adds an executor node to its blacklist if that node reported query
+# execution status with error caused by its local faulty disk.
+# Note: query-retry is not enabled by default and we assume it's not enabled in following
+# tests.
+@SkipIfNotHdfsMinicluster.tuned_for_minicluster
+class TestBlacklistFaultyDisk(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestBlacklistFaultyDisk, cls).setup_class()
+
+  # Query with order by requires spill to disk if intermediate results don't fit in mem
+  spill_query = """
+      select o_orderdate, o_custkey, o_comment
+      from tpch.orders
+      order by o_orderdate
+      """
+  # Query against a big table with order by requires spill to disk if intermediate
+  # results don't fit in memory.
+  spill_query_big_table = """
+      select l_orderkey, l_linestatus, l_shipdate, l_comment
+      from tpch.lineitem
+      order by l_orderkey
+      """
+  # Query without order by can be executed without spilling to disk.
+  in_mem_query = """
+      select o_orderdate, o_custkey, o_comment from tpch.orders
+      """
+  # Buffer pool limit that is low enough to force Impala to spill to disk when executing
+  # spill_query.
+  buffer_pool_limit = "45m"
+
+  def __generate_scratch_dir(self, num):
+    result = []
+    for i in xrange(num):
+      dir_path = tempfile.mkdtemp()
+      self.created_dirs.append(dir_path)
+      result.append(dir_path)
+      print "Generated dir" + dir_path
+    return result
+
+  def setup_method(self, method):
+    # Don't call the superclass method to prevent starting Impala before each test. In
+    # this class, each test is responsible for doing that because we want to generate
+    # the parameter string to start-impala-cluster in each test method.
+    self.created_dirs = []
+
+  def teardown_method(self, method):
+    for dir_path in self.created_dirs:
+      shutil.rmtree(dir_path, ignore_errors=True)
+
+  @SkipIfBuildType.not_dev_build
+  @pytest.mark.execute_serially
+  def test_scratch_file_write_failure(self, vector):
+    """ Test that verifies that when an impalad failed to execute query during spill-to-
+        disk due to disk write error, it is properly blacklisted by coordinator."""
+
+    # Start cluster with spill-to-disk enabled and one dedicated coordinator. Set a high
+    # statestore heartbeat frequency so that blacklisted nodes are not timeout too
+    # quickly.
+    scratch_dirs = self.__generate_scratch_dir(2)
+    self._start_impala_cluster([
+        '--impalad_args=-logbuflevel=-1',
+        '--impalad_args=--scratch_dirs={0}'.format(','.join(scratch_dirs)),
+        '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
+        '--impalad_args=--statestore_heartbeat_frequency_ms=2000',
+        '--cluster_size=3', '--num_coordinators=1', '--use_exclusive_coordinators'])
+    self.assert_impalad_log_contains("INFO", "Using scratch directory ",
+        expected_count=2)
+
+    # First set debug_action for query as empty.
+    vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
+    vector.get_value('exec_option')['debug_action'] = ''
+    coord_impalad = self.cluster.get_first_impalad()
+    client = coord_impalad.service.create_beeswax_client()
+
+    # Expect spill to disk to success with debug_action as empty. Verify all nodes are
+    # active.
+    handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+    results = client.fetch(self.spill_query, handle)
+    assert results.success
+    client.close_query(handle)
+
+    backends_json = coord_impalad.service.get_debug_webpage_json("/backends")
+    assert backends_json["num_active_backends"] == 3, backends_json
+    assert len(backends_json["backends"]) == 3, backends_json
+
+    # Set debug_action to inject disk write error for spill-to-disk on impalad for which
+    # krpc port is 27001.
+    vector.get_value('exec_option')['debug_action'] = \
+        _get_disk_write_fail_action(FAILED_KRPC_PORT)
+
+    # Should be able to execute in-memory query.
+    handle = self.execute_query_async_using_client(client, self.in_mem_query, vector)
+    results = client.fetch(self.in_mem_query, handle)
+    assert results.success
+    client.close_query(handle)
+
+    # Expect spill to disk to fail due to disk failure on the impalad with disk failure.
+    # Verify one node is blacklisted.
+    disk_failure_impalad = self.cluster.impalads[1]
+    assert disk_failure_impalad.service.krpc_port == FAILED_KRPC_PORT
+
+    try:
+      handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+      results = client.fetch(self.spill_query, handle)
+      assert False, "Query was expected to fail"
+    except Exception as e:
+      assert "Query execution failure caused by local disk IO fatal error on backend" \
+          in str(e)
+
+    backends_json = coord_impalad.service.get_debug_webpage_json("/backends")
+    assert backends_json["num_blacklisted_backends"] == 1, backends_json
+    assert backends_json["num_active_backends"] == 2, backends_json
+    assert len(backends_json["backends"]) == 3, backends_json
+    num_blacklisted = 0
+    for backend_json in backends_json["backends"]:
+      if str(disk_failure_impalad.service.krpc_port) in backend_json["krpc_address"]:
+        assert backend_json["is_blacklisted"], backend_json
+        assert "Query execution failure caused by local disk IO fatal error on backend" \
+            in backend_json["blacklist_cause"]
+        num_blacklisted += 1
+      else:
+        assert not backend_json["is_blacklisted"], backend_json
+    assert num_blacklisted == 1, backends_json
+
+    # Should be able to re-execute same query since the impalad with injected disk error
+    # for spill-to-disk was blacklisted.
+    handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+    results = client.fetch(self.spill_query, handle)
+    assert results.success
+    # Verify that the runtime profile contains the "Blacklisted Executors" line with the
+    # corresponding backend.
+    runtime_profile = client.get_runtime_profile(handle)
+    match = re.search("Blacklisted Executors: (.*)", runtime_profile)
+    assert match is not None and match.group(1) == "%s:%s" % \
+        (disk_failure_impalad.hostname, disk_failure_impalad.service.krpc_port), \
+        runtime_profile
+    client.close_query(handle)
+
+    # Sleep for long enough time and verify blacklisted backend was removed from the
+    # blacklist after blacklisting timeout.
+    sleep(30)
+    backends_json = coord_impalad.service.get_debug_webpage_json("/backends")
+    assert backends_json["num_active_backends"] == 3, backends_json
+
+    # Run the query again without the debug action and verify nothing was blacklisted
+    # and all 3 backends were scheduled on.
+    vector.get_value('exec_option')['debug_action'] = ''
+    handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+    results = client.fetch(self.spill_query, handle)
+    assert results.success
+    runtime_profile = client.get_runtime_profile(handle)
+    assert re.search("Blacklisted Executors: (.*)", runtime_profile) is None, \
+        runtime_profile
+    assert re.search("NumBackends: 3", runtime_profile), runtime_profile
+    client.close_query(handle)
+
+    client.close()
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index 8a3611a..cb5f927 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -23,6 +23,8 @@
 
 import pytest
 import re
+import shutil
+import tempfile
 import time
 
 from random import randint
@@ -33,8 +35,9 @@ from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.errors import Timeout
 from tests.common.skip import SkipIfEC, SkipIfBuildType
+from tests.common.skip import SkipIfNotHdfsMinicluster
 
-# The BE krpc port of the impalad to simulate rpc errors in tests.
+# The BE krpc port of the impalad to simulate rpc or disk errors in tests.
 FAILED_KRPC_PORT = 27001
 
 
@@ -42,6 +45,10 @@ def _get_rpc_fail_action(port):
   return "IMPALA_SERVICE_POOL:127.0.0.1:{port}:ExecQueryFInstances:FAIL" \
       .format(port=port)
 
+
+def _get_disk_fail_action(port):
+  return "IMPALA_TMP_FILE_WRITE:127.0.0.1:{port}:FAIL".format(port=port)
+
 # All tests in this class have SkipIfEC because all tests run a query and expect
 # the query to be retried when killing a random impalad. On EC this does not always work
 # because many queries that might run on three impalads for HDFS / S3 builds, might only
@@ -989,3 +996,137 @@ class TestQueryRetries(CustomClusterTestSuite):
         if query_id_search:
           return query_id_search.group(1)
     return None
+
+
+# Tests that verify the query-retries are properly triggered by disk IO failure.
+# Coordinator adds an executor node to its blacklist if that node reports query
+# execution status with error caused by its local faulty disk, then retries the failed
+# query.
+@SkipIfNotHdfsMinicluster.tuned_for_minicluster
+class TestQueryRetriesFaultyDisk(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestQueryRetriesFaultyDisk, cls).setup_class()
+
+  # Query with order by requires spill to disk if intermediate results don't fit in mem
+  spill_query = """
+      select o_orderdate, o_custkey, o_comment
+      from tpch.orders
+      order by o_orderdate
+      """
+  # Buffer pool limit that is low enough to force Impala to spill to disk when executing
+  # spill_query.
+  buffer_pool_limit = "45m"
+
+  def setup_method(self, method):
+    # Don't call the superclass method to prevent starting Impala before each test. In
+    # this class, each test is responsible for doing that because we want to generate
+    # the parameter string to start-impala-cluster in each test method.
+    self.created_dirs = []
+
+  def teardown_method(self, method):
+    for dir_path in self.created_dirs:
+      shutil.rmtree(dir_path, ignore_errors=True)
+
+  def __generate_scratch_dir(self, num):
+    result = []
+    for i in xrange(num):
+      dir_path = tempfile.mkdtemp()
+      self.created_dirs.append(dir_path)
+      result.append(dir_path)
+      print "Generated dir" + dir_path
+    return result
+
+  def __validate_web_ui_state(self):
+    """Validate the state of the web ui after a query (or queries) have been retried.
+    The web ui should list 0 queries as in flight, running, or queued."""
+    impalad_service = self.cluster.get_first_impalad().service
+
+    # Assert that the debug web ui shows all queries as completed
+    self.assert_eventually(60, 0.1,
+        lambda: impalad_service.get_num_in_flight_queries() == 0)
+    assert impalad_service.get_num_running_queries('default-pool') == 0
+    assert impalad_service.get_num_queued_queries('default-pool') == 0
+
+  @SkipIfBuildType.not_dev_build
+  @pytest.mark.execute_serially
+  def test_retry_spill_to_disk_failed(self, vector):
+    """ Test that verifies that when an impalad failed during spill-to-disk due to disk
+        write error, it is properly blacklisted by coordinator and query-retry is
+        triggered."""
+
+    # Start cluster with spill-to-disk enabled and one dedicated coordinator. Set a
+    # really high statestore heartbeat frequency so that blacklisted nodes are not
+    # timeout too quickly.
+    scratch_dirs = self.__generate_scratch_dir(1)
+    self._start_impala_cluster([
+        '--impalad_args=-logbuflevel=-1',
+        '--impalad_args=--scratch_dirs={0}'.format(','.join(scratch_dirs)),
+        '--impalad_args=--allow_multiple_scratch_dirs_per_device=false',
+        '--impalad_args=--statestore_heartbeat_frequency_ms=60000',
+        '--cluster_size=3', '--num_coordinators=1', '--use_exclusive_coordinators'])
+    self.assert_impalad_log_contains("INFO", "Using scratch directory ",
+        expected_count=1)
+
+    # Set debug_action to inject disk write error for spill-to-disk on impalad for which
+    # krpc port is 27001.
+    vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
+    vector.get_value('exec_option')['debug_action'] = \
+        _get_disk_fail_action(FAILED_KRPC_PORT)
+    vector.get_value('exec_option')['retry_failed_queries'] = "true"
+    coord_impalad = self.cluster.get_first_impalad()
+    client = coord_impalad.service.create_beeswax_client()
+
+    disk_failure_impalad = self.cluster.impalads[1]
+    assert disk_failure_impalad.service.krpc_port == FAILED_KRPC_PORT
+
+    # Verify all nodes are active now.
+    backends_json = self.cluster.impalads[0].service.get_debug_webpage_json("/backends")
+    assert backends_json["num_active_backends"] == 3, backends_json
+    assert len(backends_json["backends"]) == 3, backends_json
+
+    # Expect the impalad with disk failure is blacklisted, and query-retry is triggered
+    # and is completed successfully.
+    handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+    results = client.fetch(self.spill_query, handle)
+    assert results.success
+
+    # Validate the state of the web ui. The query must be closed before validating the
+    # state since it asserts that no queries are in flight.
+    client.close_query(handle)
+    client.close()
+    self.__validate_web_ui_state()
+
+    # Verify that the impalad with injected disk IO error is blacklisted.
+    backends_json = coord_impalad.service.get_debug_webpage_json("/backends")
+    assert backends_json["num_blacklisted_backends"] == 1, backends_json
+    assert backends_json["num_active_backends"] == 2, backends_json
+    assert len(backends_json["backends"]) == 3, backends_json
+    num_blacklisted = 0
+    for backend_json in backends_json["backends"]:
+      if str(disk_failure_impalad.service.krpc_port) in backend_json["krpc_address"]:
+        assert backend_json["is_blacklisted"], backend_json
+        assert "Query execution failure caused by local disk IO fatal error on backend" \
+            in backend_json["blacklist_cause"]
+        num_blacklisted += 1
+      else:
+        assert not backend_json["is_blacklisted"], backend_json
+    assert num_blacklisted == 1, backends_json
+
+    # Verify that the query is re-tried and finished.
+    completed_queries = coord_impalad.service.get_completed_queries()
+    # Assert that the most recently completed query is the retried query and it is marked
+    # as 'FINISHED.
+    assert completed_queries[0]['state'] == 'FINISHED'
+    assert completed_queries[0]["rows_fetched"] == 1500000
+    # Assert that the second most recently completed query is the original query and it
+    # is marked as 'RETRIED'.
+    assert completed_queries[1]['state'] == 'RETRIED'
+    assert completed_queries[1]["rows_fetched"] == 0
+    assert completed_queries[1]['query_id'] == handle.get_handle().id