You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/01/18 05:28:35 UTC

[impala] 01/03: IMPALA-9296: Move AuxErrorInfo to StatefulStatus

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7f7743dcc67f112e684737acb5441c544f2c94b1
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Wed Jan 15 13:28:40 2020 -0800

    IMPALA-9296: Move AuxErrorInfo to StatefulStatus
    
    This patch moves AuxErrorInfoPB from FragmentInstanceExecStatusPB to
    StatefulStatusPB. This is necessary because if the report with the
    AuxErrorInfoPB is dropped (e.g. due to backpressure at the Coordinator
    or a flaky network), the next report won't contain the AuxErrorInfoPB,
    and the error info will be lost. StatefulStatus solves this by detecting
    any reports that may not have been received by the Coordinator, and
    re-transmitting any StatefulStatuses that were not successfully
    delivered.
    
    This change also makes the reporting of AuxErrorInfoPB stateful, so
    that the error info is included in only one report and is then cleared
    from the RuntimeState.
    
    Change-Id: Iabbb48dd3ab58ba7b76b1ab6979b92d0e25e72e3
    Reviewed-on: http://gerrit.cloudera.org:8080/15046
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator-backend-state.cc | 8 ++++----
 be/src/runtime/fragment-instance-state.cc   | 9 +++++++--
 be/src/runtime/runtime-state.cc             | 6 ++++--
 be/src/runtime/runtime-state.h              | 8 ++++++--
 common/protobuf/control_service.proto       | 8 ++++----
 5 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index f4b0536..062a97d 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -420,6 +420,10 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
           MergeErrorMaps(stateful_report.error_log(), &error_log_);
           VLOG_FILE << "host=" << TNetworkAddressToString(host_)
                     << " error log: " << PrintErrorMapToString(error_log_);
+
+          if (stateful_report.has_aux_error_info()) {
+            aux_error_info->push_back(stateful_report.aux_error_info());
+          }
         }
       }
     }
@@ -444,10 +448,6 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
       // TODO: We're losing this profile information. Call ReportQuerySummary only after
       // all backends have completed.
     }
-
-    if (instance_exec_status.has_aux_error_info()) {
-      aux_error_info->push_back(instance_exec_status.aux_error_info());
-    }
   }
 
   // status_ has incorporated the status from all fragment instances. If the overall
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index bc34d17..b74bbe3 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -286,15 +286,20 @@ void FragmentInstanceState::GetStatusReport(FragmentInstanceExecStatusPB* instan
     *instance_status->mutable_stateful_report() =
         {prev_stateful_reports_.begin(), prev_stateful_reports_.end()};
   }
+  StatefulStatusPB* stateful_report = nullptr;
   if (runtime_state()->HasErrors()) {
     // Add any new errors.
-    StatefulStatusPB* stateful_report = instance_status->add_stateful_report();
+    stateful_report = instance_status->add_stateful_report();
     stateful_report->set_report_seq_no(report_seq_no_);
     runtime_state()->GetUnreportedErrors(stateful_report->mutable_error_log());
   }
   // If set in the RuntimeState, set the AuxErrorInfoPB field.
   if (runtime_state()->HasAuxErrorInfo()) {
-    runtime_state()->GetAuxErrorInfo(instance_status->mutable_aux_error_info());
+    if (stateful_report == nullptr) {
+      stateful_report = instance_status->add_stateful_report();
+      stateful_report->set_report_seq_no(report_seq_no_);
+    }
+    runtime_state()->GetUnreportedAuxErrorInfo(stateful_report->mutable_aux_error_info());
   }
 }
 
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index a7ff96d..5102ad9 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -313,7 +313,7 @@ void RuntimeState::ReleaseResources() {
 
 void RuntimeState::SetRPCErrorInfo(TNetworkAddress dest_node, int16_t posix_error_code) {
   boost::lock_guard<SpinLock> l(aux_error_info_lock_);
-  if (aux_error_info_ == nullptr) {
+  if (aux_error_info_ == nullptr && !reported_aux_error_info_) {
     aux_error_info_.reset(new AuxErrorInfoPB());
     RPCErrorInfoPB* rpc_error_info = aux_error_info_->mutable_rpc_error_info();
     NetworkAddressPB* network_addr = rpc_error_info->mutable_dest_node();
@@ -323,11 +323,13 @@ void RuntimeState::SetRPCErrorInfo(TNetworkAddress dest_node, int16_t posix_erro
   }
 }
 
-void RuntimeState::GetAuxErrorInfo(AuxErrorInfoPB* aux_error_info) {
+void RuntimeState::GetUnreportedAuxErrorInfo(AuxErrorInfoPB* aux_error_info) {
   boost::lock_guard<SpinLock> l(aux_error_info_lock_);
   if (aux_error_info_ != nullptr) {
     aux_error_info->CopyFrom(*aux_error_info_);
   }
+  aux_error_info_ = nullptr;
+  reported_aux_error_info_ = true;
 }
 
 const std::string& RuntimeState::GetEffectiveUser() const {
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 884fae6..b1d18ce 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -319,8 +319,9 @@ class RuntimeState {
   /// Sets the given AuxErrorInfoPB with all relevant aux error info from the fragment
   /// instance associated with this RuntimeState. If no aux error info for this
   /// RuntimeState has been set, this method does nothing. Currently, only
-  /// SetRPCErrorInfo() sets aux error info.
-  void GetAuxErrorInfo(AuxErrorInfoPB* aux_error_info);
+  /// SetRPCErrorInfo() sets aux error info. This method clears aux_error_info_. Calls to
+  /// HasAuxErrorInfo() after this method has been called will return false.
+  void GetUnreportedAuxErrorInfo(AuxErrorInfoPB* aux_error_info);
 
   static const char* LLVM_CLASS_NAME;
 
@@ -441,6 +442,9 @@ class RuntimeState {
   /// query_status_ != Status::OK()). Owned by this RuntimeState.
   std::unique_ptr<AuxErrorInfoPB> aux_error_info_;
 
+  /// True if aux_error_info_ has been sent in a status report, false otherwise.
+  bool reported_aux_error_info_ = false;
+
   /// prohibit copies
   RuntimeState(const RuntimeState&);
 
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index 085be7a..ddbbc68 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -122,6 +122,10 @@ message StatefulStatusPB {
   // Map of TErrorCode to ErrorLogEntryPB; New errors that have not been reported to
   // the coordinator by this fragment instance. Not idempotent.
   map<int32, ErrorLogEntryPB> error_log = 2;
+
+  // Metadata associated with a failed fragment instance. Only set for failed fragment
+  // instances.
+  optional AuxErrorInfoPB aux_error_info = 3;
 }
 
 // RPC error metadata that can be associated with a AuxErrorInfoPB object. Created if a
@@ -163,10 +167,6 @@ message FragmentInstanceExecStatusPB {
   // The non-idempotent parts of the report, and any prior reports that are not known to
   // have been received by the coordinator.
   repeated StatefulStatusPB stateful_report = 6;
-
-  // Metadata associated with a failed fragment instance. Only set for failed fragment
-  // instances.
-  optional AuxErrorInfoPB aux_error_info = 7;
 }
 
 message ReportExecStatusRequestPB {