You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/12/07 22:16:35 UTC

[2/2] impala git commit: IMPALA-6081: Fix test_basic_filters runtime profile failure

IMPALA-6081: Fix test_basic_filters runtime profile failure

test_basic_filters has been occasionally failing due to a line missing
from a runtime profile for a particular query.

The problem is that the query returns all of its results before all of
its fragment instances are finished executing (due to a limit). Then,
when one fragment instance reports its status, the coordinator returns
to it a 'cancelled' status, causing all remaining instances for that
backend to be cancelled.

Sometimes this cancellation happens quickly enough that the relevant
fragment instances have not yet sent a status report when they are
cancelled. They will still send a report in finalize, but as the
coordinator only updates its runtime profile for 'ok' status reports,
not 'cancelled', the final runtime profile doesn't end up with any
data for those fragment instances, which means the test does not find
the line in the runtime profile it's checking for.

The fix is to have the coordinator update its runtime profile with
every status report it receives, regardless of error status.

Testing:
- Ran existing runtime profile tests, which rely on profile output,
  in a loop.
- Manually tested some scenarios with failed queries and checked that
  the new profile output is reasonable.
- Added a new e2e test that runs the affected query and checks for the
  presence of info for all expected exec nodes in the profile. This
  repros the underlying issue consistently.

Change-Id: I4f581c7c8039f02a33712515c5bffab942309bba
Reviewed-on: http://gerrit.cloudera.org:8080/8754
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f3fa3e01
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f3fa3e01
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f3fa3e01

Branch: refs/heads/master
Commit: f3fa3e017f1341579d008619f57cc5dceb21603d
Parents: 82267a2
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Wed Nov 29 16:46:38 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Dec 7 21:07:02 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator-backend-state.cc | 32 +++++++++++-------------
 be/src/runtime/query-state.cc               |  7 +++---
 tests/query_test/test_observability.py      | 19 ++++++++++++++
 3 files changed, 37 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/f3fa3e01/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 12689e0..973aa25 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -256,22 +256,21 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
         instance_exec_status.fragment_instance_id);
     // Ignore duplicate or out-of-order messages.
     if (instance_stats->done_) continue;
-    if (instance_status.ok()) {
-      instance_stats->Update(instance_exec_status, exec_summary, scan_range_progress);
-      if (instance_stats->peak_mem_counter_ != nullptr) {
-        // protect against out-of-order status updates
-        peak_consumption_ =
-            max(peak_consumption_, instance_stats->peak_mem_counter_->value());
-      }
-    } else {
-      // if a query is aborted due to an error encountered by a single fragment instance,
-      // all other fragment instances will report a cancelled status; make sure not
-      // to mask the original error status
-      if (status_.ok() || status_.IsCancelled()) {
-        status_ = instance_status;
-        failed_instance_id_ = instance_exec_status.fragment_instance_id;
-        is_fragment_failure_ = true;
-      }
+
+    instance_stats->Update(instance_exec_status, exec_summary, scan_range_progress);
+    if (instance_stats->peak_mem_counter_ != nullptr) {
+      // protect against out-of-order status updates
+      peak_consumption_ =
+        max(peak_consumption_, instance_stats->peak_mem_counter_->value());
+    }
+
+    // If a query is aborted due to an error encountered by a single fragment instance,
+    // all other fragment instances will report a cancelled status; make sure not to mask
+    // the original error status.
+    if (!instance_status.ok() && (status_.ok() || status_.IsCancelled())) {
+      status_ = instance_status;
+      failed_instance_id_ = instance_exec_status.fragment_instance_id;
+      is_fragment_failure_ = true;
     }
     DCHECK_GT(num_remaining_instances_, 0);
     if (instance_exec_status.done) {
@@ -454,7 +453,6 @@ void Coordinator::BackendState::InstanceStats::InitCounters() {
 void Coordinator::BackendState::InstanceStats::Update(
     const TFragmentInstanceExecStatus& exec_status,
     ExecSummary* exec_summary, ProgressUpdater* scan_range_progress) {
-  DCHECK(Status(exec_status.status).ok());
   if (exec_status.done) stopwatch_.Stop();
   profile_->Update(exec_status.profile);
   if (!profile_created_) {

http://git-wip-us.apache.org/repos/asf/impala/blob/f3fa3e01/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 3b168ca..ae207a2 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -227,10 +227,9 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
     status.SetTStatus(&instance_status);
     instance_status.__set_done(done);
 
-    if (fis->profile() != nullptr) {
-      fis->profile()->ToThrift(&instance_status.profile);
-      instance_status.__isset.profile = true;
-    }
+    DCHECK(fis->profile() != nullptr);
+    fis->profile()->ToThrift(&instance_status.profile);
+    instance_status.__isset.profile = true;
 
     // Only send updates to insert status if fragment is finished, the coordinator waits
     // until query execution is done to use them anyhow.

http://git-wip-us.apache.org/repos/asf/impala/blob/f3fa3e01/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index cf0527b..ee7177d 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -108,3 +108,22 @@ class TestObservability(ImpalaTestSuite):
     assert "Query Options (set by configuration and planner): MEM_LIMIT=8589934592," \
         "NUM_NODES=1,NUM_SCANNER_THREADS=1,RUNTIME_FILTER_MODE=0,MT_DOP=0\n" \
         in runtime_profile
+
+  def test_profile_fragment_instances(self):
+    """IMPALA-6081: Test that the expected number of fragment instances and their exec
+    nodes appear in the runtime profile, even when fragments may be quickly cancelled when
+    all results are already returned."""
+    results = self.execute_query("""
+        with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
+        select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
+        join (select * from l LIMIT 2000000) b on a.l_orderkey = -b.l_orderkey;""")
+    # There are 3 scan nodes and each appears in the profile 4 times (for 3 fragment
+    # instances + the averaged fragment).
+    assert results.runtime_profile.count("HDFS_SCAN_NODE") == 12
+    # There are 3 exchange nodes and each appears in the profile 2 times (for 1 fragment
+    # instance + the averaged fragment).
+    assert results.runtime_profile.count("EXCHANGE_NODE") == 6
+    # The following appear only in the root fragment which has 1 instance.
+    assert results.runtime_profile.count("HASH_JOIN_NODE") == 2
+    assert results.runtime_profile.count("AGGREGATION_NODE") == 2
+    assert results.runtime_profile.count("PLAN_ROOT_SINK") == 2