You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by la...@apache.org on 2021/02/04 18:26:56 UTC

[impala] branch master updated (91fd8fd -> 60f8f87)

This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 91fd8fd  [config] bump toolchain build id
     new b5e2a0c  IMPALA-9224: Blacklist nodes with faulty disk for spilling
     new f8ed3f6  IMPALA-10472 flag for Kudu connection negotiation timeout
     new 60f8f87  IMPALA-10274: Initialize impala-python as part of the CMake build

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CMakeLists.txt                               |   6 +-
 be/src/codegen/gen_ir_descriptions.py        |   4 +-
 be/src/common/global-flags.cc                |   7 ++
 be/src/exec/kudu-util.cc                     |   3 +
 be/src/runtime/coordinator-backend-state.cc  |   3 +
 be/src/runtime/coordinator-backend-state.h   |   9 ++
 be/src/runtime/coordinator.cc                |  24 ++++
 be/src/runtime/coordinator.h                 |   9 ++
 be/src/runtime/io/error-converter.cc         |  61 +++++++++
 be/src/runtime/io/error-converter.h          |   9 +-
 be/src/runtime/query-state.cc                |   7 ++
 be/src/runtime/tmp-file-mgr-internal.h       |   4 +-
 be/src/runtime/tmp-file-mgr.cc               |  49 +++++++-
 be/src/runtime/tmp-file-mgr.h                |  16 +++
 bin/gen_build_version.py                     |   4 +-
 bin/{gen-cscope.sh => init-impala-python.sh} |  14 +--
 common/protobuf/control_service.proto        |   4 +
 common/thrift/generate_error_codes.py        |   3 +
 infra/python/bootstrap_virtualenv.py         |   3 +-
 tests/custom_cluster/test_blacklist.py       | 180 ++++++++++++++++++++++++++-
 tests/custom_cluster/test_query_retries.py   | 143 ++++++++++++++++++++-
 21 files changed, 540 insertions(+), 22 deletions(-)
 copy bin/{gen-cscope.sh => init-impala-python.sh} (73%)


[impala] 01/03: IMPALA-9224: Blacklist nodes with faulty disk for spilling

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b5e2a0ce2ed34dc12a47da23ec2adf65a2f60c0a
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Tue Jan 12 14:00:21 2021 -0800

    IMPALA-9224: Blacklist nodes with faulty disk for spilling
    
    This patch extends blacklist functionality by adding executor node to
    blacklist if a query fails caused by disk failure during spill-to-disk.
    Also classifies disk error codes and defines a blacklistable error set
    for non-transient disk errors. Coordinator blacklists executor only if
    the executor hitted blacklistable error during spill-to-disk.
    
    Adds a new debug action to simulate disk write error during spill-to-
    disk. To use, specify in query options as:
      'debug_action': 'IMPALA_TMP_FILE_WRITE:<hostname>:<port>:<action>'
    
      where <hostname> and <port> represent the impalad which execute the
      fragment instances, <port> is the BE krpc port (default 27000).
    
    Adds new test cases for blacklist and query-retry to cover the code
    changes.
    
    Testing:
     - Passed new test cases.
     - Passed exhaustive test.
     - Manually simulated disk failures in scratch directories on nodes
       of a cluster, verified that the nodes were blacklisted as
       expected.
    
    Change-Id: I04bfcb7f2e0b1ef24a5b4350f270feecd8c47437
    Reviewed-on: http://gerrit.cloudera.org:8080/16949
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator-backend-state.cc |   3 +
 be/src/runtime/coordinator-backend-state.h  |   9 ++
 be/src/runtime/coordinator.cc               |  24 ++++
 be/src/runtime/coordinator.h                |   9 ++
 be/src/runtime/io/error-converter.cc        |  61 ++++++++++
 be/src/runtime/io/error-converter.h         |   9 +-
 be/src/runtime/query-state.cc               |   7 ++
 be/src/runtime/tmp-file-mgr-internal.h      |   4 +-
 be/src/runtime/tmp-file-mgr.cc              |  49 +++++++-
 be/src/runtime/tmp-file-mgr.h               |  16 +++
 common/protobuf/control_service.proto       |   4 +
 common/thrift/generate_error_codes.py       |   3 +
 tests/custom_cluster/test_blacklist.py      | 180 +++++++++++++++++++++++++++-
 tests/custom_cluster/test_query_retries.py  | 143 +++++++++++++++++++++-
 14 files changed, 510 insertions(+), 11 deletions(-)

diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 58fd87c..f24f73a 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -539,6 +539,9 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
       is_fragment_failure_ = true;
     }
   }
