You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/02 23:31:26 UTC

[4/4] impala git commit: IMPALA-6338: Fix flaky test_profile_fragment_instances

IMPALA-6338: Fix flaky test_profile_fragment_instances

test_profile_fragment_instances checks that, once all the results have
been returned, every fragment instance appears in the query profile
for a query that internally cancels fragment instances that are still
executing when the results have been fully returned.

Every fragment instance (fis) is guaranteed to send a profile to the
coordinator in Finalize(), but previously fragment profiles were not
applied by the coordinator if the backend was 'done', defined as either
all instances having completed or one having entered an error state
(including cancelled).

So, the test could fail by the following sequence:
- Some fragment for a particular backend sends an update to the
  coordinator. 'returned_all_results_' is true, so the coordinator
  responds indicating that the backend should cancel its remaining
  fragments.
- Another fragment from that backend executes Finalize() and reports
  that it was cancelled. This causes the coordinator to consider the
  entire backend to be 'done'.
- A third fragment, which had not previously sent a report from the
  reporting thread, from the same backend executes Finalize(). This
  report will not be applied by the coordinator as the backend is
  considered 'done', so this fragment will not appear in the final
  profile.

The solution is to change the definition of 'done' to not include a
backend that has been cancelled but still has fragments that haven't
completed. This guarantees that for queries that complete successfully
and are cancelled internally, all fis will send a report and have it
applied by the coordinator before all results have been returned,
since if eos is true Coordinator::GetNext() calls
WaitForBackendCompletion(), which in this situation will now wait for
all fis to Finalize().

Returning results for queries that are cancelled by the user is
unaffected as the manual cancel path causes WaitForBackendCompletion().

Testing:
- Ran test_profile_fragment_instances in a loop with no failures.
  I can reliably repro the original problem with a few carefully
  placed sleeps.

Change-Id: I77773a1e3c62952003f37f88fe2b662bb11889ed
Reviewed-on: http://gerrit.cloudera.org:8080/8997
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a018038d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a018038d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a018038d

Branch: refs/heads/master
Commit: a018038df5b13f24f7980b75d755e0123ae2687d
Parents: c2184e5
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Tue Jan 9 13:59:52 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 2 23:22:51 2018 +0000

----------------------------------------------------------------------
 be/src/common/status.h                      |  6 ++++++
 be/src/runtime/coordinator-backend-state.cc | 12 ++++++------
 be/src/runtime/coordinator-backend-state.h  |  6 ++++--
 be/src/runtime/coordinator.cc               | 10 ++++++----
 be/src/runtime/coordinator.h                |  4 +++-
 tests/query_test/test_observability.py      | 10 +++++-----
 6 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/a018038d/be/src/common/status.h
