You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/02/07 23:05:48 UTC

[impala] 02/04: IMPALA-4555: Make QueryState's status reporting more robust

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b1e4957ba78ef496d21728606889d1eb83ef6b27
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Tue Dec 4 23:53:38 2018 +0000

    IMPALA-4555: Make QueryState's status reporting more robust
    
    QueryState periodically collects runtime profiles from all of its
    fragment instances and sends them to the coordinator. Previously, if
    the report rpc failed, QueryState would retry twice, waiting a
    configurable interval between attempts, and then cancel the fragment
    instances under the assumption that the coordinator no longer exists.
    
    We've found in real clusters that this logic is too sensitive to
    failed rpcs and can result in fragment instances being cancelled even
    in cases where the coordinator is still running.
    
    This patch makes a few improvements to this logic:
    - When a report fails to send, instead of retrying the same report
      quickly (after waiting report_status_retry_interval_ms), we wait the
      regular reporting interval (status_report_interval_ms), regenerate
      any stale portions of the report, and then retry.
    - A new flag, --status_report_max_retries, is introduced, which
      controls the number of failed reports that are allowed before the
      query is cancelled. --report_status_retry_interval_ms is removed.
    - Linear backoff is used for repeated failed attempts: if the regular
      reporting interval is 't', the wait before the n-th consecutive
      attempt since the last successful report is t * n.
    
    Testing:
    - Added a test which results in a large number of failed intermediate
      status reports but still succeeds.
    
    Change-Id: Ib6007013fc2c9e8eeba11b752ee58fb3038da971
    Reviewed-on: http://gerrit.cloudera.org:8080/12049
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc               |   1 +
 be/src/runtime/coordinator-backend-state.cc |  23 ++++--
 be/src/runtime/fragment-instance-state.cc   |  45 ++++++++++-
 be/src/runtime/fragment-instance-state.h    |  20 ++++-
 be/src/runtime/query-state.cc               | 118 +++++++++++++++++-----------
 be/src/runtime/query-state.h                |  14 +++-
 common/protobuf/control_service.proto       |  21 ++++-
 tests/custom_cluster/test_rpc_timeout.py    |  17 ++++
 8 files changed, 193 insertions(+), 66 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 6620bdc..470a111 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -313,6 +313,7 @@ REMOVED_FLAG(llama_registration_timeout_secs);
 REMOVED_FLAG(llama_registration_wait_secs);
 REMOVED_FLAG(local_nodemanager_url);
 REMOVED_FLAG(max_free_io_buffers);
+REMOVED_FLAG(report_status_retry_interval_ms);
 REMOVED_FLAG(resource_broker_cnxn_attempts);
 REMOVED_FLAG(resource_broker_cnxn_retry_interval_ms);
 REMOVED_FLAG(resource_broker_recv_timeout);
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index ba4f355..2bc8547 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -319,10 +319,11 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
     int instance_idx = GetInstanceIdx(instance_exec_status.fragment_instance_id());
     DCHECK_EQ(instance_stats_map_.count(instance_idx), 1);
     InstanceStats* instance_stats = instance_stats_map_[instance_idx];
+    int64_t last_report_seq_no = instance_stats->last_report_seq_no_;
     DCHECK(instance_stats->exec_params_.instance_id ==
         ProtoToQueryId(instance_exec_status.fragment_instance_id()));
     // Ignore duplicate or out-of-order messages.
