You are viewing a plain text version of this content. The canonical link was a hyperlink in the original HTML version and was not preserved in this plain-text conversion.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/11/16 15:26:11 UTC

[impala] branch master updated: IMPALA-9125: generalize finding DataSink from other fragment

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c2df0b  IMPALA-9125: generalize finding DataSink from other fragment
2c2df0b is described below

commit 2c2df0bcf0fbc77ba1244b6700deebb834355086
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Thu Nov 14 13:30:52 2019 -0800

    IMPALA-9125: generalize finding DataSink from other fragment
    
    This is a minor refactor that makes the existing mechanism
    for finding PlanRootSink more general:
    * Status-returning API to reduce boilerplate in caller.
    * Avoid storing PlanRootSink as a member - this doesn't scale
      nicely to multiple subclasses. Instead use an accessor method.
    
    Change-Id: I3c91cadf9ecfde9cc3235bd71412ba10ee37bc4e
    Reviewed-on: http://gerrit.cloudera.org:8080/14710
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator.cc             | 15 +++++----------
 be/src/runtime/fragment-instance-state.cc | 13 ++++++++++---
 be/src/runtime/fragment-instance-state.h  |  6 +-----
 be/src/runtime/query-state.cc             |  8 +++++---
 be/src/runtime/query-state.h              |  6 +++++-
 5 files changed, 26 insertions(+), 22 deletions(-)

diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 6b5ba9a..c98203a 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -146,18 +146,13 @@ Status Coordinator::Exec() {
   // set coord_instance_ and coord_sink_
   if (schedule_.GetCoordFragment() != nullptr) {
     // this blocks until all fragment instances have finished their Prepare phase
-    coord_instance_ = query_state_->GetFInstanceState(query_id());
-    if (coord_instance_ == nullptr) {
-      // at this point, the query is done with the Prepare phase, and we expect
-      // to have a coordinator instance, but coord_instance_ == nullptr,
-      // which means we failed before or during Prepare().
-      Status query_status = query_state_->WaitForPrepare();
-      DCHECK(!query_status.ok());
-      return UpdateExecState(query_status, nullptr, FLAGS_hostname);
-    }
+    Status query_status = query_state_->GetFInstanceState(query_id(), &coord_instance_);
+    if (!query_status.ok()) return UpdateExecState(query_status, nullptr, FLAGS_hostname);
+    // We expected this query to have a coordinator instance.
+    DCHECK(coord_instance_ != nullptr);
     // When GetFInstanceState() returns the coordinator instance, the Prepare phase is
     // done and the FragmentInstanceState's root sink will be set up.
-    coord_sink_ = coord_instance_->root_sink();
+    coord_sink_ = coord_instance_->GetRootSink();
     DCHECK(coord_sink_ != nullptr);
   }
   return Status::OK();
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index ec4cb25..320e477 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -125,7 +125,8 @@ done:
 void FragmentInstanceState::Cancel() {
   DCHECK(runtime_state_ != nullptr);
   runtime_state_->Cancel();
-  if (root_sink_ != nullptr) root_sink_->Cancel(runtime_state_);
+  PlanRootSink* root_sink = GetRootSink();
+  if (root_sink != nullptr) root_sink->Cancel(runtime_state_);
   ExecEnv::GetInstance()->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
 }
 
@@ -219,8 +220,8 @@ Status FragmentInstanceState::Prepare() {
   RuntimeProfile* sink_profile = sink_->profile();
   if (sink_profile != nullptr) profile()->AddChild(sink_profile);
 
-  if (fragment_ctx_.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK) {
-    root_sink_ = reinterpret_cast<PlanRootSink*>(sink_);
+  PlanRootSink* root_sink = GetRootSink();
+  if (root_sink != nullptr) {
     // Release the thread token on the root fragment instance. This fragment spends most
     // of the time waiting and doing very little work. Holding on to the token causes
     // underutilization of the machine. If there are 12 queries on this node, that's 12
@@ -544,6 +545,12 @@ const string& FragmentInstanceState::ExecStateToString(FInstanceExecStatePB stat
   return finstance_state_labels[state];
 }
 
+PlanRootSink* FragmentInstanceState::GetRootSink() const {
+  return fragment_ctx_.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK ?
+      static_cast<PlanRootSink*>(sink_) :
+      nullptr;
+}
+
 const TQueryCtx& FragmentInstanceState::query_ctx() const {
   return query_state_->query_ctx();
 }
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 33cf656..bedf31f 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -112,7 +112,7 @@ class FragmentInstanceState {
 
   /// Returns fragment instance's sink if this is the root fragment instance. Valid after
   /// the Prepare phase. May be nullptr.
-  PlanRootSink* root_sink() { return root_sink_; }
+  PlanRootSink* GetRootSink() const;
 
   /// Returns a string description of 'state'.
   static const string& ExecStateToString(FInstanceExecStatePB state);
@@ -219,10 +219,6 @@ class FragmentInstanceState {
   /// obj_pool().
   DataSink* sink_ = nullptr;
 
-  /// Set if this fragment instance is the root of the entire plan, so that a consumer can
-  /// pull results by calling root_sink_->GetNext(). Same object as sink_.
-  PlanRootSink* root_sink_ = nullptr;
-
   /// should live in obj_pool(), but managed separately so we can delete it in Close()
   boost::scoped_ptr<RowBatch> row_batch_;
 
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 77e6b10..5b60be0 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -290,11 +290,13 @@ void QueryState::UpdateBackendExecState() {
   }
 }
 
-FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_id) {
+Status QueryState::GetFInstanceState(
+    const TUniqueId& instance_id, FragmentInstanceState** fi_state) {
   VLOG_FILE << "GetFInstanceState(): instance_id=" << PrintId(instance_id);
-  if (!WaitForPrepare().ok()) return nullptr;
+  RETURN_IF_ERROR(WaitForPrepare());
   auto it = fis_map_.find(instance_id);
-  return it != fis_map_.end() ? it->second : nullptr;
+  *fi_state = it != fis_map_.end() ? it->second : nullptr;
+  return Status::OK();
 }
 
 void QueryState::ConstructReport(bool instances_started,
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index bbb0fb7..39101e1 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -196,7 +196,11 @@ class QueryState {
   void MonitorFInstances();
 
   /// Blocks until all fragment instances have finished their Prepare phase.
-  FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id);
+  /// Returns the fragment instance state for 'instance_id' in *fi_state,
+  /// or nullptr if it is not present.
+  /// Returns an error if fragment preparation failed.
+  Status GetFInstanceState(
+      const TUniqueId& instance_id, FragmentInstanceState** fi_state);
 
   /// Blocks until all fragment instances have finished their Prepare phase.
   void PublishFilter(const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);