----------------------------------------------------------------------
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 24dba8b..f0f91f7 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -95,7 +95,13 @@ class NODISCARD Status {
   static Status MemLimitExceeded();
   static Status MemLimitExceeded(const std::string& details);
 
+  /// Indicates a 'cancelled' status. CANCELLED should not be reported by a fragment
+  /// instance that encounters a problem - instances should return a specific error,
+  /// and then the coordinator will initiate cancellation.
+  /// TODO: we use this in some places to indicate things other than query cancellation,
+  /// which can be confusing.
   static const Status CANCELLED;
+
   static const Status DEPRECATED_RPC;
 
   /// Copy c'tor makes copy of error detail so Status can be returned by value.

http://git-wip-us.apache.org/repos/asf/impala/blob/a018038d/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 914a3e4..b238cad 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -234,7 +234,7 @@ void Coordinator::BackendState::LogFirstInProgress(
 }
 
 inline bool Coordinator::BackendState::IsDone() const {
-  return num_remaining_instances_ == 0 || !status_.ok();
+  return num_remaining_instances_ == 0 || (!status_.ok() && !status_.IsCancelled());
 }
 
 bool Coordinator::BackendState::ApplyExecStatusReport(
@@ -338,8 +338,8 @@ bool Coordinator::BackendState::Cancel() {
   // Nothing to cancel if the exec rpc was not sent
   if (!rpc_sent_) return false;
 
-  // don't cancel if it already finished (for any reason)
-  if (IsDone()) return false;
+  // don't cancel if it already finished (for any reason) or cancelled
+  if (IsDone() || status_.IsCancelled()) return false;
 
   /// If the status is not OK, we still try to cancel - !OK status might mean
   /// communication failure between backend and coordinator, but fragment
@@ -391,10 +391,10 @@ bool Coordinator::BackendState::Cancel() {
 void Coordinator::BackendState::PublishFilter(const TPublishFilterParams& rpc_params) {
   DCHECK_EQ(rpc_params.dst_query_id, query_id_);
   {
-    // If the backend is already done, it's not waiting for this filter, so we skip
-    // sending it in this case.
+    // If the backend is already done or cancelled, it's not waiting for this filter, so
+    // we skip sending it in this case.
     lock_guard<mutex> l(lock_);
-    if (IsDone()) return;
+    if (IsDone() || status_.IsCancelled()) return;
   }
 
   if (fragments_.count(rpc_params.dst_fragment_idx) == 0) return;

http://git-wip-us.apache.org/repos/asf/impala/blob/a018038d/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 0973ca3..860b968 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -219,7 +219,7 @@ class Coordinator::BackendState {
 
   /// If the status indicates an error status, execution has either been aborted by the
   /// executing impalad (which then reported the error) or cancellation has been
-  /// initiated; either way, execution must not be cancelled.
+  /// initiated by the coordinator.
   Status status_;
 
   /// Used to distinguish between errors reported by a specific fragment instance,
@@ -254,7 +254,9 @@ class Coordinator::BackendState {
       const FilterRoutingTable& filter_routing_table,
       TExecQueryFInstancesParams* rpc_params);
 
-  /// Return true if execution at this backend is done. Caller must hold lock_.
+  /// Return true if execution at this backend is done. The backend is considered done if
+  /// either all instances have completed, or an error (other than cancel) is encountered.
+  /// Caller must hold lock_.
   bool IsDone() const;
 };
 

http://git-wip-us.apache.org/repos/asf/impala/blob/a018038d/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 7973775..05ecf9f 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -867,6 +867,9 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
     ReleaseExecResources();
     // wait for all backends to complete before computing the summary
     // TODO: relocate this so GetNext() won't have to wait for backends to complete?
+    // Note that doing this here allows us to ensure that a query that completes
+    // successfully will have a full runtime profile by the time that Fetch() indicates
+    // all of the results have been returned.
     RETURN_IF_ERROR(WaitForBackendCompletion());
     // Release admission control resources after backends are finished.
     ReleaseAdmissionControlResources();
@@ -920,10 +923,8 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
         Substitute("Unknown backend index $0 (max known: $1)",
             params.coord_state_idx, backend_states_.size() - 1));
   }
-  BackendState* backend_state = backend_states_[params.coord_state_idx];
-  // TODO: return here if returned_all_results_?
-  // TODO: return CANCELLED in that case? Although that makes the cancellation propagation
-  // path more irregular.
+  // If the query was cancelled, don't process the update.
+  if (query_status_.IsCancelled()) return Status::OK();
 
   // TODO: only do this when the sink is done; probably missing a done field
   // in TReportExecStatus for that
@@ -931,6 +932,7 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
     UpdateInsertExecStatus(params.insert_exec_status);
   }
 
+  BackendState* backend_state = backend_states_[params.coord_state_idx];
   if (backend_state->ApplyExecStatusReport(params, &exec_summary_, &progress_)) {
     // This report made this backend done, so update the status and
     // num_remaining_backends_.

http://git-wip-us.apache.org/repos/asf/impala/blob/a018038d/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index d630b9a..fbbdfa9 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -290,7 +290,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   boost::mutex lock_;
 
   /// Overall status of the entire query; set to the first reported fragment error
-  /// status or to CANCELLED, if Cancel() is called.
+  /// status or to CANCELLED, if Cancel() is called. Note that some fragments may have
+  /// status CANCELLED even if this is not CANCELLED if cancellation is initiated because
+  /// returned_all_results_ is true or an error is encountered.
   Status query_status_;
 
   /// If true, the query is done returning all results.  It is possible that the

http://git-wip-us.apache.org/repos/asf/impala/blob/a018038d/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index e838081..960a6f4 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -124,14 +124,14 @@ class TestObservability(ImpalaTestSuite):
         join (select * from l LIMIT 2000000) b on a.l_orderkey = -b.l_orderkey;""")
     # There are 3 scan nodes and each appears in the profile 4 times (for 3 fragment
     # instances + the averaged fragment).
-    assert results.runtime_profile.count("HDFS_SCAN_NODE") == 12
+    assert results.runtime_profile.count("HDFS_SCAN_NODE") == 12, results.runtime_profile
     # There are 3 exchange nodes and each appears in the profile 2 times (for 1 fragment
     # instance + the averaged fragment).
-    assert results.runtime_profile.count("EXCHANGE_NODE") == 6
+    assert results.runtime_profile.count("EXCHANGE_NODE") == 6, results.runtime_profile
     # The following appear only in the root fragment which has 1 instance.
-    assert results.runtime_profile.count("HASH_JOIN_NODE") == 2
-    assert results.runtime_profile.count("AGGREGATION_NODE") == 2
-    assert results.runtime_profile.count("PLAN_ROOT_SINK") == 2
+    assert results.runtime_profile.count("HASH_JOIN_NODE") == 2, results.runtime_profile
+    assert results.runtime_profile.count("AGGREGATION_NODE") == 2, results.runtime_profile
+    assert results.runtime_profile.count("PLAN_ROOT_SINK") == 2, results.runtime_profile
 
   # IMPALA-6399: Run this test serially to avoid a delay over the wait time in fetching
   # the profile.