-    if (report_seq_no <= instance_stats->last_report_seq_no_) {
+    if (report_seq_no <= last_report_seq_no) {
       VLOG_QUERY << Substitute("Ignoring stale update for query instance $0 with "
           "seq no $1", PrintId(instance_stats->exec_params_.instance_id), report_seq_no);
       continue;
@@ -340,13 +341,19 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
       dml_exec_state->Update(instance_exec_status.dml_exec_status());
     }
 
-    // Log messages aggregated by type
-    if (instance_exec_status.error_log_size() > 0) {
-      // Append the log messages from each update with the global state of the query
-      // execution
-      MergeErrorMaps(instance_exec_status.error_log(), &error_log_);
-      VLOG_FILE << "host=" << TNetworkAddressToString(host_) << " error log: " <<
-          PrintErrorMapToString(error_log_);
+    // Handle the non-idempotent parts of the report for any sequence numbers that we
+    // haven't seen yet.
+    if (instance_exec_status.stateful_report_size() > 0) {
+      for (const auto& stateful_report : instance_exec_status.stateful_report()) {
+        DCHECK_LE(stateful_report.report_seq_no(), report_seq_no);
+        if (last_report_seq_no < stateful_report.report_seq_no()) {
+          // Append the log messages from each update with the global state of the query
+          // execution
+          MergeErrorMaps(stateful_report.error_log(), &error_log_);
+          VLOG_FILE << "host=" << TNetworkAddressToString(host_)
+                    << " error log: " << PrintErrorMapToString(error_log_);
+        }
+      }
     }
 
     DCHECK_GT(num_remaining_instances_, 0);
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index c5799ee..749e135 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -255,7 +255,13 @@ void FragmentInstanceState::GetStatusReport(FragmentInstanceExecStatusPB* instan
   if (per_host_mem_usage_ != nullptr) {
     per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption());
   }
-  instance_status->set_report_seq_no(AdvanceReportSeqNo());
+  if (final_report_generated_) {
+    // Since execution was already finished, the contents of this report will be identical
+    // to the last report, so don't advance the sequence number.
+    instance_status->set_report_seq_no(report_seq_no_);
+  } else {
+    instance_status->set_report_seq_no(AdvanceReportSeqNo());
+  }
   const TUniqueId& finstance_id = instance_id();
   TUniqueIdToUniqueIdPB(finstance_id, instance_status->mutable_fragment_instance_id());
   const bool done = IsDone();
@@ -267,10 +273,41 @@ void FragmentInstanceState::GetStatusReport(FragmentInstanceExecStatusPB* instan
   if (done) {
     runtime_state()->dml_exec_state()->ToProto(
         instance_status->mutable_dml_exec_status());
-    final_report_sent_ = true;
+    final_report_generated_ = true;
+  }
+  if (prev_stateful_reports_.size() > 0) {
+    // Send errors from previous reports that failed.
+    *instance_status->mutable_stateful_report() =
+        {prev_stateful_reports_.begin(), prev_stateful_reports_.end()};
+  }
+  if (runtime_state()->HasErrors()) {
+    // Add any new errors.
+    StatefulStatusPB* stateful_report = instance_status->add_stateful_report();
+    stateful_report->set_report_seq_no(report_seq_no_);
+    runtime_state()->GetUnreportedErrors(stateful_report->mutable_error_log());
+  }
+}
+
+void FragmentInstanceState::ReportSuccessful(
+    const FragmentInstanceExecStatusPB& instance_exec_status) {
+  prev_stateful_reports_.clear();
+  if (instance_exec_status.done()) final_report_sent_ = true;
+}
+
+void FragmentInstanceState::ReportFailed(
+    const FragmentInstanceExecStatusPB& instance_exec_status) {
+  int num_reports = instance_exec_status.stateful_report_size();
+  if (num_reports > 0 && prev_stateful_reports_.size() != num_reports) {
+    // If a stateful report was generated in GetStatusReport(), copy it to
+    // 'prev_stateful_reports_'. It will be the last one in the list and will have a seq
+    // no that matches the overall report's seq no. There can be at most 1 new stateful
+    // report that has been generated since the last call to ReportSuccessful()/Failed().
+    DCHECK_EQ(prev_stateful_reports_.size() + 1, num_reports);
+    const StatefulStatusPB& stateful_report =
+        instance_exec_status.stateful_report()[num_reports - 1];
+    DCHECK_EQ(stateful_report.report_seq_no(), instance_exec_status.report_seq_no());
+    prev_stateful_reports_.emplace_back(stateful_report);
   }
-  // Send new errors to coordinator.
-  runtime_state()->GetUnreportedErrors(instance_status->mutable_error_log());
 }
 
 Status FragmentInstanceState::Open() {
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 3a8768c..240d44f 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -96,6 +96,13 @@ class FragmentInstanceState {
   void GetStatusReport(FragmentInstanceExecStatusPB* instance_status,
       TRuntimeProfileTree* thrift_profile);
 
+  /// After each call to GetStatusReport(), the query state thread should call one of the
+  /// following to indicate if the report rpc was successful. Note that in the case of
+  /// ReportFailed(), the report may have been received by the coordinator even though the
+  /// rpc appeared to fail.
+  void ReportSuccessful(const FragmentInstanceExecStatusPB& instance_status);
+  void ReportFailed(const FragmentInstanceExecStatusPB& instance_status);
+
   /// Returns fragment instance's sink if this is the root fragment instance. Valid after
   /// the Prepare phase. May be nullptr.
   PlanRootSink* root_sink() { return root_sink_; }
@@ -158,6 +165,14 @@ class FragmentInstanceState {
   /// state thread only. Written in GetStatusReport() by the query state thread.
   bool final_report_sent_ = false;
 
+  /// The non-idempotent parts of any reports that were generated but may not have been
+  /// received by the coordinator.
+  std::vector<StatefulStatusPB> prev_stateful_reports_;
+
+  /// True if a report has been generated where 'done' is true, after which the sequence
+  /// number should not be bumped for future reports.
+  bool final_report_generated_ = false;
+
   /// Profile for timings for each stage of the plan fragment instance's lifecycle.
   /// Lives in obj_pool().
   RuntimeProfile* timings_profile_ = nullptr;
@@ -212,7 +227,10 @@ class FragmentInstanceState {
 
   /// Returns the monotonically increasing sequence number.
   /// Called by query state thread only.
-  int64_t AdvanceReportSeqNo() { return ++report_seq_no_; }
+  int64_t AdvanceReportSeqNo() {
+    DCHECK(!final_report_generated_);
+    return ++report_seq_no_;
+  }
 
   /// A counter for the per query, per host peak mem usage. Note that this is not the
   /// max of the peak memory of all fragments running on a host since it needs to take
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index c0b5cc6..beef32c 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -54,11 +54,13 @@ using kudu::rpc::RpcSidecar;
 
 #include "common/names.h"
 
-DEFINE_int32(status_report_interval_ms, 5000,
-    "interval between profile reports; in milliseconds");
-DEFINE_int32(report_status_retry_interval_ms, 100,
-    "The interval in milliseconds to wait before retrying a failed status report RPC to "
-    "the coordinator.");
+static const int DEFAULT_REPORT_WAIT_TIME_MS = 5000;
+
+DEFINE_int32(status_report_interval_ms, DEFAULT_REPORT_WAIT_TIME_MS,
+    "Interval between profile reports in milliseconds. If set to <= 0, periodic "
+    "reporting is disabled.");
+DEFINE_int32(status_report_max_retries, 3,
+    "Max number of times to retry sending the status report before cancelling");
 DECLARE_int32(backend_client_rpc_timeout_ms);
 DECLARE_int64(rpc_max_message_size);
 
@@ -268,7 +270,7 @@ void QueryState::UpdateBackendExecState() {
   // Send one last report if the query has reached the terminal state.
   if (IsTerminalState()) {
     VLOG_QUERY << "UpdateBackendExecState(): last report for " << PrintId(query_id());
-    ReportExecStatus();
+    while (!ReportExecStatus()) SleepForMs(GetReportWaitTimeMs());
   }
 }
 
@@ -317,7 +319,7 @@ void QueryState::ConstructReport(bool instances_started,
   }
 }
 
-void QueryState::ReportExecStatus() {
+bool QueryState::ReportExecStatus() {
 #ifndef NDEBUG
   if (FLAGS_stress_status_report_delay_ms) {
     LOG(INFO) << "Sleeping " << FLAGS_stress_status_report_delay_ms << "ms before "
@@ -353,52 +355,64 @@ void QueryState::ReportExecStatus() {
         serialize_status.ok() ? "OK" : serialize_status.GetDetail(), profile_len);
   }
 
-  // Try to send the RPC 3 times before failing. Sleep for 100ms between retries.
-  // It's safe to retry the RPC as the coordinator handles duplicate RPC messages.
   Status rpc_status;
   Status result_status;
-  for (int i = 0; i < 3; ++i) {
-    RpcController rpc_controller;
-
-    // The profile is a thrift structure serialized to a string and sent as a sidecar.
-    // We keep the runtime profile as Thrift object as Impala client still communicates
-    // with Impala server with Thrift RPC.
-    //
-    // Note that the sidecar is created with faststring so the ownership of the Thrift
-    // profile buffer is transferred to RPC layer and it is freed after the RPC payload
-    // is sent. If serialization of the profile to RPC sidecar fails, we will proceed
-    // without the profile so that the coordinator can still get the status instead of
-    // hitting IMPALA-2990.
-    if (profile_buf != nullptr) {
-      unique_ptr<kudu::faststring> sidecar_buf = make_unique<kudu::faststring>();
-      sidecar_buf->assign_copy(profile_buf, profile_len);
-      unique_ptr<RpcSidecar> sidecar = RpcSidecar::FromFaststring(move(sidecar_buf));
-
-      int sidecar_idx;
-      kudu::Status sidecar_status =
-          rpc_controller.AddOutboundSidecar(move(sidecar), &sidecar_idx);
-      if (LIKELY(sidecar_status.ok())) {
-        report.set_thrift_profiles_sidecar_idx(sidecar_idx);
-      } else {
-        LOG(DFATAL) <<
-            FromKuduStatus(sidecar_status, "Failed to add sidecar").GetDetail();
-      }
+  RpcController rpc_controller;
+
+  // The profile is a thrift structure serialized to a string and sent as a sidecar.
+  // We keep the runtime profile as Thrift object as Impala client still communicates
+  // with Impala server with Thrift RPC.
+  //
+  // Note that the sidecar is created with faststring so the ownership of the Thrift
+  // profile buffer is transferred to RPC layer and it is freed after the RPC payload
+  // is sent. If serialization of the profile to RPC sidecar fails, we will proceed
+  // without the profile so that the coordinator can still get the status instead of
+  // hitting IMPALA-2990.
+  if (profile_buf != nullptr) {
+    unique_ptr<kudu::faststring> sidecar_buf = make_unique<kudu::faststring>();
+    sidecar_buf->assign_copy(profile_buf, profile_len);
+    unique_ptr<RpcSidecar> sidecar = RpcSidecar::FromFaststring(move(sidecar_buf));
+
+    int sidecar_idx;
+    kudu::Status sidecar_status =
+        rpc_controller.AddOutboundSidecar(move(sidecar), &sidecar_idx);
+    if (LIKELY(sidecar_status.ok())) {
+      report.set_thrift_profiles_sidecar_idx(sidecar_idx);
+    } else {
+      LOG(DFATAL) << FromKuduStatus(sidecar_status, "Failed to add sidecar").GetDetail();
     }
+  }
+
+  rpc_controller.set_timeout(
+      MonoDelta::FromMilliseconds(FLAGS_backend_client_rpc_timeout_ms));
+  ReportExecStatusResponsePB resp;
+  rpc_status = FromKuduStatus(proxy_->ReportExecStatus(report, &resp, &rpc_controller),
+      "ReportExecStatus() RPC failed");
+  result_status = Status(resp.status());
+  if (rpc_status.ok()) {
+    num_failed_reports_ = 0;
+  } else {
+    ++num_failed_reports_;
+    LOG(WARNING) << Substitute("Failed to send ReportExecStatus() RPC for query $0. "
+                               "Consecutive failed reports = $1",
+        PrintId(query_id()), num_failed_reports_);
+  }
 
-    rpc_controller.set_timeout(
-        MonoDelta::FromMilliseconds(FLAGS_backend_client_rpc_timeout_ms));
-    ReportExecStatusResponsePB resp;
-    rpc_status = FromKuduStatus(proxy_->ReportExecStatus(report, &resp, &rpc_controller),
-        "ReportExecStatus() RPC failed");
-    result_status = Status(resp.status());
-    if (rpc_status.ok()) break;
-    //TODO: Consider exponential backoff.
-    if (i < 2) SleepForMs(FLAGS_report_status_retry_interval_ms);
-    LOG(WARNING) <<
-        Substitute("Retrying ReportExecStatus() RPC for query $0", PrintId(query_id()));
+  // Notify the fragment instances of the report's status.
+  for (const FragmentInstanceExecStatusPB& instance_exec_status :
+      report.instance_exec_status()) {
+    const TUniqueId& id = ProtoToQueryId(instance_exec_status.fragment_instance_id());
+    FragmentInstanceState* fis = fis_map_[id];
+    if (rpc_status.ok()) {
+      fis->ReportSuccessful(instance_exec_status);
+    } else {
+      fis->ReportFailed(instance_exec_status);
+    }
   }
 
-  if ((!rpc_status.ok() || !result_status.ok()) && instances_started) {
+  if (((!rpc_status.ok() && num_failed_reports_ >= FLAGS_status_report_max_retries)
+          || !result_status.ok())
+      && instances_started) {
     // TODO: should we try to keep rpc_status for the final report? (but the final
     // report, following this Cancel(), may not succeed anyway.)
     // TODO: not keeping an error status here means that all instances might
@@ -418,7 +432,15 @@ void QueryState::ReportExecStatus() {
                 << "Returned status: " << result_status.GetDetail();
     }
     Cancel();
+    return true;
   }
+
+  return rpc_status.ok();
+}
+
+int64_t QueryState::GetReportWaitTimeMs() const {
+  return (FLAGS_status_report_interval_ms > 0 ? FLAGS_status_report_interval_ms :
+      DEFAULT_REPORT_WAIT_TIME_MS) * (num_failed_reports_ + 1);
 }
 
 void QueryState::ErrorDuringPrepare(const Status& status, const TUniqueId& finst_id) {
@@ -564,7 +586,7 @@ void QueryState::MonitorFInstances() {
   DCHECK(backend_exec_state_ == BackendExecState::EXECUTING)
       << BackendExecStateToString(backend_exec_state_);
   if (FLAGS_status_report_interval_ms > 0) {
-    while (!WaitForFinishOrTimeout(FLAGS_status_report_interval_ms)) {
+    while (!WaitForFinishOrTimeout(GetReportWaitTimeMs())) {
       ReportExecStatus();
     }
   } else {
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 74afb8a..7aae063 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -387,6 +387,9 @@ class QueryState {
   /// Tracks host resource usage of this backend. Owned by 'obj_pool_', created in c'tor.
   RuntimeProfile* const host_profile_;
 
+  /// The number of failed intermediate reports since the last successfully sent report.
+  int64_t num_failed_reports_ = 0;
+
   /// Create QueryState w/ a refcnt of 0 and a memory limit of 'mem_limit' bytes applied
   /// to the query mem tracker. The query is associated with the resource pool set in
   /// 'query_ctx.request_pool' or from 'request_pool', if the former is not set (needed
@@ -413,8 +416,15 @@ class QueryState {
       TRuntimeProfileForest* profiles_forest);
 
   /// Gather statues and profiles of all fragment instances belonging to this query state
-  /// and send it to the coordinator via ReportExecStatus() RPC.
-  void ReportExecStatus();
+  /// and send it to the coordinator via ReportExecStatus() RPC. Returns true if the
+  /// report rpc was successful or if it was unsuccessful and we've reached the maximum
+  /// number of allowed failures and cancelled.
+  bool ReportExecStatus();
+
+  /// Returns the amount of time in ms to wait before sending the next status report,
+  /// calculated as a function of the status report interval with backoff based on the
+  /// number of consecutive failed reports.
+  int64_t GetReportWaitTimeMs() const;
 
   /// Returns true if the overall backend status is already set with an error.
   bool HasErrorStatus() const {
diff --git a/common/protobuf/control_service.proto b/common/protobuf/control_service.proto
index 273e0ae..8e6749d 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -109,6 +109,21 @@ enum FInstanceExecStatePB {
   FINISHED = 8;
 }
 
+// Represents any part of the status report that isn't idempotent. If the executor thinks
+// the report failed, we'll retransmit these parts, and this allows us to keep them
+// associated with their original sequence number so that if the coordinator actually did
+// receive the original report it won't reapply them.
+message StatefulStatusPB {
+  // Sequence number prevents out-of-order or duplicated updates from being applied.
+  // 'report_seq_no' will be <= the 'report_seq_no' in the FragmentInstanceExecStatusPB
+  // that contains this StatefulStatusPB.
+  optional int64 report_seq_no = 1;
+
+  // Map of TErrorCode to ErrorLogEntryPB; New errors that have not been reported to
+  // the coordinator by this fragment instance. Not idempotent.
+  map<int32, ErrorLogEntryPB> error_log = 2;
+}
+
 message FragmentInstanceExecStatusPB {
   // Sequence number prevents out-of-order or duplicated updates from being applied.
   optional int64 report_seq_no = 1;
@@ -126,9 +141,9 @@ message FragmentInstanceExecStatusPB {
   // instance. This is sent only when 'done' above is true. Not idempotent.
   optional DmlExecStatusPB dml_exec_status = 5;
 
-  // Map of TErrorCode to ErrorLogEntryPB; New errors that have not been reported to
-  // the coordinator by this fragment instance. Not idempotent.
-  map<int32, ErrorLogEntryPB> error_log = 6;
+  // The non-idempotent parts of the report, and any prior reports that are not known to
+  // have been received by the coordinator.
+  repeated StatefulStatusPB stateful_report = 6;
 }
 
 message ReportExecStatusRequestPB {
diff --git a/tests/custom_cluster/test_rpc_timeout.py b/tests/custom_cluster/test_rpc_timeout.py
index bbec46a..d007ef4 100644
--- a/tests/custom_cluster/test_rpc_timeout.py
+++ b/tests/custom_cluster/test_rpc_timeout.py
@@ -148,3 +148,20 @@ class TestRPCTimeout(CustomClusterTestSuite):
   def test_reportexecstatus_profile_fail(self):
     query_options = {'debug_action': 'REPORT_EXEC_STATUS_PROFILE:FAIL@0.8'}
     self.execute_query_verify_metrics(self.TEST_QUERY, query_options, 10)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=100"
+      " --status_report_interval_ms=1000 --status_report_max_retries=100000")
+  def test_reportexecstatus_retries(self, unique_database):
+    tbl = "%s.kudu_test" % unique_database
+    self.execute_query("create table %s (a int primary key) stored as kudu" % tbl)
+    # Since the sleep time (1000ms) is much longer than the rpc timeout (100ms), all
+    # reports will appear to fail. The query is designed to result in many intermediate
+    # status reports but fewer than the max allowed failures, so the query should succeed.
+    query_options = {'debug_action': 'REPORT_EXEC_STATUS_DELAY:SLEEP@1000'}
+    result = self.execute_query(
+        "insert into %s select 0 from tpch.lineitem limit 100000" % tbl, query_options)
+    assert result.success, str(result)
+    # Ensure that the error log was tracked correctly - all but the first row inserted
+    # should result in a 'key already present' insert error.
+    assert "(1 of 99999 similar)" in result.log, str(result)