You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2021/04/01 16:33:14 UTC

[impala] branch master updated (96e7f85 -> 8ac7613)

This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 96e7f85  IMPALA-10596: De-flake teardown in TestAdmissionControllerStress
     new 559f604  IMPALA-9331: Add symptom for dataload failing on schema mismatch
     new 8ac7613  IMPALA-10591: Handle failed ReleaseQueryBackends rpcs

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/scheduling/admission-control-service.cc    |   5 +-
 be/src/scheduling/admission-controller.cc         | 106 ++++++++++++++--------
 be/src/scheduling/admission-controller.h          |  17 +++-
 testdata/bin/create-load-data.sh                  |   7 ++
 tests/custom_cluster/test_admission_controller.py |  31 +++++++
 5 files changed, 120 insertions(+), 46 deletions(-)

[impala] 02/02: IMPALA-10591: Handle failed ReleaseQueryBackends rpcs

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8ac761348aa805b53e0a13d85fd5c9b2de5f3985
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Thu Mar 18 11:08:39 2021 -0700

    IMPALA-10591: Handle failed ReleaseQueryBackends rpcs
    
    When the admission control service is in use, coordinators will
    retry failed ReleaseQueryBackends rpcs 3 times before giving up.
    This can potentially result in resources not being released when they
    are no longer in use.
    
    This patch fixes the issue by automatically releasing all remaining
    backends when ReleaseQuery is called in the context of the admission
    control service (either as the result of a ReleaseQuery rpc or when
    cleaning up a query's resources with the heartbeat mechanism).
    
    Testing:
    - Added a custom cluster test that simulates failed
      ReleaseQueryBackends rpcs and ensures that query resources are
      eventually released.
    
    Change-Id: I22842b2fb8ee170b5e91f12cd83f57a5f5502ae9
    Reviewed-on: http://gerrit.cloudera.org:8080/17208
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
---
 be/src/scheduling/admission-control-service.cc    |   5 +-
 be/src/scheduling/admission-controller.cc         | 106 ++++++++++++++--------
 be/src/scheduling/admission-controller.h          |  17 +++-
 tests/custom_cluster/test_admission_controller.py |  31 +++++++
 4 files changed, 113 insertions(+), 46 deletions(-)

diff --git a/be/src/scheduling/admission-control-service.cc b/be/src/scheduling/admission-control-service.cc
index dcdf4e8..1f49f2f 100644
--- a/be/src/scheduling/admission-control-service.cc
+++ b/be/src/scheduling/admission-control-service.cc
@@ -216,8 +216,9 @@ void AdmissionControlService::ReleaseQuery(const ReleaseQueryRequestPB* req,
   {
     lock_guard<mutex> l(admission_state->lock);
     if (!admission_state->released) {
-      AdmissiondEnv::GetInstance()->admission_controller()->ReleaseQuery(
-          req->query_id(), admission_state->coord_id, req->peak_mem_consumption());
+      AdmissiondEnv::GetInstance()->admission_controller()->ReleaseQuery(req->query_id(),
+          admission_state->coord_id, req->peak_mem_consumption(),
+          /* release_remaining_backends */ true);
       admission_state->released = true;
     } else {
       LOG(WARNING) << "Query " << req->query_id() << " was already released.";
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 208d9d2..6bb380d 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -728,21 +728,24 @@ void AdmissionController::PoolStats::Dequeue(bool timed_out) {
 }
 
 void AdmissionController::UpdateStatsOnReleaseForBackends(const UniqueIdPB& query_id,
-    const RunningQuery& running_query, const vector<NetworkAddressPB>& host_addrs) {
+    RunningQuery& running_query, const vector<NetworkAddressPB>& host_addrs) {
   int64_t total_mem_to_release = 0;
   for (auto host_addr : host_addrs) {
     auto backend_allocation = running_query.per_backend_resources.find(host_addr);
     if (backend_allocation == running_query.per_backend_resources.end()) {
+      // In the context of the admission control service, this may happen, eg. if a
+      // ReleaseQueryBackends rpc is delayed in the network and arrives after the
+      // ReleaseQuery rpc, so only log as a WARNING.
       string err_msg =
           strings::Substitute("Error: Cannot find exec params of host $0 for query $1.",
               NetworkAddressPBToString(host_addr), PrintId(query_id));
-      DCHECK(false) << err_msg;
-      LOG(ERROR) << err_msg;
+      LOG(WARNING) << err_msg;
       continue;
     }
     UpdateHostStats(host_addr, -backend_allocation->second.mem_to_admit, -1,
         -backend_allocation->second.slots_to_use);
     total_mem_to_release += backend_allocation->second.mem_to_admit;
+    running_query.per_backend_resources.erase(backend_allocation);
   }
   PoolStats* pool_stats = GetPoolStats(running_query.request_pool);
   pool_stats->ReleaseMem(total_mem_to_release);
@@ -1347,7 +1350,8 @@ Status AdmissionController::WaitOnQueued(const UniqueIdPB& query_id,
 }
 
 void AdmissionController::ReleaseQuery(const UniqueIdPB& query_id,
-    const UniqueIdPB& coord_id, int64_t peak_mem_consumption) {
+    const UniqueIdPB& coord_id, int64_t peak_mem_consumption,
+    bool release_remaining_backends) {
   {
     lock_guard<mutex> lock(admission_ctrl_lock_);
     auto host_it = running_queries_.find(coord_id);
@@ -1368,6 +1372,18 @@ void AdmissionController::ReleaseQuery(const UniqueIdPB& query_id,
     }
 
     const RunningQuery& running_query = it->second;
+    if (release_remaining_backends) {
+      vector<NetworkAddressPB> to_release;
+      for (const auto& entry : running_query.per_backend_resources) {
+        to_release.push_back(entry.first);
+      }
+      if (to_release.size() > 0) {
+        LOG(INFO) << "ReleaseQuery for " << query_id << " called with "
+                  << to_release.size()
+                  << "unreleased backends. Releasing automatically.";
+        ReleaseQueryBackendsLocked(query_id, coord_id, to_release);
+      }
+    }
     DCHECK_EQ(num_released_backends_.at(query_id), 0) << PrintId(query_id);
     num_released_backends_.erase(num_released_backends_.find(query_id));
     PoolStats* stats = GetPoolStats(running_query.request_pool);
@@ -1387,45 +1403,55 @@ void AdmissionController::ReleaseQueryBackends(const UniqueIdPB& query_id,
     const UniqueIdPB& coord_id, const vector<NetworkAddressPB>& host_addrs) {
   {
     lock_guard<mutex> lock(admission_ctrl_lock_);
-    auto host_it = running_queries_.find(coord_id);
-    if (host_it == running_queries_.end()) {
-      LOG(DFATAL) << "Unable to find host " << PrintId(coord_id)
-                  << " to get resources to release backends for query "
-                  << PrintId(query_id) << ", may have already been released.";
-      return;
-    }
-    auto it = host_it->second.find(query_id);
-    if (it == host_it->second.end()) {
-      LOG(DFATAL) << "Unable to find resources to release backends for query "
-                  << PrintId(query_id) << ", may have already been released.";
-      return;
-    }
+    ReleaseQueryBackendsLocked(query_id, coord_id, host_addrs);
+  }
+  dequeue_cv_.NotifyOne();
+}
 
-    const RunningQuery& running_query = it->second;
-    UpdateStatsOnReleaseForBackends(query_id, running_query, host_addrs);
+void AdmissionController::ReleaseQueryBackendsLocked(const UniqueIdPB& query_id,
+    const UniqueIdPB& coord_id, const vector<NetworkAddressPB>& host_addrs) {
+  auto host_it = running_queries_.find(coord_id);
+  if (host_it == running_queries_.end()) {
+    LOG(DFATAL) << "Unable to find host " << PrintId(coord_id)
+                << " to get resources to release backends for query "
+                << PrintId(query_id) << ", may have already been released.";
+    return;
+  }
+  auto it = host_it->second.find(query_id);
+  if (it == host_it->second.end()) {
+    // In the context of the admission control service, this may happen, eg. if a
+    // ReleaseQueryBackends rpc is delayed in the network and arrives after the
+    // ReleaseQuery rpc, so only log as a WARNING.
+    LOG(WARNING) << "Unable to find resources to release backends for query "
+                 << PrintId(query_id) << ", may have already been released.";
+    return;
+  }
 
-    // Update num_released_backends_.
-    auto released_backends = num_released_backends_.find(query_id);
-    if (released_backends != num_released_backends_.end()) {
-      released_backends->second -= host_addrs.size();
-    } else {
-      string err_msg = Substitute(
-          "Unable to find num released backends for query $0", PrintId(query_id));
-      DCHECK(false) << err_msg;
-      LOG(ERROR) << err_msg;
-    }
+  RunningQuery& running_query = it->second;
+  UpdateStatsOnReleaseForBackends(query_id, running_query, host_addrs);
 
-    if (VLOG_IS_ON(2)) {
-      stringstream ss;
-      ss << "Released query backend(s) ";
-      for (auto host_addr : host_addrs) ss << host_addr << " ";
-      ss << "for query id=" << PrintId(query_id) << " "
-         << GetPoolStats(running_query.request_pool)->DebugString();
-      VLOG(2) << ss.str();
-    }
-    pending_dequeue_ = true;
+  // Update num_released_backends_.
+  auto released_backends = num_released_backends_.find(query_id);
+  if (released_backends != num_released_backends_.end()) {
+    released_backends->second -= host_addrs.size();
+  } else {
+    // In the context of the admission control service, this may happen, eg. if a
+    // ReleaseQueryBackends rpc is delayed in the network and arrives after the
+    // ReleaseQuery rpc, so only log as a WARNING.
+    string err_msg = Substitute(
+        "Unable to find num released backends for query $0", PrintId(query_id));
+    LOG(WARNING) << err_msg;
   }
-  dequeue_cv_.NotifyOne();
+
+  if (VLOG_IS_ON(2)) {
+    stringstream ss;
+    ss << "Released query backend(s) ";
+    for (auto host_addr : host_addrs) ss << host_addr << " ";
+    ss << "for query id=" << PrintId(query_id) << " "
+       << GetPoolStats(running_query.request_pool)->DebugString();
+    VLOG(2) << ss.str();
+  }
+  pending_dequeue_ = true;
 }
 
 vector<UniqueIdPB> AdmissionController::CleanupQueriesForHost(
@@ -1454,7 +1480,7 @@ vector<UniqueIdPB> AdmissionController::CleanupQueriesForHost(
     LOG(INFO) << "Releasing resources for query " << PrintId(query_id)
               << " as it's coordinator " << PrintId(coord_id)
               << " reports that it is no longer registered.";
-    ReleaseQuery(query_id, coord_id, -1);
+    ReleaseQuery(query_id, coord_id, -1, /* release_remaining_backends */ true);
   }
   return to_clean_up;
 }
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index ed3f3ba..fe9dbc3 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -187,7 +187,8 @@ enum class AdmissionOutcome {
 ///   ids via a heartbeat rpc, allowing the admission contoller to clean up any queries
 ///   that are not in that list.
 /// - TODO(IMPALA-10594): handle the case of coordinators failing
-/// - TODO(IMPALA-10591): handle the case of a ReleaseQueryBackends rpc failing
+/// - RelaseQueryBackends rpc fails: when ReleaseQuery is eventually called (as guaranteed
+///   by the above), it will automatically release any remaining backends.
 ///
 /// Releasing Backends releases the admitted memory used by that Backend and decrements
 /// the number of running queries on the host running that Backend. Releasing a query does
@@ -381,9 +382,12 @@ class AdmissionController {
   /// been submitted via AdmitQuery(). 'query_id' is the completed query, 'coord_id' is
   /// the backend id of the coordinator for the query, and 'peak_mem_consumption' is the
   /// peak memory consumption of the query, which may be -1 if unavailable.
+  /// If 'release_remaining_backends' is true, calls ReleaseQueryBackends() for any
+  /// backends that have not been released yet. This is only used in the context of the
+  /// admission control service to account for the possibility of failed rpcs.
   /// This does not block.
   void ReleaseQuery(const UniqueIdPB& query_id, const UniqueIdPB& coord_id,
-      int64_t peak_mem_consumption);
+      int64_t peak_mem_consumption, bool release_remaining_backends = false);
 
   /// Updates the pool statistics when a Backend running a query completes (either
   /// successfully, is cancelled or failed). This should be called for all Backends part
@@ -847,7 +851,8 @@ class AdmissionController {
     /// The executor group this query was scheduled on.
     std::string executor_group;
 
-    /// Map from backend addresses to the resouces this query was allocated on them.
+    /// Map from backend addresses to the resouces this query was allocated on them. When
+    /// backends are released, they are removed from this map.
     std::unordered_map<NetworkAddressPB, BackendAllocation> per_backend_resources;
   };
 
@@ -988,7 +993,7 @@ class AdmissionController {
   /// specified in 'host_addrs'. Also updates the stats related to the admitted memory of
   /// its associated resource pool.
   void UpdateStatsOnReleaseForBackends(const UniqueIdPB& query_id,
-      const RunningQuery& running_query, const std::vector<NetworkAddressPB>& host_addrs);
+      RunningQuery& running_query, const std::vector<NetworkAddressPB>& host_addrs);
 
   /// Updates the memory admitted and the num of queries running on the specified host by
   /// adding the specified mem, num_queries and slots to the host stats.
@@ -1158,6 +1163,10 @@ class AdmissionController {
   void ReportTopNQueriesAtIndices(std::stringstream& ss, std::vector<Item>& listOfTopNs,
       std::vector<int>& indices, int indent, int64_t total_mem_consumed) const;
 
+  /// Performs the work of ReleaseQueryBackends(). 'admission_ctrl_lock_' must be held.
+  void ReleaseQueryBackendsLocked(const UniqueIdPB& query_id, const UniqueIdPB& coord_id,
+      const vector<NetworkAddressPB>& host_addr);
+
   FRIEND_TEST(AdmissionControllerTest, Simple);
   FRIEND_TEST(AdmissionControllerTest, PoolStats);
   FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory);
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 1f21f7b..c268d98 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -1425,6 +1425,37 @@ class TestAdmissionControllerWithACService(TestAdmissionController):
     self.wait_for_state(
         handle2, self.client.QUERY_STATES['RUNNING'], timeout_s)
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--vmodule admission-controller=3 "
+      "--debug_actions=IMPALA_SERVICE_POOL:127.0.0.1:29500:ReleaseQueryBackends:FAIL@1.0 "
+      "--admission_control_slots=1 --executor_groups=default-pool-group1")
+  def test_release_query_backends_failed(self):
+    """Tests that if the ReleaseQueryBackends rpc fails, the query's resources will
+    eventually be cleaned up. Uses the --debug_action flag to simulate rpc failures, and
+    sets the number of slots for a single pool as slot usage per executor is decremented
+    when releasing individual backends."""
+    # Query designed to run for a few minutes.
+    query = "select count(*) from functional.alltypes where int_col = sleep(10000)"
+    handle1 = self.execute_query_async(query)
+    timeout_s = 10
+    # Make sure the first query has been admitted.
+    self.wait_for_state(
+        handle1, self.client.QUERY_STATES['RUNNING'], timeout_s)
+
+    # Run another query. This query should be queued because the executor group only has 1
+    # slot.
+    handle2 = self.execute_query_async(query)
+    self._wait_for_change_to_profile(handle2, "Admission result: Queued")
+
+    # Cancel the first query. It's resources should be released and the second query
+    # should be admitted.
+    self.client.cancel(handle1)
+    self.client.close_query(handle1)
+    self.wait_for_state(
+        handle2, self.client.QUERY_STATES['RUNNING'], timeout_s)
+
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions
   (parameterized) and the ability to submit to one impalad or many in a round-robin

[impala] 01/02: IMPALA-9331: Add symptom for dataload failing on schema mismatch

Posted by tm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 559f6044bed1c0bcfe0312b2eb2c349450628e76
Author: Laszlo Gaal <la...@cloudera.com>
AuthorDate: Mon Mar 29 22:36:14 2021 +0200

    IMPALA-9331: Add symptom for dataload failing on schema mismatch
    
    Streamline triaging a bit. When dataload fails on a schema mismatch, it
    always does so at the same location in create-load-data.sh, but until now
    you had to scan the build log to find the problem. This JUnitXML symptom
    should make this failure mode obvious.
    
    Tested by running an S3 build on private infrastructure with a knowingly
    mismatched data snapshot.
    
    Change-Id: I2fa193740a2764fdda799d6a9cc64f89cab64aba
    Reviewed-on: http://gerrit.cloudera.org:8080/17242
    Reviewed-by: Laszlo Gaal <la...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 testdata/bin/create-load-data.sh | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index 3cf6973..bed5c89 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -118,6 +118,10 @@ fi
     -script_name "$(basename $0)" &
 TIMEOUT_PID=$!
 
+SCHEMA_MISMATCH_ERROR="A schema change has been detected in the metadata, "
+SCHEMA_MISMATCH_ERROR+="but it cannot be loaded on Isilon, s3, gcs or local filesystem, "
+SCHEMA_MISMATCH_ERROR+="and the filesystem is ${TARGET_FILESYSTEM}".
+
 if [[ $SKIP_METADATA_LOAD -eq 0  && "$SNAPSHOT_FILE" = "" ]]; then
   run-step "Generating HBase data" create-hbase.log \
       ${IMPALA_HOME}/testdata/bin/create-hbase.sh
@@ -134,6 +138,9 @@ elif [ $SKIP_SNAPSHOT_LOAD -eq 0 ]; then
       echo "ERROR in $0 at line $LINENO: A schema change has been detected in the"
       echo "metadata, but it cannot be loaded on isilon, s3, gcs or local and the"
       echo "target file system is ${TARGET_FILESYSTEM}.  Exiting."
+      # Generate an explicit JUnitXML symptom report here for easier triaging
+      ${IMPALA_HOME}/bin/generate_junitxml.py --phase=dataload \
+          --step=check-schema-diff.sh --error "${SCHEMA_MISMATCH_ERROR}"
       exit 1
     fi
     echo "Schema change detected, metadata will be loaded."