Posted to commits@impala.apache.org by la...@apache.org on 2021/02/04 18:26:57 UTC
[impala] 01/03: IMPALA-9224: Blacklist nodes with faulty disk for spilling
This is an automated email from the ASF dual-hosted git repository.
laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit b5e2a0ce2ed34dc12a47da23ec2adf65a2f60c0a
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Tue Jan 12 14:00:21 2021 -0800
IMPALA-9224: Blacklist nodes with faulty disk for spilling
This patch extends the blacklist functionality by adding an executor
node to the blacklist if a query fails due to a disk failure during
spill-to-disk. It also classifies disk error codes and defines a
blacklistable error set for non-transient disk errors. The coordinator
blacklists an executor only if the executor hit a blacklistable error
during spill-to-disk.
Adds a new debug action to simulate a disk write error during spill-to-
disk. To use it, specify in the query options:
'debug_action': 'IMPALA_TMP_FILE_WRITE:<hostname>:<port>:<action>'
where <hostname> and <port> identify the impalad that executes the
fragment instances, and <port> is the BE krpc port (default 27000).
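As a concrete illustration, a minimal sketch of how a test might enable
the action (hedged: the 127.0.0.1 hostname and FAIL action mirror the
test code below, and 27000 is the default port mentioned above):

    # Inject a scratch-file write error on the impalad whose BE krpc
    # port is 27000, and force spilling with a small buffer pool.
    vector.get_value('exec_option')['debug_action'] = \
        'IMPALA_TMP_FILE_WRITE:127.0.0.1:27000:FAIL'
    vector.get_value('exec_option')['buffer_pool_limit'] = '45m'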
Adds new test cases for the blacklist and query-retry behavior to cover
the code changes.
Testing:
- Passed new test cases.
- Passed exhaustive tests.
- Manually simulated disk failures in scratch directories on nodes
of a cluster and verified that the nodes were blacklisted as
expected.
Change-Id: I04bfcb7f2e0b1ef24a5b4350f270feecd8c47437
Reviewed-on: http://gerrit.cloudera.org:8080/16949
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/runtime/coordinator-backend-state.cc | 3 +
be/src/runtime/coordinator-backend-state.h | 9 ++
be/src/runtime/coordinator.cc | 24 ++++
be/src/runtime/coordinator.h | 9 ++
be/src/runtime/io/error-converter.cc | 61 ++++++++++
be/src/runtime/io/error-converter.h | 9 +-
be/src/runtime/query-state.cc | 7 ++
be/src/runtime/tmp-file-mgr-internal.h | 4 +-
be/src/runtime/tmp-file-mgr.cc | 49 +++++++-
be/src/runtime/tmp-file-mgr.h | 16 +++
common/protobuf/control_service.proto | 4 +
common/thrift/generate_error_codes.py | 3 +
tests/custom_cluster/test_blacklist.py | 180 +++++++++++++++++++++++++++-
tests/custom_cluster/test_query_retries.py | 143 +++++++++++++++++++++-
14 files changed, 510 insertions(+), 11 deletions(-)
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 58fd87c..f24f73a 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -539,6 +539,9 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
is_fragment_failure_ = true;
}
}
+ if (!overall_status.ok()) {
+ local_disk_faulty_ = backend_exec_status.local_disk_faulty();
+ }
// TODO: keep backend-wide stopwatch?
return IsDoneLocked(lock);
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 9c6fc55..a465d53 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -186,6 +186,11 @@ class Coordinator::BackendState {
/// instances for this backend.
ResourceUtilization GetResourceUtilization();
+ bool IsLocalDiskFaulty() {
+ std::lock_guard<std::mutex> l(lock_);
+ return local_disk_faulty_;
+ }
+
/// Merge the accumulated error log into 'merged'.
void MergeErrorLog(ErrorLogMap* merged);
@@ -394,6 +399,10 @@ class Coordinator::BackendState {
/// Invalid if no fragment instance has reported an error status.
TUniqueId failed_instance_id_;
+ /// If true, the backend reported that the query failure was caused by a disk IO
+ /// error on its local disk.
+ bool local_disk_faulty_ = false;
+
/// Errors reported by this fragment instance.
ErrorLogMap error_log_;
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 3fd0d9d..5d72026 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -978,6 +978,11 @@ Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& req
// any "faulty" nodes.
Status retryable_status = UpdateBlacklistWithAuxErrorInfo(
&aux_error_info, status, backend_state);
+ // Check if the backend node should be blacklisted based on the reported backend
+ // error. Note that only one node is blacklisted per report.
+ if (!status.ok() && retryable_status.ok()) {
+ retryable_status = UpdateBlacklistWithBackendState(status, backend_state);
+ }
// If any nodes were blacklisted, retry the query. This needs to be done before
// UpdateExecState is called with the error status to avoid exposing the error to any
@@ -1120,6 +1125,25 @@ Status Coordinator::UpdateBlacklistWithAuxErrorInfo(
return Status::OK();
}
+Status Coordinator::UpdateBlacklistWithBackendState(
+ const Status& status, BackendState* backend_state) {
+ DCHECK(!status.ok());
+ // If the Backend failed due to its local faulty disk, blacklist the backend node.
+ if (backend_state->IsLocalDiskFaulty()) {
+ Status retryable_status(TErrorCode::LOCAL_DISK_FAULTY,
+ NetworkAddressPBToString(backend_state->impalad_address()));
+ retryable_status.MergeStatus(status);
+
+ ExecEnv::GetInstance()->cluster_membership_mgr()->BlacklistExecutor(
+ backend_state->exec_params().backend_id(), retryable_status);
+ parent_request_state_->AddBlacklistedExecutorAddress(
+ backend_state->krpc_impalad_address());
+
+ return retryable_status;
+ }
+ return Status::OK();
+}
+
void Coordinator::HandleFailedExecRpcs(vector<BackendState*> failed_backend_states) {
DCHECK(!failed_backend_states.empty());
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 36168c2..2ba58eb 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -625,6 +625,15 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
Status UpdateBlacklistWithAuxErrorInfo(std::vector<AuxErrorInfoPB>* aux_error_info,
const Status& status, BackendState* backend_state) WARN_UNUSED_RESULT;
+ /// Helper function for UpdateBackendExecStatus that checks if the backend node
+ /// should be blacklisted based on the reported backend error.
+ /// 'status' is the Status of the given BackendState. 'backend_state' is the
+ /// BackendState that reported an error.
+ /// Returns the Status object used when blacklisting a backend, or Status::OK if no
+ /// backends were blacklisted.
+ Status UpdateBlacklistWithBackendState(
+ const Status& status, BackendState* backend_state) WARN_UNUSED_RESULT;
+
/// Called if the Exec RPC to the given vector of BackendStates failed. Currently, just
/// triggers a retry of the query.
void HandleFailedExecRpcs(std::vector<BackendState*> failed_backend_states);
diff --git a/be/src/runtime/io/error-converter.cc b/be/src/runtime/io/error-converter.cc
index 547f086..6e2d0bf 100644
--- a/be/src/runtime/io/error-converter.cc
+++ b/be/src/runtime/io/error-converter.cc
@@ -20,6 +20,7 @@
#include "gutil/strings/substitute.h"
#include "util/debug-util.h"
#include "util/error-util.h"
+#include "util/string-parser.h"
#include "common/names.h"
@@ -53,6 +54,66 @@ Status ErrorConverter::GetErrorStatusFromErrno(const string& function_name,
GetErrorText(function_name, file_path, err_no, params)));
}
+bool ErrorConverter::IsBlacklistableError(int err_no) {
+ // A set of non-transient, disk IO related error codes that should cause reads or
+ // writes of files on the local disk to fail. These errors could be caused by
+ // incorrect configurations or file system errors, but not by temporarily
+ // unavailable resources such as EAGAIN, ENOMEM, EMFILE, ENFILE and ENOSPC. Since
+ // these errors are not likely to disappear soon, a node that frequently hits such
+ // disk errors should be blacklisted.
+ static const set<int32_t> blacklistable_disk_error_codes = {
+ EPERM, // Operation not permitted.
+ ENOENT, // No such file or directory.
+ ESRCH, // No such process.
+ EINTR, // Interrupted system call.
+ EIO, // Disk level I/O error.
+ ENXIO, // No such device or address.
+ E2BIG, // Argument list too long.
+ ENOEXEC, // Exec format error.
+ EBADF, // The given file descriptor is invalid.
+ EACCES, // Permission denied.
+ EFAULT, // Bad address.
+ ENODEV, // No such device.
+ ENOTDIR, // Not a directory.
+ EINVAL, // Invalid argument.
+ EFBIG, // Maximum file size reached.
+ ESPIPE, // Illegal seek.
+ EROFS, // The file system is read only.
+ ENAMETOOLONG, // Either the path length or a path component exceeds the max length.
+ EOVERFLOW}; // File size can't be represented.
+
+ // Return true if 'err_no' matches any of the 'blacklistable' error codes.
+ return (blacklistable_disk_error_codes.find(err_no)
+ != blacklistable_disk_error_codes.end());
+}
+
+bool ErrorConverter::IsBlacklistableError(const Status& status) {
+ // Return true if the error was generated by a debug action.
+ // Return false if the error code is not DISK_IO_ERROR, there is no 'err_no' in the
+ // error text, or 'err_no' is in the wrong format.
+ if (status.IsInternalError()) {
+ return (status.msg().msg().find("Debug Action") != string::npos);
+ } else if (!status.IsDiskIoError()) {
+ return false;
+ }
+
+ size_t found = status.msg().msg().find("errno=");
+ if (found == string::npos) return false;
+ size_t start_pos = found + 6;
+ size_t end_pos = status.msg().msg().find(",", start_pos);
+ string value = (end_pos != string::npos) ?
+ status.msg().msg().substr(start_pos, end_pos - start_pos) :
+ status.msg().msg().substr(start_pos);
+ if (value.empty()) return false;
+ StringParser::ParseResult result;
+ int err_no = StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
+ if (result != StringParser::PARSE_SUCCESS) {
+ return false;
+ } else {
+ return IsBlacklistableError(err_no);
+ }
+}
+
string ErrorConverter::GetErrorText(const string& function_name,
const string& file_path, int err_no, Params params) {
const string* error_text_body = GetErrorTextBody(err_no);
diff --git a/be/src/runtime/io/error-converter.h b/be/src/runtime/io/error-converter.h
index 52f88af..28b6251 100644
--- a/be/src/runtime/io/error-converter.h
+++ b/be/src/runtime/io/error-converter.h
@@ -47,7 +47,14 @@ public:
static Status GetErrorStatusFromErrno(const string& function_name,
const std::string& file_path, int err_no, const Params& params = Params());
-private:
+ /// Return true if 'err_no' matches any of the 'blacklistable' error codes.
+ static bool IsBlacklistableError(int err_no);
+ /// Parse the error text to get 'err_no' for the given status with error code
+ /// DISK_IO_ERROR. Return true if 'err_no' matches any of the 'blacklistable'
+ /// error codes.
+ static bool IsBlacklistableError(const Status& status);
+
+ private:
/// Maps errno to error text
static std::unordered_map<int, std::string> errno_to_error_text_map_;
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 222e394..1b36fe9 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -344,6 +344,9 @@ Status QueryState::InitBufferPoolState() {
file_group_ = obj_pool_.Add(
new TmpFileGroup(exec_env->tmp_file_mgr(), exec_env->disk_io_mgr(),
host_profile_, query_id(), query_options().scratch_limit));
+ if (!query_options().debug_action.empty()) {
+ file_group_->SetDebugAction(query_options().debug_action);
+ }
}
return Status::OK();
}
@@ -521,6 +524,10 @@ void QueryState::ConstructReport(bool instances_started,
TUniqueIdToUniqueIdPB(failed_finstance_id_, report->mutable_fragment_instance_id());
}
}
+ if (!report_overall_status.ok() && query_spilled_.Load() == 1 && file_group_ != nullptr
+ && file_group_->IsSpillingDiskFaulty()) {
+ report->set_local_disk_faulty(true);
+ }
// Add profile to report
host_profile_->ToThrift(&profiles_forest->host_profile);
diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h
index 90d3026..7412841 100644
--- a/be/src/runtime/tmp-file-mgr-internal.h
+++ b/be/src/runtime/tmp-file-mgr-internal.h
@@ -47,8 +47,8 @@ class TmpFile {
bool AllocateSpace(int64_t num_bytes, int64_t* offset);
/// Called when an IO error is encountered for this file. Logs the error and blacklists
- /// the file.
- void Blacklist(const ErrorMsg& msg);
+ /// the file. Returns true if the file just became blacklisted.
+ bool Blacklist(const ErrorMsg& msg);
/// Delete the physical file on disk, if one was created.
/// It is not valid to read or write to a file after calling Remove().
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 24b51e9..49bc40e 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -34,6 +34,7 @@
#include "runtime/bufferpool/buffer-pool-counters.h"
#include "runtime/exec-env.h"
#include "runtime/io/disk-io-mgr.h"
+#include "runtime/io/error-converter.h"
#include "runtime/io/request-context.h"
#include "runtime/mem-tracker.h"
#include "runtime/runtime-state.h"
@@ -355,9 +356,14 @@ int TmpFile::AssignDiskQueue() const {
return file_group_->io_mgr_->AssignQueue(path_.c_str(), disk_id_, false);
}
-void TmpFile::Blacklist(const ErrorMsg& msg) {
+bool TmpFile::Blacklist(const ErrorMsg& msg) {
LOG(ERROR) << "Error for temporary file '" << path_ << "': " << msg.msg();
- blacklisted_ = true;
+ if (!blacklisted_) {
+ blacklisted_ = true;
+ return true;
+ } else {
+ return false;
+ }
}
Status TmpFile::Remove() {
@@ -416,6 +422,8 @@ TmpFileGroup::TmpFileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
compression_timer_(tmp_file_mgr->compression_enabled() ?
ADD_TIMER(profile, "TotalCompressionTime") :
nullptr),
+ num_blacklisted_files_(0),
+ spilling_disk_faulty_(false),
current_bytes_allocated_(0),
next_allocation_index_(0),
free_ranges_(64) {
@@ -731,11 +739,21 @@ void TmpFileGroup::DestroyWriteHandle(unique_ptr<TmpWriteHandle> handle) {
void TmpFileGroup::WriteComplete(
TmpWriteHandle* handle, const Status& write_status) {
Status status;
- if (!write_status.ok()) {
- status = RecoverWriteError(handle, write_status);
+ // Debug action for simulating a disk write error. To use it, specify in the query
+ // options: 'debug_action': 'IMPALA_TMP_FILE_WRITE:<hostname>:<port>:<action>'
+ // where <hostname> and <port> identify the impalad that executes the fragment
+ // instances, and <port> is the BE krpc port (default 27000).
+ const Status* p_write_status = &write_status;
+ Status debug_status = DebugAction(debug_action_, "IMPALA_TMP_FILE_WRITE",
+ {ExecEnv::GetInstance()->krpc_address().hostname,
+ SimpleItoa(ExecEnv::GetInstance()->krpc_address().port)});
+ if (UNLIKELY(!debug_status.ok())) p_write_status = &debug_status;
+
+ if (!p_write_status->ok()) {
+ status = RecoverWriteError(handle, *p_write_status);
if (status.ok()) return;
} else {
- status = write_status;
+ status = *p_write_status;
}
handle->WriteComplete(status);
}
@@ -754,7 +772,21 @@ Status TmpFileGroup::RecoverWriteError(
{
lock_guard<SpinLock> lock(lock_);
scratch_errors_.push_back(write_status);
- handle->file_->Blacklist(write_status.msg());
+ if (handle->file_->Blacklist(write_status.msg())) {
+ DCHECK_LT(num_blacklisted_files_, tmp_files_.size());
+ ++num_blacklisted_files_;
+ if (num_blacklisted_files_ == tmp_files_.size()) {
+ // Check if all errors are 'blacklistable'.
+ bool are_all_blacklistable_errors = true;
+ for (Status& err : scratch_errors_) {
+ if (!ErrorConverter::IsBlacklistableError(err)) {
+ are_all_blacklistable_errors = false;
+ break;
+ }
+ }
+ if (are_all_blacklistable_errors) spilling_disk_faulty_ = true;
+ }
+ }
}
// Do not retry cancelled writes or propagate the error, simply return CANCELLED.
@@ -790,6 +822,11 @@ Status TmpFileGroup::ScratchAllocationFailedStatus(
return status;
}
+bool TmpFileGroup::IsSpillingDiskFaulty() {
+ lock_guard<SpinLock> lock(lock_);
+ return spilling_disk_faulty_;
+}
+
string TmpFileGroup::DebugString() {
lock_guard<SpinLock> lock(lock_);
stringstream ss;
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 5f81497..e088dd9 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -287,6 +287,11 @@ class TmpFileGroup {
TmpFileMgr* tmp_file_mgr() const { return tmp_file_mgr_; }
+ void SetDebugAction(const std::string& debug_action) { debug_action_ = debug_action; }
+
+ /// Return true if spill-to-disk failed due to a faulty local disk.
+ bool IsSpillingDiskFaulty();
+
private:
friend class TmpFile;
friend class TmpFileMgrTest;
@@ -332,6 +337,9 @@ class TmpFileGroup {
/// 'lock_' must be held by caller.
Status ScratchAllocationFailedStatus(const std::vector<int>& at_capacity_dirs);
+ /// Debug action of the query.
+ std::string debug_action_;
+
/// The TmpFileMgr it is associated with.
TmpFileMgr* const tmp_file_mgr_;
@@ -388,6 +396,14 @@ class TmpFileGroup {
/// the related TmpDir.
std::vector<std::unique_ptr<TmpFile>> tmp_files_;
+ /// Number of files in this TmpFileGroup that have been blacklisted.
+ int num_blacklisted_files_;
+
+ /// Set to true to indicate a spill-to-disk failure caused by faulty disks. It is
+ /// set to true if all temporary (a.k.a. scratch) files in this TmpFileGroup are
+ /// blacklisted, or if a disk error is hit when reading a temporary file back.
+ bool spilling_disk_faulty_;
+
/// Index Range in the 'tmp_files'. Used to keep track of index range
/// corresponding to a given priority.
struct TmpFileIndexRange {
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index 14beea3..378d91c 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -259,6 +259,10 @@ message ReportExecStatusRequestPB {
// For each join node, sum of RowsReturned counters on this backend.
map<int32, int64> per_join_rows_produced = 16;
+
+ // If true, the executor failed to execute query fragments due to a fatal local
+ // disk IO error, e.g. the local storage devices used for spilling are corrupted.
+ optional bool local_disk_faulty = 17 [default = false];
}
message ReportExecStatusResponsePB {
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 5c38a40..4c90674 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -468,6 +468,9 @@ error_codes = (
"Query $0 terminated due to join rows produced exceeds the limit of $1 "
"at node with id $2. Unset or increase JOIN_ROWS_PRODUCED_LIMIT query option "
"to produce more rows."),
+
+ ("LOCAL_DISK_FAULTY", 152,
+ "Query execution failure caused by local disk IO fatal error on backend: $0."),
)
import sys
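For reference, a hedged sketch of how the $0 placeholder in the new
error template is filled with the backend address (the address shown is
hypothetical; the tests below happen to use the second impalad at
127.0.0.1:27001):

    # Approximate rendering of TErrorCode::LOCAL_DISK_FAULTY:
    template = ("Query execution failure caused by local disk IO fatal "
                "error on backend: $0.")
    rendered = template.replace("$0", "127.0.0.1:27001")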
diff --git a/tests/custom_cluster/test_blacklist.py b/tests/custom_cluster/test_blacklist.py
index 8d24161..7fd61d8 100644
--- a/tests/custom_cluster/test_blacklist.py
+++ b/tests/custom_cluster/test_blacklist.py
@@ -19,13 +19,24 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
import pytest
import re
+import shutil
+import tempfile
from beeswaxd.BeeswaxService import QueryState
from tests.common.skip import SkipIfNotHdfsMinicluster
+from tests.common.skip import SkipIfBuildType
from time import sleep
+# The BE krpc port of the impalad to simulate disk errors in tests.
+FAILED_KRPC_PORT = 27001
-# Tests that verify the behavior of the executor blacklist.
+
+def _get_disk_write_fail_action(port):
+ return "IMPALA_TMP_FILE_WRITE:127.0.0.1:{port}:FAIL".format(port=port)
+
+# Tests that verify the behavior of the executor blacklist caused by RPC failure.
+# Coordinator adds an executor node to its blacklist if the RPC to that node failed.
+# Note: query-retry is not enabled by default.
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
class TestBlacklist(CustomClusterTestSuite):
@classmethod
@@ -169,3 +180,170 @@ class TestBlacklist(CustomClusterTestSuite):
assert match is not None and match.group(1) == "%s:%s" % \
(killed_impalad.hostname, killed_impalad.service.krpc_port), \
result.runtime_profile
+
+
+# Tests that verify the behavior of the executor blacklist in the face of disk IO
+# failures. The coordinator adds an executor node to its blacklist if that node
+# reports query execution status with an error caused by its local faulty disk.
+# Note: query-retry is not enabled by default and we assume it's not enabled in the
+# following tests.
+@SkipIfNotHdfsMinicluster.tuned_for_minicluster
+class TestBlacklistFaultyDisk(CustomClusterTestSuite):
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ @classmethod
+ def setup_class(cls):
+ if cls.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ super(TestBlacklistFaultyDisk, cls).setup_class()
+
+ # A query with order by requires spill to disk if intermediate results don't fit
+ # in memory.
+ spill_query = """
+ select o_orderdate, o_custkey, o_comment
+ from tpch.orders
+ order by o_orderdate
+ """
+ # Query against a big table with order by requires spill to disk if intermediate
+ # results don't fit in memory.
+ spill_query_big_table = """
+ select l_orderkey, l_linestatus, l_shipdate, l_comment
+ from tpch.lineitem
+ order by l_orderkey
+ """
+ # Query without order by can be executed without spilling to disk.
+ in_mem_query = """
+ select o_orderdate, o_custkey, o_comment from tpch.orders
+ """
+ # Buffer pool limit that is low enough to force Impala to spill to disk when executing
+ # spill_query.
+ buffer_pool_limit = "45m"
+
+ def __generate_scratch_dir(self, num):
+ result = []
+ for i in xrange(num):
+ dir_path = tempfile.mkdtemp()
+ self.created_dirs.append(dir_path)
+ result.append(dir_path)
+ print "Generated dir " + dir_path
+ return result
+
+ def setup_method(self, method):
+ # Don't call the superclass method to prevent starting Impala before each test. In
+ # this class, each test is responsible for doing that because we want to generate
+ # the parameter string to start-impala-cluster in each test method.
+ self.created_dirs = []
+
+ def teardown_method(self, method):
+ for dir_path in self.created_dirs:
+ shutil.rmtree(dir_path, ignore_errors=True)
+
+ @SkipIfBuildType.not_dev_build
+ @pytest.mark.execute_serially
+ def test_scratch_file_write_failure(self, vector):
+ """ Test that verifies that when an impalad fails to execute a query during spill-
+ to-disk due to a disk write error, it is properly blacklisted by the coordinator."""
+
+ # Start cluster with spill-to-disk enabled and one dedicated coordinator. Set a high
+ # statestore heartbeat frequency so that blacklisted nodes do not time out too
+ # quickly.
+ scratch_dirs = self.__generate_scratch_dir(2)
+ self._start_impala_cluster([
+ '--impalad_args=-logbuflevel=-1',
+ '--impalad_args=--scratch_dirs={0}'.format(','.join(scratch_dirs)),
+ '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
+ '--impalad_args=--statestore_heartbeat_frequency_ms=2000',
+ '--cluster_size=3', '--num_coordinators=1', '--use_exclusive_coordinators'])
+ self.assert_impalad_log_contains("INFO", "Using scratch directory ",
+ expected_count=2)
+
+ # First set the query's debug_action to empty.
+ vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
+ vector.get_value('exec_option')['debug_action'] = ''
+ coord_impalad = self.cluster.get_first_impalad()
+ client = coord_impalad.service.create_beeswax_client()
+
+ # Expect spill to disk to succeed with an empty debug_action. Verify all nodes are
+ # active.
+ handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+ results = client.fetch(self.spill_query, handle)
+ assert results.success
+ client.close_query(handle)
+
+ backends_json = coord_impalad.service.get_debug_webpage_json("/backends")
+ assert backends_json["num_active_backends"] == 3, backends_json
+ assert len(backends_json["backends"]) == 3, backends_json
+
+ # Set debug_action to inject a disk write error for spill-to-disk on the impalad
+ # whose krpc port is 27001.
+ vector.get_value('exec_option')['debug_action'] = \
+ _get_disk_write_fail_action(FAILED_KRPC_PORT)
+
+ # Should be able to execute in-memory query.
+ handle = self.execute_query_async_using_client(client, self.in_mem_query, vector)
+ results = client.fetch(self.in_mem_query, handle)
+ assert results.success
+ client.close_query(handle)
+
+ # Expect spill to disk to fail due to the injected disk error on the target
+ # impalad. Verify that one node is blacklisted.
+ disk_failure_impalad = self.cluster.impalads[1]
+ assert disk_failure_impalad.service.krpc_port == FAILED_KRPC_PORT
+
+ try:
+ handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+ results = client.fetch(self.spill_query, handle)
+ assert False, "Query was expected to fail"
+ except Exception as e:
+ assert "Query execution failure caused by local disk IO fatal error on backend" \
+ in str(e)
+
+ backends_json = coord_impalad.service.get_debug_webpage_json("/backends")
+ assert backends_json["num_blacklisted_backends"] == 1, backends_json
+ assert backends_json["num_active_backends"] == 2, backends_json
+ assert len(backends_json["backends"]) == 3, backends_json
+ num_blacklisted = 0
+ for backend_json in backends_json["backends"]:
+ if str(disk_failure_impalad.service.krpc_port) in backend_json["krpc_address"]:
+ assert backend_json["is_blacklisted"], backend_json
+ assert "Query execution failure caused by local disk IO fatal error on backend" \
+ in backend_json["blacklist_cause"]
+ num_blacklisted += 1
+ else:
+ assert not backend_json["is_blacklisted"], backend_json
+ assert num_blacklisted == 1, backends_json
+
+ # Should be able to re-execute the same query since the impalad with the injected
+ # disk error for spill-to-disk was blacklisted.
+ handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+ results = client.fetch(self.spill_query, handle)
+ assert results.success
+ # Verify that the runtime profile contains the "Blacklisted Executors" line with the
+ # corresponding backend.
+ runtime_profile = client.get_runtime_profile(handle)
+ match = re.search("Blacklisted Executors: (.*)", runtime_profile)
+ assert match is not None and match.group(1) == "%s:%s" % \
+ (disk_failure_impalad.hostname, disk_failure_impalad.service.krpc_port), \
+ runtime_profile
+ client.close_query(handle)
+
+ # Sleep long enough and verify that the blacklisted backend was removed from the
+ # blacklist after the blacklisting timeout.
+ sleep(30)
+ backends_json = coord_impalad.service.get_debug_webpage_json("/backends")
+ assert backends_json["num_active_backends"] == 3, backends_json
+
+ # Run the query again without the debug action and verify nothing was blacklisted
+ # and all 3 backends were scheduled on.
+ vector.get_value('exec_option')['debug_action'] = ''
+ handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+ results = client.fetch(self.spill_query, handle)
+ assert results.success
+ runtime_profile = client.get_runtime_profile(handle)
+ assert re.search("Blacklisted Executors: (.*)", runtime_profile) is None, \
+ runtime_profile
+ assert re.search("NumBackends: 3", runtime_profile), runtime_profile
+ client.close_query(handle)
+
+ client.close()
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index 8a3611a..cb5f927 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -23,6 +23,8 @@
import pytest
import re
+import shutil
+import tempfile
import time
from random import randint
@@ -33,8 +35,9 @@ from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.errors import Timeout
from tests.common.skip import SkipIfEC, SkipIfBuildType
+from tests.common.skip import SkipIfNotHdfsMinicluster
-# The BE krpc port of the impalad to simulate rpc errors in tests.
+# The BE krpc port of the impalad to simulate rpc or disk errors in tests.
FAILED_KRPC_PORT = 27001
@@ -42,6 +45,10 @@ def _get_rpc_fail_action(port):
return "IMPALA_SERVICE_POOL:127.0.0.1:{port}:ExecQueryFInstances:FAIL" \
.format(port=port)
+
+def _get_disk_fail_action(port):
+ return "IMPALA_TMP_FILE_WRITE:127.0.0.1:{port}:FAIL".format(port=port)
+
# All tests in this class have SkipIfEC because all tests run a query and expect
# the query to be retried when killing a random impalad. On EC this does not always work
# because many queries that might run on three impalads for HDFS / S3 builds, might only
@@ -989,3 +996,137 @@ class TestQueryRetries(CustomClusterTestSuite):
if query_id_search:
return query_id_search.group(1)
return None
+
+
+# Tests that verify query retries are properly triggered by disk IO failures. The
+# coordinator adds an executor node to its blacklist if that node reports query
+# execution status with an error caused by its local faulty disk, then retries the
+# failed query.
+@SkipIfNotHdfsMinicluster.tuned_for_minicluster
+class TestQueryRetriesFaultyDisk(CustomClusterTestSuite):
+ @classmethod
+ def get_workload(cls):
+ return 'functional-query'
+
+ @classmethod
+ def setup_class(cls):
+ if cls.exploration_strategy() != 'exhaustive':
+ pytest.skip('runs only in exhaustive')
+ super(TestQueryRetriesFaultyDisk, cls).setup_class()
+
+ # A query with order by requires spill to disk if intermediate results don't fit
+ # in memory.
+ spill_query = """
+ select o_orderdate, o_custkey, o_comment
+ from tpch.orders
+ order by o_orderdate
+ """
+ # Buffer pool limit that is low enough to force Impala to spill to disk when executing
+ # spill_query.
+ buffer_pool_limit = "45m"
+
+ def setup_method(self, method):
+ # Don't call the superclass method to prevent starting Impala before each test. In
+ # this class, each test is responsible for doing that because we want to generate
+ # the parameter string to start-impala-cluster in each test method.
+ self.created_dirs = []
+
+ def teardown_method(self, method):
+ for dir_path in self.created_dirs:
+ shutil.rmtree(dir_path, ignore_errors=True)
+
+ def __generate_scratch_dir(self, num):
+ result = []
+ for i in xrange(num):
+ dir_path = tempfile.mkdtemp()
+ self.created_dirs.append(dir_path)
+ result.append(dir_path)
+ print "Generated dir " + dir_path
+ return result
+
+ def __validate_web_ui_state(self):
+ """Validate the state of the web ui after a query (or queries) have been retried.
+ The web ui should list 0 queries as in flight, running, or queued."""
+ impalad_service = self.cluster.get_first_impalad().service
+
+ # Assert that the debug web ui shows all queries as completed
+ self.assert_eventually(60, 0.1,
+ lambda: impalad_service.get_num_in_flight_queries() == 0)
+ assert impalad_service.get_num_running_queries('default-pool') == 0
+ assert impalad_service.get_num_queued_queries('default-pool') == 0
+
+ @SkipIfBuildType.not_dev_build
+ @pytest.mark.execute_serially
+ def test_retry_spill_to_disk_failed(self, vector):
+ """ Test that verifies that when an impalad fails during spill-to-disk due to a
+ disk write error, it is properly blacklisted by the coordinator and a query retry
+ is triggered."""
+
+ # Start cluster with spill-to-disk enabled and one dedicated coordinator. Set a
+ # really high statestore heartbeat frequency so that blacklisted nodes do not time
+ # out too quickly.
+ scratch_dirs = self.__generate_scratch_dir(1)
+ self._start_impala_cluster([
+ '--impalad_args=-logbuflevel=-1',
+ '--impalad_args=--scratch_dirs={0}'.format(','.join(scratch_dirs)),
+ '--impalad_args=--allow_multiple_scratch_dirs_per_device=false',
+ '--impalad_args=--statestore_heartbeat_frequency_ms=60000',
+ '--cluster_size=3', '--num_coordinators=1', '--use_exclusive_coordinators'])
+ self.assert_impalad_log_contains("INFO", "Using scratch directory ",
+ expected_count=1)
+
+ # Set debug_action to inject a disk write error for spill-to-disk on the impalad
+ # whose krpc port is 27001.
+ vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
+ vector.get_value('exec_option')['debug_action'] = \
+ _get_disk_fail_action(FAILED_KRPC_PORT)
+ vector.get_value('exec_option')['retry_failed_queries'] = "true"
+ coord_impalad = self.cluster.get_first_impalad()
+ client = coord_impalad.service.create_beeswax_client()
+
+ disk_failure_impalad = self.cluster.impalads[1]
+ assert disk_failure_impalad.service.krpc_port == FAILED_KRPC_PORT
+
+ # Verify all nodes are active now.
+ backends_json = self.cluster.impalads[0].service.get_debug_webpage_json("/backends")
+ assert backends_json["num_active_backends"] == 3, backends_json
+ assert len(backends_json["backends"]) == 3, backends_json
+
+ # Expect the impalad with the disk failure to be blacklisted, and the query retry
+ # to be triggered and completed successfully.
+ handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+ results = client.fetch(self.spill_query, handle)
+ assert results.success
+
+ # Validate the state of the web ui. The query must be closed before validating the
+ # state since it asserts that no queries are in flight.
+ client.close_query(handle)
+ client.close()
+ self.__validate_web_ui_state()
+
+ # Verify that the impalad with injected disk IO error is blacklisted.
+ backends_json = coord_impalad.service.get_debug_webpage_json("/backends")
+ assert backends_json["num_blacklisted_backends"] == 1, backends_json
+ assert backends_json["num_active_backends"] == 2, backends_json
+ assert len(backends_json["backends"]) == 3, backends_json
+ num_blacklisted = 0
+ for backend_json in backends_json["backends"]:
+ if str(disk_failure_impalad.service.krpc_port) in backend_json["krpc_address"]:
+ assert backend_json["is_blacklisted"], backend_json
+ assert "Query execution failure caused by local disk IO fatal error on backend" \
+ in backend_json["blacklist_cause"]
+ num_blacklisted += 1
+ else:
+ assert not backend_json["is_blacklisted"], backend_json
+ assert num_blacklisted == 1, backends_json
+
+ # Verify that the query was retried and finished.
+ completed_queries = coord_impalad.service.get_completed_queries()
+ # Assert that the most recently completed query is the retried query and it is
+ # marked as 'FINISHED'.
+ assert completed_queries[0]['state'] == 'FINISHED'
+ assert completed_queries[0]["rows_fetched"] == 1500000
+ # Assert that the second most recently completed query is the original query and it
+ # is marked as 'RETRIED'.
+ assert completed_queries[1]['state'] == 'RETRIED'
+ assert completed_queries[1]["rows_fetched"] == 0
+ assert completed_queries[1]['query_id'] == handle.get_handle().id
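Both new test classes skip themselves outside exhaustive runs (see
setup_class above). A hypothetical invocation from an Impala development
environment (the runner name and flag follow the usual Impala test
conventions and may differ per setup):

    # Run the new faulty-disk tests serially in exhaustive mode.
    impala-py.test tests/custom_cluster/test_blacklist.py \
        --exploration_strategy=exhaustive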