+  if (!overall_status.ok()) {
+    local_disk_faulty_ = backend_exec_status.local_disk_faulty();
+  }
 
   // TODO: keep backend-wide stopwatch?
   return IsDoneLocked(lock);
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 9c6fc55..a465d53 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -186,6 +186,11 @@ class Coordinator::BackendState {
   /// instances for this backend.
   ResourceUtilization GetResourceUtilization();
 
+  bool IsLocalDiskFaulty() {
+    std::lock_guard<std::mutex> l(lock_);
+    return local_disk_faulty_;
+  }
+
   /// Merge the accumulated error log into 'merged'.
   void MergeErrorLog(ErrorLogMap* merged);
 
@@ -394,6 +399,10 @@ class Coordinator::BackendState {
   /// Invalid if no fragment instance has reported an error status.
   TUniqueId failed_instance_id_;
 
+  /// If true, the backend reported that the query failure was caused by disk IO error
+  /// on its local disk.
+  bool local_disk_faulty_ = false;
+
   /// Errors reported by this fragment instance.
   ErrorLogMap error_log_;
 
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 3fd0d9d..5d72026 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -978,6 +978,11 @@ Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& req
     // any "faulty" nodes.
     Status retryable_status = UpdateBlacklistWithAuxErrorInfo(
         &aux_error_info, status, backend_state);
+    // Check if the backend node should be blacklisted based on the reported backend
+    // error. Note that only blacklist one node per report.
+    if (!status.ok() && retryable_status.ok()) {
+      retryable_status = UpdateBlacklistWithBackendState(status, backend_state);
+    }
 
     // If any nodes were blacklisted, retry the query. This needs to be done before
     // UpdateExecState is called with the error status to avoid exposing the error to any
@@ -1120,6 +1125,25 @@ Status Coordinator::UpdateBlacklistWithAuxErrorInfo(
   return Status::OK();
 }
 
+Status Coordinator::UpdateBlacklistWithBackendState(
+    const Status& status, BackendState* backend_state) {
+  DCHECK(!status.ok());
+  // If the Backend failed due to its local faulty disk, blacklist the backend node.
+  if (backend_state->IsLocalDiskFaulty()) {
+    Status retryable_status(TErrorCode::LOCAL_DISK_FAULTY,
+        NetworkAddressPBToString(backend_state->impalad_address()));
+    retryable_status.MergeStatus(status);
+
+    ExecEnv::GetInstance()->cluster_membership_mgr()->BlacklistExecutor(
+        backend_state->exec_params().backend_id(), retryable_status);
+    parent_request_state_->AddBlacklistedExecutorAddress(
+        backend_state->krpc_impalad_address());
+
+    return retryable_status;
+  }
+  return Status::OK();
+}
+
 void Coordinator::HandleFailedExecRpcs(vector<BackendState*> failed_backend_states) {
   DCHECK(!failed_backend_states.empty());
 
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 36168c2..2ba58eb 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -625,6 +625,15 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   Status UpdateBlacklistWithAuxErrorInfo(std::vector<AuxErrorInfoPB>* aux_error_info,
       const Status& status, BackendState* backend_state) WARN_UNUSED_RESULT;
 
+  /// Helper function for UpdateBackendExecStatus that check if the backend node should
+  /// be blacklisted based on the reported backend error.
+  /// 'status' is the Status of the given BackendState. 'backend_state' is the
+  /// BackendState that reported an error.
+  /// Returns the Status object used when blacklisting a backend, or Status::OK if no
+  /// backends were blacklisted.
+  Status UpdateBlacklistWithBackendState(
+      const Status& status, BackendState* backend_state) WARN_UNUSED_RESULT;
+
   /// Called if the Exec RPC to the given vector of BackendStates failed. Currently, just
   /// triggers a retry of the query.
   void HandleFailedExecRpcs(std::vector<BackendState*> failed_backend_states);
diff --git a/be/src/runtime/io/error-converter.cc b/be/src/runtime/io/error-converter.cc
index 547f086..6e2d0bf 100644
--- a/be/src/runtime/io/error-converter.cc
+++ b/be/src/runtime/io/error-converter.cc
@@ -20,6 +20,7 @@
 #include "gutil/strings/substitute.h"
 #include "util/debug-util.h"
 #include "util/error-util.h"
+#include "util/string-parser.h"
 
 #include "common/names.h"
 
@@ -53,6 +54,66 @@ Status ErrorConverter::GetErrorStatusFromErrno(const string& function_name,
       GetErrorText(function_name, file_path, err_no, params)));
 }
 
+bool ErrorConverter::IsBlacklistableError(int err_no) {
+  // A set of disk IO related non-transient error codes that should cause failure for
+  // reading/writing file on local disk. These errors could be caused by incorrect
+  // configurations or file system errors, but are not caused by temporarily unavailable
+  // resource, like EAGAIN, ENOMEM, EMFILE, ENFILE and ENOSPC. Since these errors are not
+  // likely disappear soon, the node which frequently hits such disk errors should be
+  // blacklisted.
+  static const set<int32_t> blacklistable_disk_error_codes = {
+      EPERM, // Operation not permitted.
+      ENOENT, // No such file or directory.
+      ESRCH, // No such process.
+      EINTR, // Interrupted system call.
+      EIO, // Disk level I/O error.
+      ENXIO, // No such device or address.
+      E2BIG, // Argument list too long.
+      ENOEXEC, // Exec format error.
+      EBADF, // The given file descriptor is invalid.
+      EACCES, // Permission denied.
+      EFAULT, // Bad address.
+      ENODEV, // No such device
+      ENOTDIR, // It is not a directory.
+      EINVAL, // Invalid argument.
+      EFBIG, // Maximum file size reached.
+      ESPIPE, // Illegal seek.
+      EROFS, // The file system is read only.
+      ENAMETOOLONG, // Either the path length or a path component exceeds the max length.
+      EOVERFLOW}; // File size can't be represented.
+
+  // Return true if the err_no matches any of the 'blacklistable' error code.
+  return (blacklistable_disk_error_codes.find(err_no)
+      != blacklistable_disk_error_codes.end());
+}
+
+bool ErrorConverter::IsBlacklistableError(const Status& status) {
+  // Return true if the error is generated by Debug Action.
+  // Return false if error code is not set as DISK_IO_ERROR, or there is no 'err_no' in
+  // error text, or 'err_no' is set in wrong format.
+  if (status.IsInternalError()) {
+    return (status.msg().msg().find("Debug Action") != string::npos);
+  } else if (!status.IsDiskIoError()) {
+    return false;
+  }
+
+  size_t found = status.msg().msg().find("errno=");
+  if (found == string::npos) return false;
+  size_t start_pos = found + 6;
+  size_t end_pos = status.msg().msg().find(",", start_pos);
+  string value = (end_pos != string::npos) ?
+      status.msg().msg().substr(start_pos, end_pos - start_pos) :
+      status.msg().msg().substr(start_pos);
+  if (value.empty()) return false;
+  StringParser::ParseResult result;
+  int err_no = StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
+  if (result != StringParser::PARSE_SUCCESS) {
+    return false;
+  } else {
+    return IsBlacklistableError(err_no);
+  }
+}
+
 string ErrorConverter::GetErrorText(const string& function_name,
     const string& file_path, int err_no, Params params) {
   const string* error_text_body = GetErrorTextBody(err_no);
diff --git a/be/src/runtime/io/error-converter.h b/be/src/runtime/io/error-converter.h
index 52f88af..28b6251 100644
--- a/be/src/runtime/io/error-converter.h
+++ b/be/src/runtime/io/error-converter.h
@@ -47,7 +47,14 @@ public:
   static Status GetErrorStatusFromErrno(const string& function_name,
       const std::string& file_path, int err_no, const Params& params = Params());
 
-private:
+  /// Return true if the 'err_no' matches any of the 'blacklistable' error code.
+  static bool IsBlacklistableError(int err_no);
+  /// Parse error text to get 'err_no' for thr given status with error code as
+  /// DISK_IO_ERROR. Return true if the 'err_no' matches any of the 'blacklistable'
+  /// error code.
+  static bool IsBlacklistableError(const Status& status);
+
+ private:
   /// Maps errno to error text
   static std::unordered_map<int, std::string> errno_to_error_text_map_;
 
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 222e394..1b36fe9 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -344,6 +344,9 @@ Status QueryState::InitBufferPoolState() {
     file_group_ = obj_pool_.Add(
         new TmpFileGroup(exec_env->tmp_file_mgr(), exec_env->disk_io_mgr(),
             host_profile_, query_id(), query_options().scratch_limit));
+    if (!query_options().debug_action.empty()) {
+      file_group_->SetDebugAction(query_options().debug_action);
+    }
   }
   return Status::OK();
 }
@@ -521,6 +524,10 @@ void QueryState::ConstructReport(bool instances_started,
       TUniqueIdToUniqueIdPB(failed_finstance_id_, report->mutable_fragment_instance_id());
     }
   }
