Posted to commits@impala.apache.org by kw...@apache.org on 2018/11/06 01:05:29 UTC

[2/2] impala git commit: IMPALA-4063: Merge report of query fragment instances per executor

IMPALA-4063: Merge report of query fragment instances per executor

Previously, each fragment instance running on an executor would
independently report its status to the coordinator periodically.
This created a huge number of RPCs to the coordinator under highly
concurrent workloads, causing lock contention in the coordinator's
backend states when multiple fragment instances sent reports at the
same time. In addition, because fragment instances did not coordinate
their reports, a query could end without collecting the profiles of
all fragment instances if one of them hit an error before another
managed to finish Prepare(), leaving some fragment instances without
profiles.

This change fixes the problems above by adding a thread per QueryState
(started by QueryExecMgr) that is responsible for periodically
reporting the status and profiles of all fragment instances of a query
running on a backend. As part of this refactoring, fragment instances
no longer report their errors individually. Instead, a cumulative
status is maintained per QueryState. It is set to the error status of
the first fragment instance that hits an error, or to any general error
(e.g. failure to start a thread) encountered when starting fragment
instances. With this change, the per-fragment-instance reporting
threads are also removed.
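
As an illustration (not itself part of the patch), the reporting loop
run by the query state thread boils down to the following sketch,
condensed from QueryState::MonitorFInstances() in the diff below;
DCHECKs and logging are elided:

  void QueryState::MonitorFInstances() {
    // Report nothing until every instance has finished Prepare(), so
    // Cancel()/PublishFilter() RPCs can always be handled.
    discard_result(WaitForPrepare());
    UpdateBackendExecState();
    if (IsTerminalState()) return;
    if (FLAGS_status_report_interval_ms > 0) {
      // Wake up periodically and send one merged report covering all
      // fragment instances of this query on this backend.
      while (!WaitForFinishOrTimeout(FLAGS_status_report_interval_ms)) {
        ReportExecStatus();
      }
    } else {
      WaitForFinish();
    }
    // Entering a terminal state sends the final report ('done' true).
    UpdateBackendExecState();
  }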

Testing done: exhaustive tests

This patch is based on a patch by Sailesh Mukil

Change-Id: I5f95e026ba05631f33f48ce32da6db39c6f421fa
Reviewed-on: http://gerrit.cloudera.org:8080/11615
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/94103822
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/94103822
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/94103822

Branch: refs/heads/master
Commit: 941038229ae7073ddf7b9c6f58e9eaf866b89b2c
Parents: f7d89ef
Author: Michael Ho <kw...@cloudera.com>
Authored: Tue May 22 16:38:03 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Nov 6 01:01:07 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator-backend-state.cc     |  32 +-
 be/src/runtime/coordinator-backend-state.h      |   2 +-
 be/src/runtime/coordinator.cc                   |   4 +-
 be/src/runtime/coordinator.h                    |   6 +-
 be/src/runtime/fragment-instance-state.cc       | 127 +++-----
 be/src/runtime/fragment-instance-state.h        |  74 ++---
 be/src/runtime/query-exec-mgr.cc                |   9 +-
 be/src/runtime/query-exec-mgr.h                 |   4 +-
 be/src/runtime/query-state.cc                   | 290 ++++++++++---------
 be/src/runtime/query-state.h                    | 187 ++++++------
 be/src/service/client-request-state.cc          |   5 +-
 be/src/service/client-request-state.h           |   2 +-
 be/src/service/control-service.cc               |  22 +-
 be/src/service/control-service.h                |   4 +-
 common/protobuf/control_service.proto           |  30 +-
 common/thrift/RuntimeProfile.thrift             |   5 +
 .../queries/QueryTest/bloom_filters.test        |  28 +-
 .../queries/QueryTest/udf-no-expr-rewrite.test  |  30 ++
 .../QueryTest/udf-non-deterministic.test        |  17 --
 .../functional-query/queries/QueryTest/udf.test |  10 -
 tests/query_test/test_observability.py          |   1 -
 tests/query_test/test_udfs.py                   |   7 +-
 22 files changed, 422 insertions(+), 474 deletions(-)
----------------------------------------------------------------------
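
With this change a backend sends a single ReportExecStatusRequestPB
per report, carrying the backend-wide overall status plus one entry
(and one profile tree, matched by position) per fragment instance.
The following sketch is condensed from QueryState::ConstructReport()
and FragmentInstanceState::GetStatusReport() in the diff that follows:

  // Build one merged report for all fragment instances of this query.
  ReportExecStatusRequestPB report;
  TRuntimeProfileForest profiles;  // trees parallel instance_exec_status()
  TUniqueIdToUniqueIdPB(query_id(), report.mutable_query_id());
  overall_status_.ToProto(report.mutable_overall_status());
  for (const auto& entry : fis_map_) {
    FragmentInstanceState* fis = entry.second;
    // Skip instances whose final ('done') report was already sent.
    if (fis->final_report_sent()) continue;
    FragmentInstanceExecStatusPB* s = report.add_instance_exec_status();
    profiles.profile_trees.emplace_back();
    // Fills the seq_no, instance id, 'done' flag, exec state, DML stats,
    // unreported errors, and the instance's Thrift runtime profile.
    fis->GetStatusReport(s, &profiles.profile_trees.back());
  }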


http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 4d5ccbd..a5f5f1e 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -270,7 +270,7 @@ inline bool Coordinator::BackendState::IsDone() const {
 
 bool Coordinator::BackendState::ApplyExecStatusReport(
     const ReportExecStatusRequestPB& backend_exec_status,
-    const TRuntimeProfileTree& thrift_profile, ExecSummary* exec_summary,
+    const TRuntimeProfileForest& thrift_profiles, ExecSummary* exec_summary,
     ProgressUpdater* scan_range_progress, DmlExecState* dml_exec_state) {
   // Hold the exec_summary's lock to avoid exposing it half-way through
   // the update loop below.
@@ -280,6 +280,10 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
 
   // If this backend completed previously, don't apply the update.
   if (IsDone()) return false;
+
+  int idx = 0;
+  const bool has_profile = thrift_profiles.profile_trees.size() > 0;
+  TRuntimeProfileTree empty_profile;
   for (const FragmentInstanceExecStatusPB& instance_exec_status :
            backend_exec_status.instance_exec_status()) {
     int64_t report_seq_no = instance_exec_status.report_seq_no();
@@ -296,8 +300,11 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
     }
 
     DCHECK(!instance_stats->done_);
-    instance_stats->Update(
-        instance_exec_status, thrift_profile, exec_summary, scan_range_progress);
+    DCHECK(!has_profile || idx < thrift_profiles.profile_trees.size());
+    const TRuntimeProfileTree& profile =
+        has_profile ? thrift_profiles.profile_trees[idx++] : empty_profile;
+    instance_stats->Update(instance_exec_status, profile, exec_summary,
+        scan_range_progress);
 
     // Update DML stats
     if (instance_exec_status.has_dml_exec_status()) {
@@ -313,15 +320,6 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
           PrintErrorMapToString(error_log_);
     }
 
-    // If a query is aborted due to an error encountered by a single fragment instance,
-    // all other fragment instances will report a cancelled status; make sure not to mask
-    // the original error status.
-    const Status instance_status(instance_exec_status.status());
-    if (!instance_status.ok() && (status_.ok() || status_.IsCancelled())) {
-      status_ = instance_status;
-      failed_instance_id_ = ProtoToQueryId(instance_exec_status.fragment_instance_id());
-      is_fragment_failure_ = true;
-    }
     DCHECK_GT(num_remaining_instances_, 0);
     if (instance_exec_status.done()) {
       DCHECK(!instance_stats->done_);
@@ -347,9 +345,13 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
   // status_ has incorporated the status from all fragment instances. If the overall
   // backend status is not OK, but no specific fragment instance reported an error, then
   // this is a general backend error. Incorporate the general error into status_.
-  Status overall_backend_status(backend_exec_status.status());
-  if (!overall_backend_status.ok() && (status_.ok() || status_.IsCancelled())) {
-    status_ = overall_backend_status;
+  Status overall_status(backend_exec_status.overall_status());
+  if (!overall_status.ok() && (status_.ok() || status_.IsCancelled())) {
+    status_ = overall_status;
+    if (backend_exec_status.has_fragment_instance_id()) {
+      failed_instance_id_ = ProtoToQueryId(backend_exec_status.fragment_instance_id());
+      is_fragment_failure_ = true;
+    }
   }
 
   // TODO: keep backend-wide stopwatch?

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 6bc4c67..1eb220d 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -85,7 +85,7 @@ class Coordinator::BackendState {
   /// IsDone() from false to true, either because it was the last fragment to complete or
   /// because it was the first error received.
   bool ApplyExecStatusReport(const ReportExecStatusRequestPB& backend_exec_status,
-      const TRuntimeProfileTree& thrift_profile, ExecSummary* exec_summary,
+      const TRuntimeProfileForest& thrift_profiles, ExecSummary* exec_summary,
       ProgressUpdater* scan_range_progress, DmlExecState* dml_exec_state);
 
   /// Update completion_times, rates, and avg_profile for all fragment_stats.

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 47be2e2..9dfb615 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -661,7 +661,7 @@ void Coordinator::CancelBackends() {
 }
 
 Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& request,
-    const TRuntimeProfileTree& thrift_profile) {
+    const TRuntimeProfileForest& thrift_profiles) {
   const int32_t coord_state_idx = request.coord_state_idx();
   VLOG_FILE << "UpdateBackendExecStatus() query_id=" << PrintId(query_id())
             << " backend_idx=" << coord_state_idx;
@@ -673,7 +673,7 @@ Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& req
   }
   BackendState* backend_state = backend_states_[coord_state_idx];
 
-  if (backend_state->ApplyExecStatusReport(request, thrift_profile, &exec_summary_,
+  if (backend_state->ApplyExecStatusReport(request, thrift_profiles, &exec_summary_,
           &progress_, &dml_exec_state_)) {
     // This backend execution has completed.
     if (VLOG_QUERY_IS_ON) {

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index a3172ba..546efd5 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -128,10 +128,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   /// Called by the report status RPC handler to update execution status of a particular
   /// backend as well as dml_exec_state_ and the profile. This may block if exec RPCs are
-  /// pending. 'request' contains details of the status update. 'thrift_profile' is the
-  /// Thrift runtime profile from the backend.
+  /// pending. 'request' contains details of the status update. 'thrift_profiles' contains
+  /// Thrift runtime profiles of all fragment instances from the backend.
   Status UpdateBackendExecStatus(const ReportExecStatusRequestPB& request,
-      const TRuntimeProfileTree& thrift_profile) WARN_UNUSED_RESULT;
+      const TRuntimeProfileForest& thrift_profiles) WARN_UNUSED_RESULT;
 
   /// Get cumulative profile aggregated over all fragments of the query.
   /// This is a snapshot of the current state of execution and will change in

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index d2e8359..c82470e 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -49,9 +49,6 @@
 #include "util/periodic-counter-updater.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 
-DEFINE_int32(status_report_interval_ms, 5000,
-    "interval between profile reports; in milliseconds");
-
 using namespace impala;
 using namespace apache::thrift;
 
@@ -96,24 +93,26 @@ Status FragmentInstanceState::Exec() {
     status = ExecInternal();
   }
 
-  if (!status.ok()) goto done;
-  // Tell the managing 'QueryState' that we're done with executing and that we've stopped
-  // the reporting thread.
-  query_state_->DoneExecuting();
-
 done:
+  // Must update the fragment instance state first before updating the 'Query State'.
+  // Otherwise, there is a race when reading the 'done' flag with GetStatusReport().
+  // This may lead to the "final" profile being sent with the 'done' flag as false.
+  DCHECK_EQ(is_prepared,
+      current_state_.Load() > FInstanceExecStatePB::WAITING_FOR_PREPARE);
+  UpdateState(StateEvent::EXEC_END);
+
   if (!status.ok()) {
     if (!is_prepared) {
-      DCHECK_LE(current_state_.Load(), FInstanceExecStatePB::WAITING_FOR_PREPARE);
       // Tell the managing 'QueryState' that we hit an error during Prepare().
       query_state_->ErrorDuringPrepare(status, instance_id());
     } else {
-      DCHECK_GT(current_state_.Load(), FInstanceExecStatePB::WAITING_FOR_PREPARE);
       // Tell the managing 'QueryState' that we hit an error during execution.
       query_state_->ErrorDuringExecute(status, instance_id());
     }
+  } else {
+    // Tell the managing 'QueryState' that we're done with executing.
+    query_state_->DoneExecuting();
   }
-  UpdateState(StateEvent::EXEC_END);
   // call this before Close() to make sure the thread token got released
   Finalize(status);
   Close();
@@ -237,20 +236,33 @@ Status FragmentInstanceState::Prepare() {
       new RowBatch(exec_tree_->row_desc(), runtime_state_->batch_size(),
         runtime_state_->instance_mem_tracker()));
   VLOG(2) << "plan_root=\n" << exec_tree_->DebugString();
+  return Status::OK();
+}
 
-  // We need to start the profile-reporting thread before calling Open(),
-  // since it may block.
-  if (FLAGS_status_report_interval_ms > 0) {
-    string thread_name = Substitute("profile-report (finst:$0)", PrintId(instance_id()));
-    unique_lock<mutex> l(report_thread_lock_);
-    RETURN_IF_ERROR(Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME,
-        thread_name, [this]() { this->ReportProfileThread(); }, &report_thread_, true));
-    // Make sure the thread started up, otherwise ReportProfileThread() might get into
-    // a race with StopReportThread().
-    while (!report_thread_active_) report_thread_started_cv_.Wait(l);
+void FragmentInstanceState::GetStatusReport(FragmentInstanceExecStatusPB* instance_status,
+    TRuntimeProfileTree* thrift_profile) {
+  DFAKE_SCOPED_LOCK(report_status_lock_);
+  DCHECK(!final_report_sent_);
+  // Update the counter for the peak per host mem usage.
+  if (per_host_mem_usage_ != nullptr) {
+    per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption());
   }
-
-  return Status::OK();
+  instance_status->set_report_seq_no(AdvanceReportSeqNo());
+  const TUniqueId& finstance_id = instance_id();
+  TUniqueIdToUniqueIdPB(finstance_id, instance_status->mutable_fragment_instance_id());
+  const bool done = IsDone();
+  instance_status->set_done(done);
+  instance_status->set_current_state(current_state());
+  DCHECK(profile() != nullptr);
+  profile()->ToThrift(thrift_profile);
+  // Send the DML stats if this is the final report.
+  if (done) {
+    runtime_state()->dml_exec_state()->ToProto(
+        instance_status->mutable_dml_exec_status());
+    final_report_sent_ = true;
+  }
+  // Send new errors to coordinator.
+  runtime_state()->GetUnreportedErrors(instance_status->mutable_error_log());
 }
 
 Status FragmentInstanceState::Open() {
@@ -319,15 +331,12 @@ Status FragmentInstanceState::ExecInternal() {
   } while (!exec_tree_complete);
 
   UpdateState(StateEvent::LAST_BATCH_SENT);
-  // Flush the sink *before* stopping the report thread. Flush may need to add some
-  // important information to the last report that gets sent. (e.g. table sinks record the
-  // files they have written to in this method)
+  // Flush the sink as a final step.
   RETURN_IF_ERROR(sink_->FlushFinal(runtime_state()));
   return Status::OK();
 }
 
 void FragmentInstanceState::Close() {
-  DCHECK(!report_thread_active_);
   DCHECK(runtime_state_ != nullptr);
 
   // guard against partially-finished Prepare()
@@ -362,52 +371,6 @@ void FragmentInstanceState::Close() {
 #endif
 }
 
-void FragmentInstanceState::ReportProfileThread() {
-  VLOG_FILE << "ReportProfileThread(): instance_id=" << PrintId(instance_id());
-  unique_lock<mutex> l(report_thread_lock_);
-  // tell Prepare() that we started
-  report_thread_active_ = true;
-  report_thread_started_cv_.NotifyOne();
-
-  // Jitter the reporting time of remote fragments by a random amount between
-  // 0 and the report_interval.  This way, the coordinator doesn't get all the
-  // updates at once so its better for contention as well as smoother progress
-  // reporting.
-  int report_fragment_offset = rand() % FLAGS_status_report_interval_ms;
-  // We don't want to wait longer than it takes to run the entire fragment.
-  stop_report_thread_cv_.WaitFor(l, report_fragment_offset * MICROS_PER_MILLI);
-
-  while (report_thread_active_) {
-    // timed_wait can return because the timeout occurred or the condition variable
-    // was signaled.  We can't rely on its return value to distinguish between the
-    // two cases (e.g. there is a race here where the wait timed out but before grabbing
-    // the lock, the condition variable was signaled).  Instead, we will use an external
-    // flag, report_thread_active_, to coordinate this.
-    stop_report_thread_cv_.WaitFor(l, FLAGS_status_report_interval_ms * MICROS_PER_MILLI);
-
-    if (!report_thread_active_) break;
-    SendReport(false, Status::OK());
-  }
-
-  VLOG_FILE << "exiting reporting thread: instance_id=" << PrintId(instance_id());
-}
-
-void FragmentInstanceState::SendReport(bool done, const Status& status) {
-  DFAKE_SCOPED_LOCK(report_status_lock_);
-  DCHECK(status.ok() || done);
-  DCHECK(runtime_state_ != nullptr);
-
-  VLOG_FILE << "Reporting " << (done ? "final " : "") << "profile for instance "
-      << PrintId(runtime_state_->fragment_instance_id());
-
-  // Update the counter for the peak per host mem usage.
-  if (per_host_mem_usage_ != nullptr) {
-    per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption());
-  }
-
-  query_state_->ReportExecStatus(done, status, this);
-}
-
 void FragmentInstanceState::UpdateState(const StateEvent event)
 {
   FInstanceExecStatePB current_state = current_state_.Load();
@@ -472,30 +435,16 @@ void FragmentInstanceState::UpdateState(const StateEvent event)
       DCHECK(false) << "Unexpected Event: " << static_cast<int>(event);
       break;
   }
-  // current_state_ is an AtomicEnum to add memory barriers for concurrent reads by the
-  // profile reporting thread. This method is the only one updating it and is not
-  // meant to be thread safe.
+  // This method is the only one updating 'current_state_' and is not meant to be thread
+  // safe.
   if (next_state != current_state) current_state_.Store(next_state);
 }
 
-void FragmentInstanceState::StopReportThread() {
-  if (!report_thread_active_) return;
-  {
-    lock_guard<mutex> l(report_thread_lock_);
-    report_thread_active_ = false;
-  }
-  stop_report_thread_cv_.NotifyOne();
-  report_thread_->Join();
-}
-
 void FragmentInstanceState::Finalize(const Status& status) {
   if (fragment_ctx_.fragment.output_sink.type != TDataSinkType::PLAN_ROOT_SINK) {
     // if we haven't already release this thread token in Prepare(), release it now
     ReleaseThreadToken();
   }
-  StopReportThread();
-  // It's safe to send final report now that the reporting thread is stopped.
-  SendReport(true, status);
 }
 
 void FragmentInstanceState::ReleaseThreadToken() {

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index e4ebdce..7636055 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -60,15 +60,6 @@ class RuntimeState;
 /// The FIS makes an aggregated profile for the entire fragment available, which
 /// includes profile information for the plan itself as well as the output sink. It also
 /// contains a timeline of events of the fragment instance.
-/// The FIS periodically makes a ReportExecStatus RPC to the coordinator to report the
-/// execution status, the current state of the execution, and the instance profile. The
-/// frequency of those reports is controlled by the flag status_report_interval_ms;
-/// Setting that flag to 0 disables periodic reporting altogether. Regardless of the value
-/// of that flag, a report is sent at least once at the end of execution with an overall
-/// status and profile (and 'done' indicator).
-/// The FIS will send at least one final status report. If execution ended with an error,
-/// that error status will be part of the final report (it will not be overridden by
-/// the resulting cancellation).
 ///
 /// This class is thread-safe.
 /// All non-getter public functions other than Exec() block until the Prepare phase
@@ -88,7 +79,7 @@ class FragmentInstanceState {
   /// Must only be called once.
   Status Exec() WARN_UNUSED_RESULT;
 
-  /// Cancels execution and sends a final status report. Idempotent.
+  /// Cancels execution. Idempotent.
   void Cancel();
 
   /// Blocks until the Prepare phase of Exec() is finished and the exec tree is
@@ -99,6 +90,12 @@ class FragmentInstanceState {
   /// Publishes filter with ID 'filter_id' to this fragment instance's filter bank.
   void PublishFilter(const TPublishFilterParams& params);
 
+  /// Called periodically by query state thread to get the current status of this fragment
+  /// instance. The fragment instance's status is stored in 'instance_status' and its
+  /// Thrift runtime profile is stored in 'thrift_profile'.
+  void GetStatusReport(FragmentInstanceExecStatusPB* instance_status,
+      TRuntimeProfileTree* thrift_profile);
+
   /// Returns fragment instance's sink if this is the root fragment instance. Valid after
   /// the Prepare phase. May be nullptr.
   PlanRootSink* root_sink() { return root_sink_; }
@@ -119,14 +116,11 @@ class FragmentInstanceState {
   const TUniqueId& query_id() const { return query_ctx().query_id; }
   const TUniqueId& instance_id() const { return instance_ctx_.fragment_instance_id; }
   FInstanceExecStatePB current_state() const { return current_state_.Load(); }
+  bool final_report_sent() const { return final_report_sent_; }
   const TNetworkAddress& coord_address() const { return query_ctx().coord_address; }
+  bool IsDone() const { return current_state_.Load() == FInstanceExecStatePB::FINISHED; }
   ObjectPool* obj_pool();
 
-  /// Returns the monotonically increasing sequence number. Called by status report thread
-  /// only except for the final report which is handled by finstance exec thread after the
-  /// reporting thread has exited.
-  int64_t AdvanceReportSeqNo() { return ++report_seq_no_; }
-
   /// Returns true if the current thread is a thread executing the whole or part of
   /// a fragment instance.
   static bool IsFragmentExecThread() {
@@ -152,24 +146,6 @@ class FragmentInstanceState {
   ExecNode* exec_tree_ = nullptr; // lives in obj_pool()
   RuntimeState* runtime_state_ = nullptr;  // lives in obj_pool()
 
-  /// profile reporting-related
-  std::unique_ptr<Thread> report_thread_;
-  boost::mutex report_thread_lock_;
-
-  /// Indicates that profile reporting thread should stop.
-  /// Tied to report_thread_lock_.
-  ConditionVariable stop_report_thread_cv_;
-
-  /// Indicates that profile reporting thread started.
-  /// Tied to report_thread_lock_.
-  ConditionVariable report_thread_started_cv_;
-
-  /// When the report thread starts, it sets report_thread_active_ to true and signals
-  /// report_thread_started_cv_. The report thread is shut down by setting
-  /// report_thread_active_ to false and signalling stop_report_thread_cv_. Protected
-  /// by report_thread_lock_.
-  bool report_thread_active_ = false;
-
   /// A 'fake mutex' to detect any race condition in accessing 'report_seq_no_' below.
   /// There should be only one thread doing status report at the same time.
   DFAKE_MUTEX(report_status_lock_);
@@ -178,6 +154,10 @@ class FragmentInstanceState {
   /// duplicated or out-of-order reports.
   int64_t report_seq_no_ = 0;
 
+  /// True iff the final report has already been sent. Read exclusively by the query
+  /// state thread only. Written in GetStatusReport() by the query state thread.
+  bool final_report_sent_ = false;
+
   /// Profile for timings for each stage of the plan fragment instance's lifecycle.
   /// Lives in obj_pool().
   RuntimeProfile* timings_profile_ = nullptr;
@@ -230,6 +210,10 @@ class FragmentInstanceState {
   /// Set when OpenInternal() returns.
   Promise<Status> opened_promise_;
 
+  /// Returns the monotonically increasing sequence number.
+  /// Called by query state thread only.
+  int64_t AdvanceReportSeqNo() { return ++report_seq_no_; }
+
   /// A counter for the per query, per host peak mem usage. Note that this is not the
   /// max of the peak memory of all fragments running on a host since it needs to take
   /// into account when they are running concurrently. All fragments for a single query
@@ -275,41 +259,23 @@ class FragmentInstanceState {
   Status ExecInternal() WARN_UNUSED_RESULT;
 
   /// Closes the underlying fragment instance and frees up all resources allocated in
-  /// Prepare() and Open(). Assumes the report thread is stopped. Can handle
-  /// partially-finished Prepare().
+  /// Prepare() and Open(). Can handle partially-finished Prepare().
   void Close();
 
-  /// Main loop of profile reporting thread.
-  /// Exits when notified on stop_report_thread_cv_ and report_thread_active_ is set to
-  /// false. This will not send the final report.
-  void ReportProfileThread();
-
-  /// Invoked the report callback. If 'done' is true, sends the final report with
-  /// 'status' and the profile. This type of report is sent once and only by the
-  /// instance execution thread. Otherwise, a profile-only report is sent, which the
-  /// ReportProfileThread() thread will do periodically. It's expected that only one
-  /// of instance execution thread or ReportProfileThread() should be calling this
-  /// function at a time.
-  void SendReport(bool done, const Status& status);
-
   /// Handle the execution event 'event'. This implements a state machine and will update
   /// the current execution state of this fragment instance. Also marks an event in
   /// 'event_sequence_' for some states. Must not be called by multiple threads
   /// concurrently.
   void UpdateState(const StateEvent event);
 
-  /// Called when execution is complete to finalize counters and send the final status
-  /// report.  Must be called only once. Can handle partially-finished Prepare().
+  /// Called when execution is complete to finalize counters. Must be called only once.
+  /// Can handle partially-finished Prepare().
   void Finalize(const Status& status);
 
   /// Releases the thread token for this fragment executor. Can handle
   /// partially-finished Prepare().
   void ReleaseThreadToken();
 
-  /// Stops report thread, if one is running. Blocks until report thread terminates.
-  /// Idempotent.
-  void StopReportThread();
-
   /// Print stats about scan ranges for each volumeId in params to info log.
   void PrintVolumeIds();
 };

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 1eca80e..ebd17cd 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -59,8 +59,8 @@ Status QueryExecMgr::StartQuery(const TExecQueryFInstancesParams& params) {
   // query startup (which takes ownership of the QueryState reference)
   unique_ptr<Thread> t;
   status = Thread::Create("query-exec-mgr",
-      Substitute("start-query-finstances-$0", PrintId(query_id)),
-          &QueryExecMgr::StartQueryHelper, this, qs, &t, true);
+      Substitute("query-state-$0", PrintId(query_id)),
+          &QueryExecMgr::ExecuteQueryHelper, this, qs, &t, true);
   if (!status.ok()) {
     // decrement refcount taken in QueryState::Init()
     qs->ReleaseBackendResourceRefcount();
@@ -125,8 +125,9 @@ QueryState* QueryExecMgr::GetOrCreateQueryState(
 }
 
 
-void QueryExecMgr::StartQueryHelper(QueryState* qs) {
-  qs->StartFInstances();
+void QueryExecMgr::ExecuteQueryHelper(QueryState* qs) {
+  // Start the query fragment instances and wait for completion or errors.
+  if (LIKELY(qs->StartFInstances())) qs->MonitorFInstances();
 
 #if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
   // tcmalloc and address or thread sanitizer cannot be used together

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/be/src/runtime/query-exec-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h
index 262ef59..76c0ed5 100644
--- a/be/src/runtime/query-exec-mgr.h
+++ b/be/src/runtime/query-exec-mgr.h
@@ -75,7 +75,9 @@ class QueryExecMgr : public CacheLineAligned {
       const TQueryCtx& query_ctx, int64_t mem_limit, bool* created);
 
   /// Execute instances and decrement refcount (acquire ownership of qs).
-  void StartQueryHelper(QueryState* qs);
+  /// Returns only after all fragment instances complete, unless an instance
+  /// hits an error or the query is cancelled.
+  void ExecuteQueryHelper(QueryState* qs);
 };
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index c6b8440..12e8cf2 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -53,6 +53,8 @@ using kudu::rpc::RpcSidecar;
 
 #include "common/names.h"
 
+DEFINE_int32(status_report_interval_ms, 5000,
+    "interval between profile reports; in milliseconds");
 DEFINE_int32(report_status_retry_interval_ms, 100,
     "The interval in milliseconds to wait before retrying a failed status report RPC to "
     "the coordinator.");
@@ -220,33 +222,30 @@ const char* QueryState::BackendExecStateToString(const BackendExecState& state)
   return exec_state_to_str.at(state);
 }
 
-inline bool QueryState::IsTerminalState(const BackendExecState& state) {
-  return state == BackendExecState::FINISHED
-      || state == BackendExecState::CANCELLED
-      || state == BackendExecState::ERROR;
-}
-
-Status QueryState::UpdateBackendExecState() {
-  BackendExecState old_state = backend_exec_state_;
-
-  unique_lock<SpinLock> l(status_lock_);
-  // We shouldn't call this function if we're already in a terminal state.
-  DCHECK(!IsTerminalState(backend_exec_state_))
-      << " Current State: " << BackendExecStateToString(backend_exec_state_)
-      << " | Current Status: " << query_status_.GetDetail();
-
-  if (query_status_.IsCancelled()) {
-    // Received cancellation - go to CANCELLED state.
-    backend_exec_state_ = BackendExecState::CANCELLED;
-  } else if (!query_status_.ok()) {
-    // Error while executing - go to ERROR state.
-    backend_exec_state_ = BackendExecState::ERROR;
-  } else {
-    // Transition to the next state in the lifecycle.
-    backend_exec_state_ = old_state == BackendExecState::PREPARING ?
-        BackendExecState::EXECUTING : BackendExecState::FINISHED;
+void QueryState::UpdateBackendExecState() {
+  DFAKE_SCOPED_LOCK(backend_exec_state_lock_);
+  {
+    BackendExecState cur_state = backend_exec_state_;
+    unique_lock<SpinLock> l(status_lock_);
+    // We shouldn't call this function if we're already in a terminal state.
+    DCHECK(cur_state == BackendExecState::PREPARING ||
+        cur_state == BackendExecState::EXECUTING)
+            << " Current State: " << BackendExecStateToString(cur_state)
+            << " | Current Status: " << overall_status_.GetDetail();
+    if (overall_status_.IsCancelled()) {
+      // Received cancellation - go to CANCELLED state.
+      backend_exec_state_ = BackendExecState::CANCELLED;
+    } else if (!overall_status_.ok()) {
+      // Error while executing - go to ERROR state.
+      backend_exec_state_ = BackendExecState::ERROR;
+    } else {
+      // Transition to the next state in the lifecycle.
+      backend_exec_state_ = cur_state == BackendExecState::PREPARING ?
+          BackendExecState::EXECUTING : BackendExecState::FINISHED;
+    }
   }
-  return query_status_;
+  // Send one last report if the query has reached the terminal state.
+  if (IsTerminalState()) ReportExecStatus();
 }
 
 FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_id) {
@@ -256,82 +255,66 @@ FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_i
   return it != fis_map_.end() ? it->second : nullptr;
 }
 
-void QueryState::ReportExecStatus(bool done, const Status& status,
-    FragmentInstanceState* fis) {
-  ReportExecStatusAux(done, status, fis, true);
-}
-
-void QueryState::ConstructReport(bool done, const Status& status,
-    FragmentInstanceState* fis, ReportExecStatusRequestPB* report,
-    ThriftSerializer* serializer, uint8_t** profile_buf, uint32_t* profile_len) {
+void QueryState::ConstructReport(bool instances_started,
+    ReportExecStatusRequestPB* report, TRuntimeProfileForest* profiles_forest) {
   report->Clear();
   TUniqueIdToUniqueIdPB(query_id(), report->mutable_query_id());
   DCHECK(exec_rpc_params().__isset.coord_state_idx);
   report->set_coord_state_idx(exec_rpc_params().coord_state_idx);
-  status.ToProto(report->mutable_status());
-
-  if (fis != nullptr) {
-    // create status for 'fis'
-    FragmentInstanceExecStatusPB* instance_status = report->add_instance_exec_status();
-    instance_status->set_report_seq_no(fis->AdvanceReportSeqNo());
-    const TUniqueId& finstance_id = fis->instance_id();
-    TUniqueIdToUniqueIdPB(finstance_id, instance_status->mutable_fragment_instance_id());
-    status.ToProto(instance_status->mutable_status());
-    instance_status->set_done(done);
-    instance_status->set_current_state(fis->current_state());
-
-    // Only send updates to insert status if fragment is finished, the coordinator waits
-    // until query execution is done to use them anyhow.
-    RuntimeState* state = fis->runtime_state();
-    if (done) {
-      state->dml_exec_state()->ToProto(instance_status->mutable_dml_exec_status());
+  {
+    std::unique_lock<SpinLock> l(status_lock_);
+    overall_status_.ToProto(report->mutable_overall_status());
+    if (IsValidFInstanceId(failed_finstance_id_)) {
+      TUniqueIdToUniqueIdPB(failed_finstance_id_, report->mutable_fragment_instance_id());
     }
+  }
 
-    // Send new errors to coordinator
-    state->GetUnreportedErrors(instance_status->mutable_error_log());
+  if (instances_started) {
+    for (const auto& entry : fis_map_) {
+      FragmentInstanceState* fis = entry.second;
 
-    // Debug action to simulate failure to serialize the profile.
-    if (!DebugAction(query_options(), "REPORT_EXEC_STATUS_PROFILE").ok()) {
-      DCHECK(profile_buf == nullptr);
-      return;
-    }
+      // If this fragment instance has already sent its last report, skip it.
+      if (fis->final_report_sent()) {
+        DCHECK(fis->IsDone());
+        continue;
+      }
 
-    // Generate the runtime profile.
-    DCHECK(fis->profile() != nullptr);
-    TRuntimeProfileTree thrift_profile;
-    fis->profile()->ToThrift(&thrift_profile);
-    Status serialize_status =
-        serializer->SerializeToBuffer(&thrift_profile, profile_len, profile_buf);
-    if (UNLIKELY(!serialize_status.ok() ||
-            *profile_len > FLAGS_rpc_max_message_size)) {
-      profile_buf = nullptr;
-      LOG(ERROR) << Substitute("Failed to create $0profile for query fragment $1: "
-          "status=$2 len=$3", done ? "final " : "", PrintId(finstance_id),
-          serialize_status.ok() ? "OK" : serialize_status.GetDetail(), *profile_len);
+      // Update the status and profiles of this fragment instance.
+      FragmentInstanceExecStatusPB* instance_status = report->add_instance_exec_status();
+      profiles_forest->profile_trees.emplace_back();
+      fis->GetStatusReport(instance_status, &profiles_forest->profile_trees.back());
     }
   }
 }
 
-// TODO: rethink whether 'done' can be inferred from 'status' or 'query_status_'.
-void QueryState::ReportExecStatusAux(bool done, const Status& status,
-    FragmentInstanceState* fis, bool instances_started) {
-  // if we're reporting an error, we're done
-  DCHECK(status.ok() || done);
-  // if this is not for a specific fragment instance, we're reporting an error
-  DCHECK(fis != nullptr || !status.ok());
+void QueryState::ReportExecStatus() {
+  bool instances_started = fis_map_.size() > 0;
 
   // This will send a report even if we are cancelled.  If the query completed correctly
   // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
   // be waiting for a final report and profile.
   ReportExecStatusRequestPB report;
 
+  // Gather the statuses and profiles of the fragment instances.
+  TRuntimeProfileForest profiles_forest;
+  ConstructReport(instances_started, &report, &profiles_forest);
+
   // Serialize the runtime profile with Thrift to 'profile_buf'. Note that the
   // serialization output is owned by 'serializer' so this must be alive until RPC
   // is done.
   ThriftSerializer serializer(true);
   uint8_t* profile_buf = nullptr;
   uint32_t profile_len = 0;
-  ConstructReport(done, status, fis, &report, &serializer, &profile_buf, &profile_len);
+  Status serialize_status =
+      serializer.SerializeToBuffer(&profiles_forest, &profile_len, &profile_buf);
+  if (UNLIKELY(!serialize_status.ok() ||
+          profile_len > FLAGS_rpc_max_message_size ||
+          !DebugAction(query_options(), "REPORT_EXEC_STATUS_PROFILE").ok())) {
+    profile_buf = nullptr;
+    LOG(ERROR) << Substitute("Failed to create $0profile for query $1: "
+        "status=$2 len=$3", IsTerminalState() ? "final " : "", PrintId(query_id()),
+        serialize_status.ok() ? "OK" : serialize_status.GetDetail(), profile_len);
+  }
 
   // Try to send the RPC 3 times before failing. Sleep for 100ms between retries.
   // It's safe to retry the RPC as the coordinator handles duplicate RPC messages.
@@ -401,47 +384,63 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
   }
 }
 
+void QueryState::ErrorDuringPrepare(const Status& status, const TUniqueId& finst_id) {
+  {
+    std::unique_lock<SpinLock> l(status_lock_);
+    if (!HasErrorStatus()) {
+      overall_status_ = status;
+      failed_finstance_id_ = finst_id;
+    }
+  }
+  discard_result(instances_prepared_barrier_->Notify());
+}
+
+void QueryState::ErrorDuringExecute(const Status& status, const TUniqueId& finst_id) {
+  {
+    std::unique_lock<SpinLock> l(status_lock_);
+    if (!HasErrorStatus()) {
+      overall_status_ = status;
+      failed_finstance_id_ = finst_id;
+    }
+  }
+  instances_finished_barrier_->NotifyRemaining();
+}
+
 Status QueryState::WaitForPrepare() {
   instances_prepared_barrier_->Wait();
-
   unique_lock<SpinLock> l(status_lock_);
-  return query_status_;
+  return overall_status_;
 }
 
-Status QueryState::WaitForFinish() {
+void QueryState::WaitForFinish() {
   instances_finished_barrier_->Wait();
+}
 
-  unique_lock<SpinLock> l(status_lock_);
-  return query_status_;
+bool QueryState::WaitForFinishOrTimeout(int32_t timeout_ms) {
+  bool timed_out = false;
+  instances_finished_barrier_->Wait(timeout_ms, &timed_out);
+  return !timed_out;
 }
 
-void QueryState::StartFInstances() {
+bool QueryState::StartFInstances() {
   VLOG(2) << "StartFInstances(): query_id=" << PrintId(query_id())
           << " #instances=" << exec_rpc_params_.fragment_instance_ctxs.size();
   DCHECK_GT(refcnt_.Load(), 0);
   DCHECK_GT(backend_resource_refcnt_.Load(), 0) << "Should have been taken in Init()";
 
+  DCHECK_GT(exec_rpc_params_.fragment_ctxs.size(), 0);
+  TPlanFragmentCtx* fragment_ctx = &exec_rpc_params_.fragment_ctxs[0];
+  int num_unstarted_instances = exec_rpc_params_.fragment_instance_ctxs.size();
+  int fragment_ctx_idx = 0;
+
   // set up desc tbl
   DCHECK(query_ctx().__isset.desc_tbl);
-  Status status = DescriptorTbl::Create(&obj_pool_, query_ctx().desc_tbl, &desc_tbl_);
-  if (!status.ok()) {
-    ErrorDuringPrepare(status, TUniqueId());
-    Status updated_query_status = UpdateBackendExecState();
-    instances_prepared_barrier_->NotifyRemaining();
-    DCHECK(!updated_query_status.ok());
-    // TODO (IMPALA-4063): This call to ReportExecStatusAux() should internally be handled
-    // by UpdateBackendExecState().
-    ReportExecStatusAux(true, status, nullptr, false);
-    return;
-  }
+  Status start_finstances_status =
+      DescriptorTbl::Create(&obj_pool_, query_ctx().desc_tbl, &desc_tbl_);
+  if (UNLIKELY(!start_finstances_status.ok())) goto error;
   VLOG(2) << "descriptor table for query=" << PrintId(query_id())
           << "\n" << desc_tbl_->DebugString();
 
-  Status thread_create_status;
-  DCHECK_GT(exec_rpc_params_.fragment_ctxs.size(), 0);
-  TPlanFragmentCtx* fragment_ctx = &exec_rpc_params_.fragment_ctxs[0];
-  int fragment_ctx_idx = 0;
-  int num_unstarted_instances = exec_rpc_params_.fragment_instance_ctxs.size();
   fragment_events_start_time_ = MonotonicStopWatch::Now();
   for (const TPlanFragmentInstanceCtx& instance_ctx :
            exec_rpc_params_.fragment_instance_ctxs) {
@@ -476,11 +475,10 @@ void QueryState::StartFInstances() {
 
     // Inject thread creation failures through debug actions if enabled.
     Status debug_action_status = DebugAction(query_options(), "FIS_FAIL_THREAD_CREATION");
-    thread_create_status = debug_action_status.ok() ?
+    start_finstances_status = !debug_action_status.ok() ? debug_action_status :
         Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
-            [this, fis]() { this->ExecFInstance(fis); }, &t, true) :
-        debug_action_status;
-    if (!thread_create_status.ok()) {
+            [this, fis]() { this->ExecFInstance(fis); }, &t, true);
+    if (!start_finstances_status.ok()) {
       fis_map_.erase(fis->instance_id());
       fis_list.pop_back();
       // Undo refcnt increments done immediately prior to Thread::Create(). The
@@ -488,44 +486,64 @@ void QueryState::StartFInstances() {
       // neither of these decrements will free any structures.
       ReleaseBackendResourceRefcount();
       ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
-      break;
+      goto error;
     }
     t->Detach();
     --num_unstarted_instances;
   }
-
-  if (!thread_create_status.ok()) {
-    // We failed to start 'num_unstarted_instances', so make sure to notify
-    // 'instances_prepared_barrier_' 'num_unstarted_instances - 1' times, to unblock
-    // WaitForPrepare(). The last remaining notification will be set by the call to
-    // ErrorDuringPrepare() below.
-    while (num_unstarted_instances > 1) {
-      DonePreparing();
-      --num_unstarted_instances;
-    }
-
-    // We prioritize thread creation failure as a query killing error, even over an error
-    // during Prepare() for a FIS.
-    // We have to notify anyone waiting on WaitForPrepare() that this query has failed.
-    ErrorDuringPrepare(thread_create_status, TUniqueId());
-    Status updated_query_status = UpdateBackendExecState();
-    DCHECK(!updated_query_status.ok());
-    // Block until all the already started fragment instances finish Prepare()-ing to
-    // to report an error.
-    discard_result(WaitForPrepare());
-    ReportExecStatusAux(true, thread_create_status, nullptr, true);
-    return;
+  return true;
+
+error:
+  // This point is reached on a general error while starting query fragment instances.
+  // Wait for all running fragment instances to finish preparing and report status to the
+  // coordinator to start query cancellation.
+  {
+    // Prioritize general errors as a query killing error, even over an error
+    // during Prepare() for a FIS. Overwrite any existing value in 'overall_status_'.
+    std::unique_lock<SpinLock> l(status_lock_);
+    overall_status_ = start_finstances_status;
+    failed_finstance_id_ = TUniqueId();
+  }
+  // Updates the barrier for all unstarted fragment instances.
+  for (int i = 0; i < num_unstarted_instances; ++i) {
+    DonePreparing();
   }
+  // Block until all the already started fragment instances finish Prepare()-ing before
+  // reporting the error.
+  discard_result(WaitForPrepare());
+  UpdateBackendExecState();
+  DCHECK(IsTerminalState());
+  return false;
+}
 
+void QueryState::MonitorFInstances() {
+  // Wait for all fragment instances to finish preparing.
   discard_result(WaitForPrepare());
-  if (!UpdateBackendExecState().ok()) return;
+  UpdateBackendExecState();
+  if (IsTerminalState()) goto done;
+
+  // Once all fragment instances finished preparing successfully, start periodic
+  // reporting back to the coordinator.
   DCHECK(backend_exec_state_ == BackendExecState::EXECUTING)
       << BackendExecStateToString(backend_exec_state_);
+  if (FLAGS_status_report_interval_ms > 0) {
+    while (!WaitForFinishOrTimeout(FLAGS_status_report_interval_ms)) {
+      ReportExecStatus();
+    }
+  } else {
+    WaitForFinish();
+  }
+  UpdateBackendExecState();
+  DCHECK(IsTerminalState());
 
-  discard_result(WaitForFinish());
-  if (!UpdateBackendExecState().ok()) return;
-  DCHECK(backend_exec_state_ == BackendExecState::FINISHED)
-      << BackendExecStateToString(backend_exec_state_);
+done:
+  if (backend_exec_state_ == BackendExecState::FINISHED) {
+    for (const auto& entry : fis_map_) {
+      DCHECK(entry.second->IsDone());
+    }
+  } else {
+    DCHECK_EQ(is_cancelled_.Load(), 1);
+  }
 }
 
 void QueryState::AcquireBackendResourceRefcount() {
@@ -556,8 +574,12 @@ void QueryState::ExecFInstance(FragmentInstanceState* fis) {
       << " #in-flight="
       << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->GetValue()
       << " status=" << status;
-  // initiate cancellation if nobody has done so yet
-  if (!status.ok()) Cancel();
+
+  // Don't cancel other fragments here as the final report for "fis" may not have been
+  // sent yet. Cancellation will happen in ReportExecStatus() after sending the final
+  // report to the coordinator. Otherwise, the coordinator fragment may mark the status
+  // of this backend as "CANCELLED", masking the original error.
+
   // decrement refcount taken in StartFInstances()
   ReleaseBackendResourceRefcount();
   // decrement refcount taken in StartFInstances()
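
The cumulative-status rule from the commit message (first fragment
instance error wins; a general startup error overrides it) comes down
to a small critical section. The sketch below is condensed from
ErrorDuringPrepare()/ErrorDuringExecute() and the 'error:' path of
StartFInstances() in the hunks above:

  // Fragment instance threads record only the first error seen.
  void QueryState::ErrorDuringExecute(const Status& status,
      const TUniqueId& finst_id) {
    {
      std::unique_lock<SpinLock> l(status_lock_);
      if (!HasErrorStatus()) {
        overall_status_ = status;
        failed_finstance_id_ = finst_id;
      }
    }
    // Unblock the query state thread in WaitForFinishOrTimeout().
    instances_finished_barrier_->NotifyRemaining();
  }

  // A general startup error (e.g. thread creation failure) overwrites
  // any instance error instead; see StartFInstances():
  //   overall_status_ = start_finstances_status;
  //   failed_finstance_id_ = TUniqueId();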

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index da984b2..6453d3e 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -27,6 +27,7 @@
 #include "common/object-pool.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/Types_types.h"
+#include "gutil/threading/thread_collision_warner.h" // for DFAKE_*
 #include "runtime/tmp-file-mgr.h"
 #include "util/counting-barrier.h"
 #include "util/uid-util.h"
@@ -63,32 +64,38 @@ class ThriftSerializer;
 /// are also managed via a separate resource reference count, which should be released as
 /// soon as the resources are not needed to free resources promptly.
 ///
-/// When any fragment instance execution returns with an error status, all
-/// fragment instances are automatically cancelled.
+/// We maintain a state denoted by BackendExecState. The initial state is PREPARING.
+/// Once all query fragment instances have finished FIS::Prepare(), the BackendExecState
+/// will transition to:
+/// - EXECUTING if all fragment instances succeeded in Prepare()
+/// - ERROR if any fragment instances failed during or after Prepare()
+/// - CANCELLED if the query is cancelled
 ///
-/// We maintain a state denoted by BackendExecState. We transition from one non-error
-/// state to the next only if *all* underlying fragment instances have done so.
-/// Eg: We transition from the PREPARING state to the EXECUTING state only if *all* the
-/// underlying fragment instances have finished Prepare().
-/// However, the behavior for transitioning from a non-error state to an error state is
-/// different for different states. If any fragment instance hits an error or cancellation
-/// during the EXECUTING state, then we immediately change the state of the query to the
-/// ERROR or CANCELLED state accordingly.
-/// However, if a fragment instance hits an error during Prepare(), we still wait for
-/// *all* fragment instances to complete preparing before transitioning to the ERROR
-/// state. This is to simplify the query lifecycle so that Prepare() is always completed
-/// before it can handle either a Cancel() RPC or a PublishFilter() RPC.
+/// Please note that even if some fragment instances hit an error during or after
+/// Prepare(), the state transition from PREPARING won't happen until all fragment
+/// instances have finished Prepare(). This makes sure the query state is initialized
+/// to handle either a Cancel() RPC or a PublishFilter() RPC after PREPARING state.
 ///
-/// Status reporting: all instances currently report their status independently.
-/// Each instance sends at least one final status report with its overall execution
-/// status, so if any of the instances encountered an error, that error will be reported.
+/// Once the BackendExecState enters the EXECUTING state, any error will move the
+/// BackendExecState to the ERROR state and query execution is considered over
+/// on this backend.
+///
+/// When any fragment instance execution returns with an error status, all fragment
+/// instances are automatically cancelled. The query state thread (started by
+/// QueryExecMgr) periodically reports the overall status, the current state of execution
+/// and the profile of each fragment instance to the coordinator. The frequency of those
+/// reports is controlled by the flag status_report_interval_ms; Setting it to 0 disables
+/// periodic reporting altogether. Regardless of the value of that flag, a report is sent
+/// at least once at the end of execution with an overall status and profile (and 'done'
+/// indicator). If execution ended with an error, that error status will be part of
+/// the final report (it will not be overridden by the resulting cancellation).
 ///
 /// Thread-safe, unless noted otherwise.
 ///
 /// TODO:
 /// - set up kudu clients in Init(), remove related locking
-/// - when ReportExecStatus() encounters an error, query execution at this node
-///   gets aborted, but it's possible for the coordinator not to find out about that;
+/// - IMPALA-2990: when ReportExecStatus() encounters an error, query execution at this
+///   node gets aborted, but it's possible for the coordinator not to find out about that;
 ///   fix the coordinator to periodically ping the backends (should the coordinator
 ///   simply poll for the status reports?)
 class QueryState {
@@ -161,7 +168,7 @@ class QueryState {
   /// Sets up state required for fragment execution: memory reservations, etc. Fails if
   /// resources could not be acquired. Acquires a backend resource refcount and returns
   /// it to the caller on both success and failure. The caller must release it by
-  /// calling ReleaseExecResourceRefcount().
+  /// calling ReleaseBackendResourceRefcount().
   ///
   /// Uses few cycles and never blocks. Not idempotent, not thread-safe.
   /// The remaining public functions must be called only after Init().
@@ -169,9 +176,16 @@ class QueryState {
 
   /// Performs the runtime-intensive parts of initial setup and starts all fragment
   /// instances belonging to this query. Each instance receives its own execution
-  /// thread. Blocks until a terminal state has been reached.
-  /// Not idempotent, not thread-safe. Must only be called by the QueryState thread.
-  void StartFInstances();
+  /// thread. Not idempotent, not thread-safe. Must only be called by the query state
+  /// thread. Returns true iff all fragment instance threads were started successfully.
+  /// Returns false otherwise.
+  bool StartFInstances();
+
+  /// Monitors the execution of all underlying fragment instances and updates the query
+  /// state accordingly. This is also responsible for sending status reports periodically
+  /// to the coordinator. Not idempotent, not thread-safe. Must only be called by the
+  /// query state thread.
+  void MonitorFInstances();
 
   /// Blocks until all fragment instances have finished their Prepare phase.
   FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id);
@@ -194,14 +208,6 @@ class QueryState {
   /// execution resources.
   void ReleaseBackendResourceRefcount();
 
-  /// Sends a ReportExecStatus rpc to the coordinator. If fis == nullptr, the
-  /// status must be an error. If fis is given, the content will depend on whether
-  /// the fis has finished its Prepare phase. It sends a report for the instance,
-  /// and it will include the profile if the fis is prepared. If the fis is not
-  /// prepared, the status must be an error.
-  /// If there is an error during the rpc, initiates cancellation.
-  void ReportExecStatus(bool done, const Status& status, FragmentInstanceState* fis);
-
   /// Checks whether spilling is enabled for this query. Must be called before the first
   /// call to BufferPool::Unpin() for the query. Returns OK if spilling is enabled. If
   /// spilling is not enabled, logs a MEM_LIMIT_EXCEEDED error from
@@ -225,32 +231,12 @@ class QueryState {
   /// Updates the query status and the failed instance ID if it's not set already.
   /// Also notifies anyone waiting on WaitForPrepare() if this is called by the last
   /// fragment instance to complete Prepare().
-  void ErrorDuringPrepare(const Status& status, const TUniqueId& finst_id) {
-    // Do a racy check to avoid getting the lock if an error is already set.
-    if (query_status_.ok()) {
-      std::unique_lock<SpinLock> l(status_lock_);
-      if (query_status_.ok()) {
-        query_status_ = status;
-        failed_finstance_id_ = finst_id;
-      }
-    }
-    discard_result(instances_prepared_barrier_->Notify());
-  }
+  void ErrorDuringPrepare(const Status& status, const TUniqueId& finst_id);
 
   /// Called by a fragment instance thread to notify that it hit an error during Execute()
   /// Updates the query status and records the failed instance ID if they're not set
-  /// already. Also notifies anyone waiting on WaitForFinish().
-  void ErrorDuringExecute(const Status& status, const TUniqueId& finst_id) {
-    // Do a racy check to avoid getting the lock if an error is already set.
-    if (query_status_.ok()) {
-      std::unique_lock<SpinLock> l(status_lock_);
-      if (query_status_.ok()) {
-        query_status_ = status;
-        failed_finstance_id_ = finst_id;
-      }
-    }
-    instances_finished_barrier_->NotifyRemaining();
-  }
+  /// already. Also notifies anyone waiting on WaitForFinishOrTimeout().
+  void ErrorDuringExecute(const Status& status, const TUniqueId& finst_id);
 
  private:
   friend class QueryExecMgr;
@@ -261,11 +247,15 @@ class QueryState {
 
   static const int DEFAULT_BATCH_SIZE = 1024;
 
-  /// Return overall status of all fragment instances during execution. A failure
-  /// in any instance's execution (after Prepare()) will cause this function
-  /// to return an error status. Blocks until all fragment instances have finished
-  /// executing or until one of them hits an error.
-  Status WaitForFinish();
+  /// Blocks until all fragment instances have finished executing or until one of them
+  /// hits an error, or until 'timeout_ms' milliseconds have elapsed. Returns 'true' if
+  /// all fragment instances finished or one of them hit an error. Returns 'false' on
+  /// timeout.
+  bool WaitForFinishOrTimeout(int32_t timeout_ms);
+
+  /// Blocks until all fragment instances have finished executing or until one of them
+  /// hits an error.
+  void WaitForFinish();
 
   /// States that a query goes through during its lifecycle.
   enum class BackendExecState {
@@ -286,30 +276,28 @@ class QueryState {
     ERROR
   };
 
+  /// Pseudo-lock to verify only query state thread is updating 'backend_exec_state_'.
+  DFAKE_MUTEX(backend_exec_state_lock_);
+
   /// Current state of this query in this executor.
-  /// Thread-safety: Only updated by the QueryState thread.
+  /// Thread-safety: Only updated by the query state thread.
   BackendExecState backend_exec_state_ = BackendExecState::PREPARING;
 
-  /// Updates the BackendExecState based on 'query_status_'. A state transition happens
-  /// if the current state is a non-terminal state; the transition can either be to the
-  /// next legal state or ERROR if 'query_status_' is an error. Thread safe. This is a
-  /// helper function to StartFInstances() which executes on the QueryState thread.
-  Status UpdateBackendExecState();
-
-  /// A string representation of 'state'.
-  const char* BackendExecStateToString(const BackendExecState& state);
-
-  /// Returns 'true' if 'state' is a terminal state (FINISHED, CANCELLED, ERROR).
-  inline bool IsTerminalState(const BackendExecState& state);
-
-  /// Protects 'query_status_' and 'failed_finstance_id_'.
+  /// Protects 'overall_status_' and 'failed_finstance_id_'.
   SpinLock status_lock_;
 
   /// The overall status of this QueryState.
+  /// A backend can have an error from a specific fragment instance, or it can have a
+  /// general error that is independent of any individual fragment. If reporting a
+  /// single error, this status is always set to the error being reported. If reporting
+  /// multiple errors, the status is set by the following rules:
+  /// 1. A general error takes precedence over any fragment instance error.
+  /// 2. Any fragment instance error takes precedence over any cancelled status.
+  /// 3. If multiple fragments have errors, the first fragment to hit an error is given
+  ///    preference.
   /// Status::OK if all the fragment instances managed by this QS are also Status::OK;
-  /// Otherwise, it will reflect the first non-OK status of a FIS.
   /// Protected by 'status_lock_'.
-  Status query_status_;
+  Status overall_status_;
 
   /// ID of first fragment instance to hit an error.
   /// Protected by 'status_lock_'.
@@ -361,9 +349,8 @@ class QueryState {
 
   /// Barrier for the completion of all the fragment instances.
   /// If the 'Status' is not OK due to an error during fragment instance execution, this
-  /// barrier is unblocked immediately.
-  /// 'query_status_' will be set once this is unblocked and so will 'failed_instance_id_'
-  /// if an error is hit.
+  /// barrier is unblocked immediately. 'overall_status_' is set by the time this is
+  /// unblocked and so is 'failed_finstance_id_' if an error is hit.
   std::unique_ptr<CountingBarrier> instances_finished_barrier_;
 
   /// map from instance id to its state (owned by obj_pool_), populated in
@@ -416,16 +403,40 @@ class QueryState {
   void ReleaseBackendResources();
 
   /// Helper for ReportExecStatus() to construct a status report to be sent to the
-  /// coordinator. If 'fis' is not NULL, the runtime profile is serialized by the Thrift
-  /// serializer 'serializer' and stored in 'profile_buf'.
-  void ConstructReport(bool done, const Status& status,
-      FragmentInstanceState* fis, ReportExecStatusRequestPB* report,
-      ThriftSerializer* serializer, uint8_t** profile_buf, uint32_t* len);
-
-  /// Same behavior as ReportExecStatus().
-  /// Cancel on error only if instances_started is true.
-  void ReportExecStatusAux(bool done, const Status& status, FragmentInstanceState* fis,
-      bool instances_started);
+  /// coordinator. The execution statuses (e.g. 'done' indicator) of all fragment
+  /// instances belonging to this query state are stored in 'report'. The fragment
+  /// instances' runtime profiles, serialized to Thrift, are stored in 'profiles_forest'.
+  void ConstructReport(bool instances_started, ReportExecStatusRequestPB* report,
+      TRuntimeProfileForest* profiles_forest);
+
+  /// Gathers the statuses and profiles of all fragment instances belonging to this
+  /// query state and sends them to the coordinator via the ReportExecStatus() RPC.
+  void ReportExecStatus();
+
+  /// Returns true if the overall backend status is already set to a non-cancelled error.
+  bool HasErrorStatus() const {
+    return !overall_status_.ok() && !overall_status_.IsCancelled();
+  }
+
+  /// Returns true if the query has reached a terminal state.
+  bool IsTerminalState() const {
+    return backend_exec_state_ == BackendExecState::FINISHED
+        || backend_exec_state_ == BackendExecState::CANCELLED
+        || backend_exec_state_ == BackendExecState::ERROR;
+  }
+
+  /// Updates the BackendExecState based on 'overall_status_'. Should only be called
+  /// when the current state is non-terminal. The transition is either to the next
+  /// legal state or to ERROR if 'overall_status_' is an error. Called by the query
+  /// state thread only. It acquires 'status_lock_' to synchronize with the fragment
+  /// instance threads' updates to 'overall_status_'.
+  ///
+  /// Upon reaching a terminal state, this calls ReportExecStatus() to send the final
+  /// report to the coordinator and is not expected to be called again afterwards.
+  void UpdateBackendExecState();
+
+  /// A string representation of 'state'.
+  const char* BackendExecStateToString(const BackendExecState& state);
 };
 }
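
To make the new interface concrete, here is a minimal sketch of how the per-QueryState
thread can drive these entry points. MonitorFInstances() and 'report_interval_ms' are
illustrative names only, not the actual identifiers in query-state.cc:

    // Sketch of the per-backend reporting loop built on WaitForFinishOrTimeout().
    void QueryState::MonitorFInstances(int32_t report_interval_ms) {
      while (!WaitForFinishOrTimeout(report_interval_ms)) {
        // Timed out with instances still running: send a periodic report
        // covering all fragment instances of this query on this backend.
        ReportExecStatus();
      }
      // All instances finished or one hit an error: transition to a terminal
      // state, which sends the final report to the coordinator exactly once.
      UpdateBackendExecState();
    }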
 

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index e80e3ae..5f634fe 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1249,9 +1249,10 @@ beeswax::QueryState::type ClientRequestState::BeeswaxQueryState() const {
 // to call concurrently with Coordinator::Exec(). See comments for 'coord_' and
 // 'coord_exec_called_' for more details.
 Status ClientRequestState::UpdateBackendExecStatus(
-    const ReportExecStatusRequestPB& request, const TRuntimeProfileTree& thrift_profile) {
+    const ReportExecStatusRequestPB& request,
+    const TRuntimeProfileForest& thrift_profiles) {
   DCHECK(coord_.get());
-  return coord_->UpdateBackendExecStatus(request, thrift_profile);
+  return coord_->UpdateBackendExecStatus(request, thrift_profiles);
 }
 
 void ClientRequestState::UpdateFilter(const TUpdateFilterParams& params) {

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 5bb1839..2a2338e 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -161,7 +161,7 @@ class ClientRequestState {
   /// methods should be used instead of calling them directly using the coordinator
   /// object.
   Status UpdateBackendExecStatus(const ReportExecStatusRequestPB& request,
-      const TRuntimeProfileTree& thrift_profile) WARN_UNUSED_RESULT;
+      const TRuntimeProfileForest& thrift_profiles) WARN_UNUSED_RESULT;
   void UpdateFilter(const TUpdateFilterParams& params);
 
   ImpalaServer::SessionState* session() const { return session_.get(); }

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/be/src/service/control-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/control-service.cc b/be/src/service/control-service.cc
index ca2564b..c704565 100644
--- a/be/src/service/control-service.cc
+++ b/be/src/service/control-service.cc
@@ -90,17 +90,17 @@ bool ControlService::Authorize(const google::protobuf::Message* req,
 
 Status ControlService::GetProfile(const ReportExecStatusRequestPB& request,
     const ClientRequestState& request_state, kudu::rpc::RpcContext* rpc_context,
-    TRuntimeProfileTree* thrift_profile) {
+    TRuntimeProfileForest* thrift_profiles) {
   // Debug action to simulate deserialization failure.
   RETURN_IF_ERROR(DebugAction(request_state.query_options(),
       "REPORT_EXEC_STATUS_PROFILE"));
-  kudu::Slice thrift_profile_slice;
+  kudu::Slice thrift_profiles_slice;
   KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar(
-      request.thrift_profiles_sidecar_idx(), &thrift_profile_slice),
+      request.thrift_profiles_sidecar_idx(), &thrift_profiles_slice),
       "Failed to get thrift profile sidecar");
-  uint32_t len = thrift_profile_slice.size();
-  RETURN_IF_ERROR(DeserializeThriftMsg(thrift_profile_slice.data(),
-      &len, true, thrift_profile));
+  uint32_t len = thrift_profiles_slice.size();
+  RETURN_IF_ERROR(DeserializeThriftMsg(thrift_profiles_slice.data(),
+      &len, true, thrift_profiles));
   return Status::OK();
 }
 
@@ -127,21 +127,21 @@ void ControlService::ReportExecStatus(const ReportExecStatusRequestPB* request,
   // sidecar and deserialize the thrift profile if there is any. The sender may have
   // failed to serialize the Thrift profile so an empty thrift profile is valid.
   // TODO: Fix IMPALA-7232 to indicate incomplete profile in this case.
-  TRuntimeProfileTree thrift_profile;
+  TRuntimeProfileForest thrift_profiles;
   if (LIKELY(request->has_thrift_profiles_sidecar_idx())) {
     const Status& profile_status =
-        GetProfile(*request, *request_state.get(), rpc_context, &thrift_profile);
+        GetProfile(*request, *request_state.get(), rpc_context, &thrift_profiles);
     if (UNLIKELY(!profile_status.ok())) {
       LOG(ERROR) << Substitute("ReportExecStatus(): Failed to deserialize profile "
           "for query ID $0: $1", PrintId(request_state->query_id()),
           profile_status.GetDetail());
       // Do not expose a partially deserialized profile.
-      TRuntimeProfileTree empty_profile;
-      swap(thrift_profile, empty_profile);
+      TRuntimeProfileForest empty_profiles;
+      swap(thrift_profiles, empty_profiles);
     }
   }
 
-  Status resp_status = request_state->UpdateBackendExecStatus(*request, thrift_profile);
+  Status resp_status = request_state->UpdateBackendExecStatus(*request, thrift_profiles);
   RespondAndReleaseRpc(resp_status, response, rpc_context);
 }
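
GetProfile() above is the receiving half of the sidecar exchange. For orientation, a
rough sketch of the sending half, assuming the Thrift-serialized TRuntimeProfileForest
already sits in 'buf'/'len' (the real sender code lives in query-state.cc):

    // Attach the serialized profile forest as a KRPC sidecar and record its
    // index in the request. The sidecar does not copy the payload, so 'buf'
    // must stay alive until the RPC completes.
    kudu::rpc::RpcController rpc_controller;
    ReportExecStatusRequestPB request;
    int sidecar_idx;
    KUDU_RETURN_IF_ERROR(rpc_controller.AddOutboundSidecar(
        kudu::rpc::RpcSidecar::FromSlice(kudu::Slice(buf, len)), &sidecar_idx),
        "Failed to add sidecar to RPC");
    request.set_thrift_profiles_sidecar_idx(sidecar_idx);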
 

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/be/src/service/control-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/control-service.h b/be/src/service/control-service.h
index ca2828a..44871d7 100644
--- a/be/src/service/control-service.h
+++ b/be/src/service/control-service.h
@@ -66,10 +66,10 @@ class ControlService : public ControlServiceIf {
 
    /// Helper for deserializing runtime profile from the sidecar attached in the inbound
    /// call within 'rpc_context'. On success, returns the deserialized profile in
-   /// 'thrift_profile'. On failure, returns the error status;
+   /// 'thrift_profiles'. On failure, returns the error status.
    static Status GetProfile(const ReportExecStatusRequestPB& request,
        const ClientRequestState& request_state, kudu::rpc::RpcContext* rpc_context,
-       TRuntimeProfileTree* thrift_profile);
+       TRuntimeProfileForest* thrift_profiles);
 
    /// Helper for serializing 'status' as part of 'response'. Also releases memory
    /// of the RPC payload previously accounted towards the internal memory tracker.

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/common/protobuf/control_service.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index cf64c11..c546959 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -116,22 +116,19 @@ message FragmentInstanceExecStatusPB {
   // The ID of the fragment instance which this report contains
   optional UniqueIdPB fragment_instance_id = 2;
 
-  // Status of fragment execution; any error status means it's done.
-  optional StatusPB status = 3;
-
   // If true, fragment finished executing.
-  optional bool done = 4;
+  optional bool done = 3;
 
   // The current state of this fragment instance's execution.
-  optional FInstanceExecStatePB current_state = 5;
+  optional FInstanceExecStatePB current_state = 4;
 
   // Cumulative structural changes made by the table sink of this fragment
   // instance. This is sent only when 'done' above is true. Not idempotent.
-  optional DmlExecStatusPB dml_exec_status = 6;
+  optional DmlExecStatusPB dml_exec_status = 5;
 
   // Map of TErrorCode to ErrorLogEntryPB; New errors that have not been reported to
   // the coordinator by this fragment instance. Not idempotent.
-  map<int32, ErrorLogEntryPB> error_log = 7;
+  map<int32, ErrorLogEntryPB> error_log = 6;
 }
 
 message ReportExecStatusRequestPB {
@@ -147,17 +144,14 @@ message ReportExecStatusRequestPB {
   // in instance_exec_status.
   optional int32 thrift_profiles_sidecar_idx = 4;
 
-  // Cumulative status for this backend. A backend can have an error from a specific
-  // fragment instance, or it can have a general error that is independent of any
-  // individual fragment. If reporting a single error, this status is always set to
-  // the error being reported. If reporting multiple errors, the status is set by the
-  // following rules:
-  // 1. A general error takes precedence over any fragment instance error.
-  // 2. Any fragment instance error takes precedence over any cancelled status.
-  // 3. If multiple fragments have errors, prefer the error that comes first in the
-  // 'instance_exec_status' list.
-  // This status is only OK if all fragment instances included are OK.
-  optional StatusPB status = 5;
+  // Cumulative status for this backend.
+  // See QueryState::overall_status_ for details.
+  optional StatusPB overall_status = 5;
+
+  // The ID of the first fragment instance to hit an error. This corresponds to the
+  // instance which set 'overall_status' above. Not set if 'overall_status' is a
+  // general error (e.g. failure to start fragment instances).
+  optional UniqueIdPB fragment_instance_id = 6;
 }
 
 message ReportExecStatusResponsePB {
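
The precedence rules that 'overall_status' follows now live in query-state.h (see the
hunk earlier in this patch). As a simplified illustration of those rules, assuming
Impala's Status class with ok()/IsCancelled() and ignoring locking:

    // Illustration only: merging a backend's cumulative status. 'is_general'
    // marks errors independent of any fragment instance (e.g. a failure to
    // start a thread).
    struct CumulativeStatus {
      Status status;            // starts out as Status::OK()
      bool is_general = false;  // true if 'status' is a general error

      void Update(const Status& s, bool is_general_error) {
        if (s.ok()) return;
        bool replace =
            status.ok()
            // Rule 2: any error takes precedence over a cancelled status.
            || (status.IsCancelled() && !s.IsCancelled())
            // Rule 1: a general error takes precedence over a fragment
            // instance error. Rule 3 falls out of the default: when neither
            // clause fires, the first fragment instance error sticks.
            || (is_general_error && !is_general && !s.IsCancelled());
        if (replace) {
          status = s;
          is_general = is_general_error;
        }
      }
    };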

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/common/thrift/RuntimeProfile.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/RuntimeProfile.thrift b/common/thrift/RuntimeProfile.thrift
index e7ad3c6..fe70f16 100644
--- a/common/thrift/RuntimeProfile.thrift
+++ b/common/thrift/RuntimeProfile.thrift
@@ -102,3 +102,8 @@ struct TRuntimeProfileTree {
   1: required list<TRuntimeProfileNode> nodes
   2: optional ExecStats.TExecSummary exec_summary
 }
+
+// A list of TRuntimeProfileTree structures.
+struct TRuntimeProfileForest {
+  1: required list<TRuntimeProfileTree> profile_trees
+}
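
A short sketch of how the backend is expected to use the new struct: one profile tree
per fragment instance is collected into a forest and serialized in a single pass. The
instance list is hypothetical; ThriftSerializer and RuntimeProfile::ToThrift() are the
existing Impala helpers:

    // Collect one TRuntimeProfileTree per fragment instance, then serialize
    // the whole forest once for the RPC sidecar.
    TRuntimeProfileForest forest;
    for (FragmentInstanceState* fis : fis_list) {  // 'fis_list' is assumed
      forest.profile_trees.emplace_back();
      fis->profile()->ToThrift(&forest.profile_trees.back());
    }
    ThriftSerializer serializer(/*compact=*/true);
    uint8_t* buf;
    uint32_t len;
    RETURN_IF_ERROR(serializer.SerializeToBuffer(&forest, &len, &buf));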

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test b/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
index abc56e0..593e66b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
@@ -15,8 +15,7 @@ select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
-# TODO: reenable once IMPALA-6338 is fixed
-#row_regex: .*0 of 1 Runtime Filter Published, 1 Disabled.*
+row_regex: .*0 of 1 Runtime Filter Published, 1 Disabled.*
 ====
 
 
@@ -33,9 +32,8 @@ select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
-# TODO: reenable once IMPALA-6338 is fixed
-#row_regex: .*1 of 1 Runtime Filter Published.*
-#row_regex: .*Filter 0 \(64.00 KB\).*
+row_regex: .*1 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(64.00 KB\).*
 ====
 ---- QUERY
 SET RUNTIME_FILTER_MODE=GLOBAL;
@@ -47,9 +45,8 @@ select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
-# TODO: reenable once IMPALA-6338 is fixed
-#row_regex: .*1 of 1 Runtime Filter Published.*
-#row_regex: .*Filter 0 \(256.00 KB\).*
+row_regex: .*1 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(256.00 KB\).*
 ====
 ---- QUERY
 SET RUNTIME_FILTER_MODE=GLOBAL;
@@ -61,9 +58,8 @@ select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
-# TODO: reenable once IMPALA-6338 is fixed
-#row_regex: .*1 of 1 Runtime Filter Published.*
-#row_regex: .*Filter 0 \(512.00 KB\).*
+row_regex: .*1 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(512.00 KB\).*
 ====
 ---- QUERY
 SET RUNTIME_FILTER_MODE=GLOBAL;
@@ -75,9 +71,8 @@ select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
-# TODO: reenable once IMPALA-6338 is fixed
-#row_regex: .*1 of 1 Runtime Filter Published.*
-#row_regex: .*Filter 0 \(1.00 MB\).*
+row_regex: .*1 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(1.00 MB\).*
 ====
 
 
@@ -118,9 +113,8 @@ with l as (select * from tpch.lineitem UNION ALL select * from tpch.lineitem)
 select STRAIGHT_JOIN count(*) from (select * from tpch.lineitem a LIMIT 1) a
     join (select * from l LIMIT 1000000) b on a.l_orderkey = -b.l_orderkey;
 ---- RUNTIME_PROFILE
-# TODO: reenable once IMPALA-6338 is fixed
-#row_regex: .*0 of 1 Runtime Filter Published.*
-#row_regex: .*Filter 0 \(64.00 KB\).*
+row_regex: .*0 of 1 Runtime Filter Published.*
+row_regex: .*Filter 0 \(64.00 KB\).*
 ====
 
 

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/testdata/workloads/functional-query/queries/QueryTest/udf-no-expr-rewrite.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/udf-no-expr-rewrite.test b/testdata/workloads/functional-query/queries/QueryTest/udf-no-expr-rewrite.test
new file mode 100644
index 0000000..c5ffda5
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/udf-no-expr-rewrite.test
@@ -0,0 +1,30 @@
+====
+---- QUERY
+# Only run without expr rewrites (constant folding) because count_rows() is
+# non-deterministic.
+select count_rows() from functional.alltypestiny;
+---- TYPES
+BIGINT
+---- RESULTS
+1
+2
+3
+4
+5
+6
+7
+8
+====
+---- QUERY
+# Due to IMPALA-3860, the error is not propagated when expr rewrites are enabled.
+select mem_test_leaks(100);
+---- TYPES
+bigint
+---- RESULTS
+100
+---- ERRORS
+# A fragment instance's last status report may be sent before Close() is called, which
+# is where the memory leak is detected, so we don't always get the error message below.
+# UDF WARNING: Memory leaked via FunctionContext::Allocate(), 100 bytes leaked via FunctionContext::TrackAllocation()
+row_regex:.*
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/testdata/workloads/functional-query/queries/QueryTest/udf-non-deterministic.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/udf-non-deterministic.test b/testdata/workloads/functional-query/queries/QueryTest/udf-non-deterministic.test
deleted file mode 100644
index c806d9d..0000000
--- a/testdata/workloads/functional-query/queries/QueryTest/udf-non-deterministic.test
+++ /dev/null
@@ -1,17 +0,0 @@
-====
----- QUERY
-# Only run without expr rewrites (constant folding) because count_rows() is
-# non-deterministic.
-select count_rows() from functional.alltypestiny;
----- TYPES
-BIGINT
----- RESULTS
-1
-2
-3
-4
-5
-6
-7
-8
-====

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/testdata/workloads/functional-query/queries/QueryTest/udf.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/udf.test b/testdata/workloads/functional-query/queries/QueryTest/udf.test
index 6260304..cb27669 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/udf.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/udf.test
@@ -470,16 +470,6 @@ bigint
 100
 ====
 ---- QUERY
-select mem_test_leaks(100);
----- TYPES
-bigint
----- RESULTS
-100
----- ERRORS
-# TODO: this should print a warning, but we can't retrieve errors from the runtime state
-# log if they're added while the query is closing.
-====
----- QUERY
 # Make sure rand() is non-constant
 select constant_arg(cast(rand() as int));
 ---- TYPES

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index c351e02..9464eec 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -148,7 +148,6 @@ class TestObservability(ImpalaTestSuite):
     assert exec_summary is not None and exec_summary.nodes is not None
 
   @SkipIfLocal.multiple_impalad
-  @pytest.mark.xfail(reason="IMPALA-6338")
   def test_profile_fragment_instances(self):
     """IMPALA-6081: Test that the expected number of fragment instances and their exec
     nodes appear in the runtime profile, even when fragments may be quickly cancelled when

http://git-wip-us.apache.org/repos/asf/impala/blob/94103822/tests/query_test/test_udfs.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_udfs.py b/tests/query_test/test_udfs.py
index 6bb9b94..757c171 100644
--- a/tests/query_test/test_udfs.py
+++ b/tests/query_test/test_udfs.py
@@ -287,12 +287,12 @@ class TestUdfExecution(TestUdfBase):
       self.run_test_case('QueryTest/udf-codegen-required', vector, use_db=unique_database)
     self.run_test_case('QueryTest/uda', vector, use_db=unique_database)
     self.run_test_case('QueryTest/udf-init-close', vector, use_db=unique_database)
-    # Some tests assume determinism or non-determinism, which depends on expr rewrites.
+    # Some tests assume no expr rewrites.
     if enable_expr_rewrites:
       self.run_test_case('QueryTest/udf-init-close-deterministic', vector,
           use_db=unique_database)
     else:
-      self.run_test_case('QueryTest/udf-non-deterministic', vector,
+      self.run_test_case('QueryTest/udf-no-expr-rewrite', vector,
           use_db=unique_database)
 
   def test_ir_functions(self, vector, unique_database):
@@ -310,8 +310,7 @@ class TestUdfExecution(TestUdfBase):
       self.run_test_case('QueryTest/udf-init-close-deterministic', vector,
           use_db=unique_database)
     else:
-      self.run_test_case('QueryTest/udf-non-deterministic', vector,
-          use_db=unique_database)
+      self.run_test_case('QueryTest/udf-no-expr-rewrite', vector, use_db=unique_database)
 
   def test_java_udfs(self, vector, unique_database):
     self.run_test_case('QueryTest/load-java-udfs', vector, use_db=unique_database)