+  if (!report_overall_status.ok() && query_spilled_.Load() == 1 && file_group_ != nullptr
+      && file_group_->IsSpillingDiskFaulty()) {
+    report->set_local_disk_faulty(true);
+  }
 
   // Add profile to report
   host_profile_->ToThrift(&profiles_forest->host_profile);
diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h
index 90d3026..7412841 100644
--- a/be/src/runtime/tmp-file-mgr-internal.h
+++ b/be/src/runtime/tmp-file-mgr-internal.h
@@ -47,8 +47,8 @@ class TmpFile {
   bool AllocateSpace(int64_t num_bytes, int64_t* offset);
 
   /// Called when an IO error is encountered for this file. Logs the error and blacklists
-  /// the file.
-  void Blacklist(const ErrorMsg& msg);
+  /// the file. Returns true if the file just became blacklisted.
+  bool Blacklist(const ErrorMsg& msg);
 
   /// Delete the physical file on disk, if one was created.
   /// It is not valid to read or write to a file after calling Remove().
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 24b51e9..49bc40e 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -34,6 +34,7 @@
 #include "runtime/bufferpool/buffer-pool-counters.h"
 #include "runtime/exec-env.h"
 #include "runtime/io/disk-io-mgr.h"
+#include "runtime/io/error-converter.h"
 #include "runtime/io/request-context.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/runtime-state.h"
@@ -355,9 +356,14 @@ int TmpFile::AssignDiskQueue() const {
   return file_group_->io_mgr_->AssignQueue(path_.c_str(), disk_id_, false);
 }
 
-void TmpFile::Blacklist(const ErrorMsg& msg) {
+bool TmpFile::Blacklist(const ErrorMsg& msg) {
   LOG(ERROR) << "Error for temporary file '" << path_ << "': " << msg.msg();
-  blacklisted_ = true;
+  if (!blacklisted_) {
+    blacklisted_ = true;
+    return true;
+  } else {
+    return false;
+  }
 }
 
 Status TmpFile::Remove() {
@@ -416,6 +422,8 @@ TmpFileGroup::TmpFileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
     compression_timer_(tmp_file_mgr->compression_enabled() ?
             ADD_TIMER(profile, "TotalCompressionTime") :
             nullptr),
+    num_blacklisted_files_(0),
+    spilling_disk_faulty_(false),
     current_bytes_allocated_(0),
     next_allocation_index_(0),
     free_ranges_(64) {
@@ -731,11 +739,21 @@ void TmpFileGroup::DestroyWriteHandle(unique_ptr<TmpWriteHandle> handle) {
 void TmpFileGroup::WriteComplete(
     TmpWriteHandle* handle, const Status& write_status) {
   Status status;
-  if (!write_status.ok()) {
-    status = RecoverWriteError(handle, write_status);
+  // Debug action for simulating disk write error. To use, specify in query options as:
+  // 'debug_action': 'IMPALA_TMP_FILE_WRITE:<hostname>:<port>:<action>'
+  // where <hostname> and <port> represent the impalad which execute the fragment
+  // instances, <port> is the BE krpc port (default 27000).
+  const Status* p_write_status = &write_status;
+  Status debug_status = DebugAction(debug_action_, "IMPALA_TMP_FILE_WRITE",
+      {ExecEnv::GetInstance()->krpc_address().hostname,
+          SimpleItoa(ExecEnv::GetInstance()->krpc_address().port)});
+  if (UNLIKELY(!debug_status.ok())) p_write_status = &debug_status;
+
+  if (!p_write_status->ok()) {
+    status = RecoverWriteError(handle, *p_write_status);
     if (status.ok()) return;
   } else {
-    status = write_status;
+    status = *p_write_status;
   }
   handle->WriteComplete(status);
 }
@@ -754,7 +772,21 @@ Status TmpFileGroup::RecoverWriteError(
   {
     lock_guard<SpinLock> lock(lock_);
     scratch_errors_.push_back(write_status);
-    handle->file_->Blacklist(write_status.msg());
+    if (handle->file_->Blacklist(write_status.msg())) {
+      DCHECK_LT(num_blacklisted_files_, tmp_files_.size());
+      ++num_blacklisted_files_;
+      if (num_blacklisted_files_ == tmp_files_.size()) {
+        // Check if all errors are 'blacklistable'.
+        bool are_all_blacklistable_errors = true;
+        for (Status& err : scratch_errors_) {
+          if (!ErrorConverter::IsBlacklistableError(err)) {
+            are_all_blacklistable_errors = false;
+            break;
+          }
+        }
+        if (are_all_blacklistable_errors) spilling_disk_faulty_ = true;
+      }
+    }
   }
 
   // Do not retry cancelled writes or propagate the error, simply return CANCELLED.
@@ -790,6 +822,11 @@ Status TmpFileGroup::ScratchAllocationFailedStatus(
   return status;
 }
 
+bool TmpFileGroup::IsSpillingDiskFaulty() {
+  lock_guard<SpinLock> lock(lock_);
+  return spilling_disk_faulty_;
+}
+
 string TmpFileGroup::DebugString() {
   lock_guard<SpinLock> lock(lock_);
   stringstream ss;
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 5f81497..e088dd9 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -287,6 +287,11 @@ class TmpFileGroup {
 
   TmpFileMgr* tmp_file_mgr() const { return tmp_file_mgr_; }
 
+  void SetDebugAction(const std::string& debug_action) { debug_action_ = debug_action; }
+
+  /// Return true if spill-to-disk failed due to local faulty disk.
+  bool IsSpillingDiskFaulty();
+
  private:
   friend class TmpFile;
   friend class TmpFileMgrTest;
@@ -332,6 +337,9 @@ class TmpFileGroup {
   /// 'lock_' must be held by caller.
   Status ScratchAllocationFailedStatus(const std::vector<int>& at_capacity_dirs);
 
+  /// Debug action of the query.
+  std::string debug_action_;
+
   /// The TmpFileMgr it is associated with.
   TmpFileMgr* const tmp_file_mgr_;
 
@@ -388,6 +396,14 @@ class TmpFileGroup {
   /// the related TmpDir.
   std::vector<std::unique_ptr<TmpFile>> tmp_files_;
 
+  /// Number of files in TmpFileGroup which have been blacklisted.
+  int num_blacklisted_files_;
+
+  /// Set to true to indicate spill-to-disk failure caused by faulty disks. It is set as
+  /// true if all temporary (a.k.a. scratch) files in this TmpFileGroup are blacklisted,
+  /// or getting disk error when reading temporary file back.
+  bool spilling_disk_faulty_;
+
   /// Index Range in the 'tmp_files'. Used to keep track of index range
   /// corresponding to a given priority.
   struct TmpFileIndexRange {
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index 14beea3..378d91c 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -259,6 +259,10 @@ message ReportExecStatusRequestPB {
 
   // For each join node, sum of RowsReturned counters on this backend.
   map<int32, int64> per_join_rows_produced = 16;
+
+  // If true, the executor failed to execute query fragments due to local disk IO
+  // fatal error, like local storage devices for spilling are corrupted.
+  optional bool local_disk_faulty = 17 [default = false];
 }
 
 message ReportExecStatusResponsePB {
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 5c38a40..4c90674 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -468,6 +468,9 @@ error_codes = (
    "Query $0 terminated due to join rows produced exceeds the limit of $1 "
    "at node with id $2. Unset or increase JOIN_ROWS_PRODUCED_LIMIT query option "
    "to produce more rows."),
+
+  ("LOCAL_DISK_FAULTY", 152,
+   "Query execution failure caused by local disk IO fatal error on backend: $0."),
 )
 
 import sys
diff --git a/tests/custom_cluster/test_blacklist.py b/tests/custom_cluster/test_blacklist.py
index 8d24161..7fd61d8 100644
--- a/tests/custom_cluster/test_blacklist.py
+++ b/tests/custom_cluster/test_blacklist.py
@@ -19,13 +19,24 @@ from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 
 import pytest
 import re
+import shutil
+import tempfile
 
 from beeswaxd.BeeswaxService import QueryState
 from tests.common.skip import SkipIfNotHdfsMinicluster
+from tests.common.skip import SkipIfBuildType
 from time import sleep
 
+# The BE krpc port of the impalad to simulate disk errors in tests.
+FAILED_KRPC_PORT = 27001
 
-# Tests that verify the behavior of the executor blacklist.
+
+def _get_disk_write_fail_action(port):
+  return "IMPALA_TMP_FILE_WRITE:127.0.0.1:{port}:FAIL".format(port=port)
+
+# Tests that verify the behavior of the executor blacklist caused by RPC failure.
+# Coordinator adds an executor node to its blacklist if the RPC to that node failed.
+# Note: query-retry is not enabled by default.
 @SkipIfNotHdfsMinicluster.tuned_for_minicluster
 class TestBlacklist(CustomClusterTestSuite):
   @classmethod
@@ -169,3 +180,170 @@ class TestBlacklist(CustomClusterTestSuite):
     assert match is not None and match.group(1) == "%s:%s" % \
       (killed_impalad.hostname, killed_impalad.service.krpc_port), \
       result.runtime_profile
+
+
+# Tests that verify the behavior of the executor blacklist caused by disk IO failure.
+# Coordinator adds an executor node to its blacklist if that node reported query
+# execution status with error caused by its local faulty disk.
+# Note: query-retry is not enabled by default and we assume it's not enabled in following
+# tests.
+@SkipIfNotHdfsMinicluster.tuned_for_minicluster
+class TestBlacklistFaultyDisk(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestBlacklistFaultyDisk, cls).setup_class()
+
+  # Query with order by requires spill to disk if intermediate results don't fit in mem
+  spill_query = """
+      select o_orderdate, o_custkey, o_comment
+      from tpch.orders
+      order by o_orderdate
+      """
+  # Query against a big table with order by requires spill to disk if intermediate
+  # results don't fit in memory.
+  spill_query_big_table = """
+      select l_orderkey, l_linestatus, l_shipdate, l_comment
+      from tpch.lineitem
+      order by l_orderkey
+      """
+  # Query without order by can be executed without spilling to disk.
+  in_mem_query = """
+      select o_orderdate, o_custkey, o_comment from tpch.orders
+      """
+  # Buffer pool limit that is low enough to force Impala to spill to disk when executing
+  # spill_query.
+  buffer_pool_limit = "45m"
+
+  def __generate_scratch_dir(self, num):
+    result = []
+    for i in xrange(num):
+      dir_path = tempfile.mkdtemp()
+      self.created_dirs.append(dir_path)
+      result.append(dir_path)
+      print "Generated dir" + dir_path
+    return result
+
+  def setup_method(self, method):
+    # Don't call the superclass method to prevent starting Impala before each test. In
+    # this class, each test is responsible for doing that because we want to generate
+    # the parameter string to start-impala-cluster in each test method.
+    self.created_dirs = []
+
+  def teardown_method(self, method):
+    for dir_path in self.created_dirs:
+      shutil.rmtree(dir_path, ignore_errors=True)
+
+  @SkipIfBuildType.not_dev_build
+  @pytest.mark.execute_serially
+  def test_scratch_file_write_failure(self, vector):
+    """ Test that verifies that when an impalad failed to execute query during spill-to-
+        disk due to disk write error, it is properly blacklisted by coordinator."""
+
+    # Start cluster with spill-to-disk enabled and one dedicated coordinator. Set a high
+    # statestore heartbeat frequency so that blacklisted nodes are not timeout too
+    # quickly.
+    scratch_dirs = self.__generate_scratch_dir(2)
+    self._start_impala_cluster([
+        '--impalad_args=-logbuflevel=-1',
+        '--impalad_args=--scratch_dirs={0}'.format(','.join(scratch_dirs)),
+        '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
+        '--impalad_args=--statestore_heartbeat_frequency_ms=2000',
+        '--cluster_size=3', '--num_coordinators=1', '--use_exclusive_coordinators'])
+    self.assert_impalad_log_contains("INFO", "Using scratch directory ",
+        expected_count=2)
+
+    # First set debug_action for query as empty.
+    vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
+    vector.get_value('exec_option')['debug_action'] = ''
+    coord_impalad = self.cluster.get_first_impalad()
+    client = coord_impalad.service.create_beeswax_client()
+
+    # Expect spill to disk to success with debug_action as empty. Verify all nodes are
+    # active.
+    handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+    results = client.fetch(self.spill_query, handle)
+    assert results.success
+    client.close_query(handle)
+
+    backends_json = coord_impalad.service.get_debug_webpage_json("/backends")
+    assert backends_json["num_active_backends"] == 3, backends_json
+    assert len(backends_json["backends"]) == 3, backends_json
+
+    # Set debug_action to inject disk write error for spill-to-disk on impalad for which
+    # krpc port is 27001.
+    vector.get_value('exec_option')['debug_action'] = \
+        _get_disk_write_fail_action(FAILED_KRPC_PORT)
+
+    # Should be able to execute in-memory query.
+    handle = self.execute_query_async_using_client(client, self.in_mem_query, vector)
+    results = client.fetch(self.in_mem_query, handle)
+    assert results.success
+    client.close_query(handle)
+
+    # Expect spill to disk to fail due to disk failure on the impalad with disk failure.
+    # Verify one node is blacklisted.
+    disk_failure_impalad = self.cluster.impalads[1]
+    assert disk_failure_impalad.service.krpc_port == FAILED_KRPC_PORT
+
+    try:
+      handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+      results = client.fetch(self.spill_query, handle)
+      assert False, "Query was expected to fail"
+    except Exception as e:
+      assert "Query execution failure caused by local disk IO fatal error on backend" \
+          in str(e)
+
+    backends_json = coord_impalad.service.get_debug_webpage_json("/backends")
+    assert backends_json["num_blacklisted_backends"] == 1, backends_json
+    assert backends_json["num_active_backends"] == 2, backends_json
+    assert len(backends_json["backends"]) == 3, backends_json
+    num_blacklisted = 0
+    for backend_json in backends_json["backends"]:
+      if str(disk_failure_impalad.service.krpc_port) in backend_json["krpc_address"]:
+        assert backend_json["is_blacklisted"], backend_json
+        assert "Query execution failure caused by local disk IO fatal error on backend" \
+            in backend_json["blacklist_cause"]
+        num_blacklisted += 1
+      else:
+        assert not backend_json["is_blacklisted"], backend_json
+    assert num_blacklisted == 1, backends_json
+
+    # Should be able to re-execute same query since the impalad with injected disk error
+    # for spill-to-disk was blacklisted.
+    handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+    results = client.fetch(self.spill_query, handle)
+    assert results.success
+    # Verify that the runtime profile contains the "Blacklisted Executors" line with the
+    # corresponding backend.
+    runtime_profile = client.get_runtime_profile(handle)
+    match = re.search("Blacklisted Executors: (.*)", runtime_profile)
+    assert match is not None and match.group(1) == "%s:%s" % \
+        (disk_failure_impalad.hostname, disk_failure_impalad.service.krpc_port), \
+        runtime_profile
+    client.close_query(handle)
+
+    # Sleep for long enough time and verify blacklisted backend was removed from the
+    # blacklist after blacklisting timeout.
+    sleep(30)
+    backends_json = coord_impalad.service.get_debug_webpage_json("/backends")
+    assert backends_json["num_active_backends"] == 3, backends_json
+
+    # Run the query again without the debug action and verify nothing was blacklisted
+    # and all 3 backends were scheduled on.
+    vector.get_value('exec_option')['debug_action'] = ''
+    handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+    results = client.fetch(self.spill_query, handle)
+    assert results.success
+    runtime_profile = client.get_runtime_profile(handle)
+    assert re.search("Blacklisted Executors: (.*)", runtime_profile) is None, \
+        runtime_profile
+    assert re.search("NumBackends: 3", runtime_profile), runtime_profile
+    client.close_query(handle)
+
+    client.close()
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index 8a3611a..cb5f927 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -23,6 +23,8 @@
 
 import pytest
 import re
+import shutil
+import tempfile
 import time
 
 from random import randint
@@ -33,8 +35,9 @@ from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.errors import Timeout
 from tests.common.skip import SkipIfEC, SkipIfBuildType
+from tests.common.skip import SkipIfNotHdfsMinicluster
 
-# The BE krpc port of the impalad to simulate rpc errors in tests.
+# The BE krpc port of the impalad to simulate rpc or disk errors in tests.
 FAILED_KRPC_PORT = 27001
 
 
@@ -42,6 +45,10 @@ def _get_rpc_fail_action(port):
   return "IMPALA_SERVICE_POOL:127.0.0.1:{port}:ExecQueryFInstances:FAIL" \
       .format(port=port)
 
+
+def _get_disk_fail_action(port):
+  return "IMPALA_TMP_FILE_WRITE:127.0.0.1:{port}:FAIL".format(port=port)
+
 # All tests in this class have SkipIfEC because all tests run a query and expect
 # the query to be retried when killing a random impalad. On EC this does not always work
 # because many queries that might run on three impalads for HDFS / S3 builds, might only
@@ -989,3 +996,137 @@ class TestQueryRetries(CustomClusterTestSuite):
         if query_id_search:
           return query_id_search.group(1)
     return None
+
+
+# Tests that verify the query-retries are properly triggered by disk IO failure.
+# Coordinator adds an executor node to its blacklist if that node reports query
+# execution status with error caused by its local faulty disk, then retries the failed
+# query.
+@SkipIfNotHdfsMinicluster.tuned_for_minicluster
+class TestQueryRetriesFaultyDisk(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestQueryRetriesFaultyDisk, cls).setup_class()
+
+  # Query with order by requires spill to disk if intermediate results don't fit in mem
+  spill_query = """
+      select o_orderdate, o_custkey, o_comment
+      from tpch.orders
+      order by o_orderdate
+      """
+  # Buffer pool limit that is low enough to force Impala to spill to disk when executing
+  # spill_query.
+  buffer_pool_limit = "45m"
+
+  def setup_method(self, method):
+    # Don't call the superclass method to prevent starting Impala before each test. In
+    # this class, each test is responsible for doing that because we want to generate
+    # the parameter string to start-impala-cluster in each test method.
+    self.created_dirs = []
+
+  def teardown_method(self, method):
+    for dir_path in self.created_dirs:
+      shutil.rmtree(dir_path, ignore_errors=True)
+
+  def __generate_scratch_dir(self, num):
+    result = []
+    for i in xrange(num):
+      dir_path = tempfile.mkdtemp()
+      self.created_dirs.append(dir_path)
+      result.append(dir_path)
+      print "Generated dir" + dir_path
+    return result
+
+  def __validate_web_ui_state(self):
+    """Validate the state of the web ui after a query (or queries) have been retried.
+    The web ui should list 0 queries as in flight, running, or queued."""
+    impalad_service = self.cluster.get_first_impalad().service
+
+    # Assert that the debug web ui shows all queries as completed
+    self.assert_eventually(60, 0.1,
+        lambda: impalad_service.get_num_in_flight_queries() == 0)
+    assert impalad_service.get_num_running_queries('default-pool') == 0
+    assert impalad_service.get_num_queued_queries('default-pool') == 0
+
+  @SkipIfBuildType.not_dev_build
+  @pytest.mark.execute_serially
+  def test_retry_spill_to_disk_failed(self, vector):
+    """ Test that verifies that when an impalad failed during spill-to-disk due to disk
+        write error, it is properly blacklisted by coordinator and query-retry is
+        triggered."""
+
+    # Start cluster with spill-to-disk enabled and one dedicated coordinator. Set a
+    # really high statestore heartbeat frequency so that blacklisted nodes are not
+    # timeout too quickly.
+    scratch_dirs = self.__generate_scratch_dir(1)
+    self._start_impala_cluster([
+        '--impalad_args=-logbuflevel=-1',
+        '--impalad_args=--scratch_dirs={0}'.format(','.join(scratch_dirs)),
+        '--impalad_args=--allow_multiple_scratch_dirs_per_device=false',
+        '--impalad_args=--statestore_heartbeat_frequency_ms=60000',
+        '--cluster_size=3', '--num_coordinators=1', '--use_exclusive_coordinators'])
+    self.assert_impalad_log_contains("INFO", "Using scratch directory ",
+        expected_count=1)
+
+    # Set debug_action to inject disk write error for spill-to-disk on impalad for which
+    # krpc port is 27001.
+    vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
+    vector.get_value('exec_option')['debug_action'] = \
+        _get_disk_fail_action(FAILED_KRPC_PORT)
+    vector.get_value('exec_option')['retry_failed_queries'] = "true"
+    coord_impalad = self.cluster.get_first_impalad()
+    client = coord_impalad.service.create_beeswax_client()
+
+    disk_failure_impalad = self.cluster.impalads[1]
+    assert disk_failure_impalad.service.krpc_port == FAILED_KRPC_PORT
+
+    # Verify all nodes are active now.
+    backends_json = self.cluster.impalads[0].service.get_debug_webpage_json("/backends")
+    assert backends_json["num_active_backends"] == 3, backends_json
+    assert len(backends_json["backends"]) == 3, backends_json
+
+    # Expect the impalad with disk failure is blacklisted, and query-retry is triggered
+    # and is completed successfully.
+    handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+    results = client.fetch(self.spill_query, handle)
+    assert results.success
+
+    # Validate the state of the web ui. The query must be closed before validating the
+    # state since it asserts that no queries are in flight.
+    client.close_query(handle)
+    client.close()
+    self.__validate_web_ui_state()
+
+    # Verify that the impalad with injected disk IO error is blacklisted.
+    backends_json = coord_impalad.service.get_debug_webpage_json("/backends")
+    assert backends_json["num_blacklisted_backends"] == 1, backends_json
+    assert backends_json["num_active_backends"] == 2, backends_json
+    assert len(backends_json["backends"]) == 3, backends_json
+    num_blacklisted = 0
+    for backend_json in backends_json["backends"]:
+      if str(disk_failure_impalad.service.krpc_port) in backend_json["krpc_address"]:
+        assert backend_json["is_blacklisted"], backend_json
+        assert "Query execution failure caused by local disk IO fatal error on backend" \
+            in backend_json["blacklist_cause"]
+        num_blacklisted += 1
+      else:
+        assert not backend_json["is_blacklisted"], backend_json
+    assert num_blacklisted == 1, backends_json
+
+    # Verify that the query is re-tried and finished.
+    completed_queries = coord_impalad.service.get_completed_queries()
+    # Assert that the most recently completed query is the retried query and it is marked
+    # as 'FINISHED.
+    assert completed_queries[0]['state'] == 'FINISHED'
+    assert completed_queries[0]["rows_fetched"] == 1500000
+    # Assert that the second most recently completed query is the original query and it
+    # is marked as 'RETRIED'.
+    assert completed_queries[1]['state'] == 'RETRIED'
+    assert completed_queries[1]["rows_fetched"] == 0
+    assert completed_queries[1]['query_id'] == handle.get_handle().id


[impala] 02/03: IMPALA-10472 flag for Kudu connection negotiation timeout

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f8ed3f67229c70dbc69501c72bdc146b88b55e4b
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Nov 9 09:54:02 2020 -0800

    IMPALA-10472 flag for Kudu connection negotiation timeout
    
    This patch adds --kudu_client_connection_negotiation_timeout_ms flag
    to control client-side connection negotiation timeout in the Kudu
    client working as a part of the Impala's BE.  Since [1] has been
    addressed for Kudu C++ client, it makes sense to provide a control knob
    to customize the timeout.  That should help to address cases where very
    busy cluster nodes hosting Kudu tablet servers aren't fast enough to
    negotiate a new connection within the default timeout interval (3 sec),
    as mentioned in the description of [1].
    
    [1] https://issues.apache.org/jira/browse/KUDU-2966
    
    Change-Id: I1223187318691da47082608356547f6d78144466
    Reviewed-on: http://gerrit.cloudera.org:8080/16705
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc | 7 +++++++
 be/src/exec/kudu-util.cc      | 3 +++
 2 files changed, 10 insertions(+)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 794c14d..73d2987 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -198,6 +198,13 @@ DEFINE_int32(kudu_client_rpc_timeout_ms, default_kudu_client_rpc_timeout_ms,
     "kudu_operation_timeout_ms. This must be a positive value or it will be ignored and "
     "Kudu's default of 10s will be used. There is no way to disable timeouts.");
 
+// Timeout for connection negotiation between Kudu client in the BE and Kudu
+// servers, in milliseconds. For details on connection negotiation, see
+// https://github.com/apache/kudu/blob/master/docs/design-docs/rpc.md#negotiation
+DEFINE_int32(kudu_client_connection_negotiation_timeout_ms, 3000,
+    "(Advanced) Timeout for connection negotiation between Kudu client and "
+    "Kudu masters and tablet servers, in milliseconds");
+
 DEFINE_int64(inc_stats_size_limit_bytes, 200 * (1LL<<20), "Maximum size of "
     "incremental stats the catalog is allowed to serialize per table. "
     "This limit is set as a safety check, to prevent the JVM from "
diff --git a/be/src/exec/kudu-util.cc b/be/src/exec/kudu-util.cc
index 499ac57..7d9bb34 100644
--- a/be/src/exec/kudu-util.cc
+++ b/be/src/exec/kudu-util.cc
@@ -46,6 +46,7 @@ using DataType = kudu::client::KuduColumnSchema::DataType;
 
 DECLARE_bool(disable_kudu);
 DECLARE_int32(kudu_client_rpc_timeout_ms);
+DECLARE_int32(kudu_client_connection_negotiation_timeout_ms);
 
 namespace impala {
 
@@ -80,6 +81,8 @@ Status CreateKuduClient(const vector<string>& master_addrs,
     b.default_rpc_timeout(
         kudu::MonoDelta::FromMilliseconds(FLAGS_kudu_client_rpc_timeout_ms));
   }
+  b.connection_negotiation_timeout(kudu::MonoDelta::FromMilliseconds(
+      FLAGS_kudu_client_connection_negotiation_timeout_ms));
   KUDU_RETURN_IF_ERROR(b.Build(client), "Unable to create Kudu client");
   return Status::OK();
 }


[impala] 03/03: IMPALA-10274: Initialize impala-python as part of the CMake build

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 60f8f87b09a27618df2ac73c1cc6dcd052f8c60d
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Sun Oct 18 17:20:52 2020 -0700

    IMPALA-10274: Initialize impala-python as part of the CMake build
    
    Initializing the impala-python virtualenv takes a couple minutes,
    so it is useful to do that in parallel to the rest of the build.
    This moves the impala-python initialization to its own step
    in the CMake build. It stops using impala-python for commands
    invoked from buildall.sh or the CMake build to avoid premature
    or concurrent initializations of impala-python. Then, it adds
    a dedicated step to initialize impala-python.
    
    Testing:
     - Ran a core job and a couple builds
     - Rebuilt and verified that impala-python is not reinitialized
       if it is already initialized
    
    Change-Id: Ieff51263c55bd234028fed7101c94b4a928590f0
    Reviewed-on: http://gerrit.cloudera.org:8080/16607
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 CMakeLists.txt                        |  6 +++++-
 be/src/codegen/gen_ir_descriptions.py |  4 +++-
 bin/gen_build_version.py              |  4 +++-
 bin/init-impala-python.sh             | 31 +++++++++++++++++++++++++++++++
 infra/python/bootstrap_virtualenv.py  |  3 ++-
 5 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4bd9fb5..023cb9c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -448,8 +448,12 @@ add_custom_target(cscope ALL DEPENDS gen-deps
   COMMAND "${CMAKE_SOURCE_DIR}/bin/gen-cscope.sh"
 )
 
+add_custom_target(impala_python ALL
+  COMMAND "${CMAKE_SOURCE_DIR}/bin/init-impala-python.sh"
+)
+
 add_custom_target(notests_independent_targets DEPENDS
-  java cscope tarballs
+  java cscope tarballs impala_python
 )
 add_custom_target(notests_regular_targets DEPENDS
   impalad statestored catalogd admissiond fesupport loggingsupport ImpalaUdf udasample udfsample
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index 6a383ad..6033470 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -1,4 +1,6 @@
-#!/usr/bin/env impala-python
+#!/usr/bin/env python
+# This uses system python to avoid a dependency on impala-python,
+# because this runs during the build.
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
diff --git a/bin/gen_build_version.py b/bin/gen_build_version.py
index 266fb78..4030d61 100755
--- a/bin/gen_build_version.py
+++ b/bin/gen_build_version.py
@@ -1,4 +1,6 @@
-#!/usr/bin/env impala-python
+#!/usr/bin/env python
+# This uses system python to avoid a dependency on impala-python,
+# because this runs during the build.
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
diff --git a/bin/init-impala-python.sh b/bin/init-impala-python.sh
new file mode 100755
index 0000000..e1e20f4
--- /dev/null
+++ b/bin/init-impala-python.sh
@@ -0,0 +1,31 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# This is called during the build to initialize the impala-python
+# virtualenv (which involves installing various packages and
+# compiling things). This is not directly in CMake, because
+# this depends on knowing IMPALA_HOME and other environment
+# variables.
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+. "$bin"/impala-config.sh
+
+cd $IMPALA_HOME
+bin/impala-python -c 'print("Initialized impala-python")'
diff --git a/infra/python/bootstrap_virtualenv.py b/infra/python/bootstrap_virtualenv.py
index 7fbd276..e85a1e4 100644
--- a/infra/python/bootstrap_virtualenv.py
+++ b/infra/python/bootstrap_virtualenv.py
@@ -207,7 +207,8 @@ def download_toolchain_python():
         "$IMPALA_TOOLCHAIN_PACKAGES_HOME is set.")
 
   package = ToolchainPackage("python")
-  if not (os.environ.get(SKIP_TOOLCHAIN_BOOTSTRAP) == 'true'):
+  if package.needs_download() and \
+     not (os.environ.get(SKIP_TOOLCHAIN_BOOTSTRAP) == 'true'):
     package.download()
   python_cmd = os.path.join(package.pkg_directory(), "bin/python")
   if not os.path.exists(python_cmd):