You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/04/02 03:28:14 UTC

incubator-kudu git commit: KUDU-1380. Fix retry for BUSY on non-FT scans

Repository: incubator-kudu
Updated Branches:
  refs/heads/master 563313d15 -> fd556b2fd


KUDU-1380. Fix retry for BUSY on non-FT scans

This cleans up the code paths for error handling in Scan RPCs to make them much
easier to follow and to eliminate duplicated code. In the process, it fixes a bug
where we would incorrectly re-open a tablet upon receiving SERVER_TOO_BUSY
errors. The tablet reopening behavior would result in the scanner rewinding to
the beginning of the tablet and yielding the same rows that had already been
returned to the user.

The test modification increases the number of rows to be scanned in the fault
tolerance test (TestServerTooBusyRetry), and also adds an assertion that the
number of rows returned matches the number inserted. Without the bug fix, the
modified test fails reliably.

Change-Id: I048d3aa2a163143d1637ae87281ed91f0fc5ac65
Reviewed-on: http://gerrit.cloudera.org:8080/2654
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/fd556b2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/fd556b2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/fd556b2f

Branch: refs/heads/master
Commit: fd556b2fd2a4e4cf044843ab251fa3855d3e3a13
Parents: 563313d
Author: Todd Lipcon <to...@apache.org>
Authored: Mon Mar 28 20:02:22 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Sat Apr 2 01:27:56 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/client-test.cc      |  11 +-
 src/kudu/client/client.cc           |  89 +++++-----
 src/kudu/client/scanner-internal.cc | 269 ++++++++++++++++---------------
 src/kudu/client/scanner-internal.h  |  92 +++++++++--
 4 files changed, 263 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fd556b2f/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index f5ae23e..e4d5321 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -150,8 +150,8 @@ class ClientTest : public KuduTest {
   // Count the rows of a table, checking that the operation succeeds.
   //
   // Must be public to use as a thread closure.
-  void CheckRowCount(KuduTable* table) {
-    CountRowsFromClient(table);
+  void CheckRowCount(KuduTable* table, int expected) {
+    CHECK_EQ(CountRowsFromClient(table), expected);
   }
 
  protected:
@@ -198,7 +198,7 @@ class ClientTest : public KuduTest {
   void InsertTestRows(KuduClient* client, KuduTable* table, int num_rows, int first_row = 0) {
     shared_ptr<KuduSession> session = client->NewSession();
     ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
-    session->SetTimeoutMillis(10000);
+    session->SetTimeoutMillis(60000);
     for (int i = first_row; i < num_rows + first_row; i++) {
       gscoped_ptr<KuduInsert> insert(BuildTestRow(table, i));
       ASSERT_OK(session->Apply(insert.release()));
@@ -2707,7 +2707,8 @@ TEST_F(ClientTest, TestClonePredicates) {
 // Test that scanners will retry after receiving ERROR_SERVER_TOO_BUSY from an
 // overloaded tablet server. Regression test for KUDU-1079.
 TEST_F(ClientTest, TestServerTooBusyRetry) {
-  NO_FATALS(InsertTestRows(client_table_.get(), FLAGS_test_scan_num_rows));
+  const int kNumRows = 100000;
+  NO_FATALS(InsertTestRows(client_table_.get(), kNumRows));
 
   // Introduce latency in each scan to increase the likelihood of
   // ERROR_SERVER_TOO_BUSY.
@@ -2728,7 +2729,7 @@ TEST_F(ClientTest, TestServerTooBusyRetry) {
   while (!stop) {
     scoped_refptr<kudu::Thread> thread;
     ASSERT_OK(kudu::Thread::Create("test", strings::Substitute("t$0", t++),
-                                   &ClientTest::CheckRowCount, this, client_table_.get(),
+                                   &ClientTest::CheckRowCount, this, client_table_.get(), kNumRows,
                                    &thread));
     threads.push_back(thread);
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fd556b2f/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 650216e..b9a2b63 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -1125,7 +1125,7 @@ Status KuduScanner::NextBatch(vector<KuduRowResult>* rows) {
   return Status::OK();
 }
 
-Status KuduScanner::NextBatch(KuduScanBatch* result) {
+Status KuduScanner::NextBatch(KuduScanBatch* batch) {
   // TODO: do some double-buffering here -- when we return this batch
   // we should already have fired off the RPC for the next batch, but
   // need to do some swapping of the response objects around to avoid
@@ -1133,7 +1133,7 @@ Status KuduScanner::NextBatch(KuduScanBatch* result) {
   CHECK(data_->open_);
   CHECK(data_->proxy_);
 
-  result->data_->Clear();
+  batch->data_->Clear();
 
   if (data_->short_circuit_) {
     return Status::OK();
@@ -1143,7 +1143,7 @@ Status KuduScanner::NextBatch(KuduScanBatch* result) {
     // We have data from a previous scan.
     VLOG(1) << "Extracting data from scan " << ToString();
     data_->data_in_open_ = false;
-    return result->data_->Reset(&data_->controller_,
+    return batch->data_->Reset(&data_->controller_,
                                 data_->projection_,
                                 &data_->client_projection_,
                                 make_gscoped_ptr(data_->last_response_.release_data()));
@@ -1151,59 +1151,48 @@ Status KuduScanner::NextBatch(KuduScanBatch* result) {
     // More data is available in this tablet.
     VLOG(1) << "Continuing scan " << ToString();
 
-    // The user has specified a timeout 'data_->timeout_' which should
-    // apply to the total time for each call to NextBatch(). However,
-    // if this is a fault-tolerant scan, it's preferable to set a shorter
-    // timeout (the "default RPC timeout" for each individual RPC call --
-    // so that if the server is hung we have time to fail over and try a
-    // different server.
-    MonoTime now = MonoTime::Now(MonoTime::FINE);
-
-    MonoTime batch_deadline = now;
+    MonoTime batch_deadline = MonoTime::Now(MonoTime::FINE);
     batch_deadline.AddDelta(data_->timeout_);
-
-    MonoTime rpc_deadline;
-    if (data_->is_fault_tolerant_) {
-      rpc_deadline = now;
-      rpc_deadline.AddDelta(data_->table_->client()->default_rpc_timeout());
-      rpc_deadline = MonoTime::Earliest(batch_deadline, rpc_deadline);
-    } else {
-      rpc_deadline = batch_deadline;
-    }
-
-    data_->controller_.Reset();
-    data_->controller_.set_deadline(rpc_deadline);
     data_->PrepareRequest(KuduScanner::Data::CONTINUE);
-    Status rpc_status = data_->proxy_->Scan(data_->next_req_,
-                                            &data_->last_response_,
-                                            &data_->controller_);
-    const Status server_status = data_->CheckForErrors();
-
-    // Success case.
-    if (rpc_status.ok() && server_status.ok()) {
-      if (data_->last_response_.has_last_primary_key()) {
-        data_->last_primary_key_ = data_->last_response_.last_primary_key();
+
+    while (true) {
+      bool allow_time_for_failover = data_->is_fault_tolerant_;
+      ScanRpcStatus result = data_->SendScanRpc(batch_deadline, allow_time_for_failover);
+
+      // Success case.
+      if (result.result == ScanRpcStatus::OK) {
+        if (data_->last_response_.has_last_primary_key()) {
+          data_->last_primary_key_ = data_->last_response_.last_primary_key();
+        }
+        data_->scan_attempts_ = 0;
+        return batch->data_->Reset(&data_->controller_,
+                                   data_->projection_,
+                                   &data_->client_projection_,
+                                   make_gscoped_ptr(data_->last_response_.release_data()));
       }
-      data_->scan_attempts_ = 0;
-      return result->data_->Reset(&data_->controller_,
-                                  data_->projection_,
-                                  &data_->client_projection_,
-                                  make_gscoped_ptr(data_->last_response_.release_data()));
-    }
 
-    data_->scan_attempts_++;
+      data_->scan_attempts_++;
 
-    // Error handling.
-    LOG(WARNING) << "Scan at tablet server " << data_->ts_->ToString() << " of tablet "
-        << ToString() << " failed: "
-        << (!rpc_status.ok() ? rpc_status.ToString() : server_status.ToString());
-    set<string> blacklist;
-    vector<internal::RemoteTabletServer*> candidates;
-    RETURN_NOT_OK(data_->CanBeRetried(false, rpc_status, server_status, rpc_deadline,
-                                      batch_deadline, candidates, &blacklist));
+      // Error handling.
+      LOG(WARNING) << "Scan at tablet server " << data_->ts_->ToString() << " of tablet "
+                   << ToString() << " failed: " << result.status.ToString();
+
+      set<string> blacklist;
+      RETURN_NOT_OK(data_->HandleError(result, batch_deadline, &blacklist));
+
+      if (data_->is_fault_tolerant_) {
+        LOG(WARNING) << "Attempting to retry scan of tablet " << ToString() << " elsewhere.";
+        return data_->ReopenCurrentTablet(batch_deadline, &blacklist);
+      }
 
-    LOG(WARNING) << "Attempting to retry scan of tablet " << ToString() << " elsewhere.";
-    return data_->ReopenCurrentTablet(batch_deadline, &blacklist);
+      if (blacklist.empty()) {
+        // If we didn't blacklist the current server, we can just retry again.
+        continue;
+      }
+      // If we blacklisted the current server, and it's not fault-tolerant, we can't
+      // retry anywhere, so just propagate the error.
+      return result.status;
+    }
   } else if (data_->MoreTablets()) {
     // More data may be available in other tablets.
     // No need to close the current tablet; we scanned all the data so the

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fd556b2f/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 0863e47..1d24c13 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -71,14 +71,6 @@ KuduScanner::Data::Data(KuduTable* table)
 KuduScanner::Data::~Data() {
 }
 
-Status KuduScanner::Data::CheckForErrors() {
-  if (PREDICT_TRUE(!last_response_.has_error())) {
-    return Status::OK();
-  }
-
-  return StatusFromPB(last_response_.error().status());
-}
-
 namespace {
 void CopyPredicateBound(const ColumnSchema& col,
                         const void* bound_src,
@@ -132,28 +124,65 @@ void ColumnPredicateIntoPB(const ColumnPredicate& predicate,
 }
 } // anonymous namespace
 
-Status KuduScanner::Data::CanBeRetried(const bool isNewScan,
-                                       const Status& rpc_status, const Status& server_status,
-                                       const MonoTime& actual_deadline, const MonoTime& deadline,
-                                       const vector<RemoteTabletServer*>& candidates,
-                                       set<string>* blacklist) {
-  CHECK(!rpc_status.ok() || !server_status.ok());
+Status KuduScanner::Data::HandleError(const ScanRpcStatus& err,
+                                      const MonoTime& deadline,
+                                      set<string>* blacklist) {
+  // If we timed out because of the overall deadline, we're done.
+  // We didn't wait a full RPC timeout, though, so don't mark the tserver as failed.
+  if (err.result == ScanRpcStatus::OVERALL_DEADLINE_EXCEEDED) {
+      LOG(INFO) << "Scan of tablet " << remote_->tablet_id() << " at "
+          << ts_->ToString() << " deadline expired.";
+      return last_error_.ok()
+          ? err.status : err.status.CloneAndAppend(last_error_.ToString());
+  }
+
+  UpdateLastError(err.status);
 
-  // Check for ERROR_INVALID_REQUEST, which should not retry.
-  if (server_status.ok() &&
-      !rpc_status.ok() &&
-      controller_.error_response() != nullptr &&
-      controller_.error_response()->code() == rpc::ErrorStatusPB::ERROR_INVALID_REQUEST) {
-    return rpc_status;
+  bool mark_ts_failed = false;
+  bool blacklist_location = false;
+  bool mark_locations_stale = false;
+  bool can_retry = true;
+  bool backoff = false;
+  switch (err.result) {
+    case ScanRpcStatus::SERVER_BUSY:
+      backoff = true;
+      break;
+    case ScanRpcStatus::RPC_DEADLINE_EXCEEDED:
+    case ScanRpcStatus::RPC_ERROR:
+      blacklist_location = true;
+      mark_ts_failed = true;
+      break;
+    case ScanRpcStatus::SCANNER_EXPIRED:
+      break;
+    case ScanRpcStatus::TABLET_NOT_RUNNING:
+      blacklist_location = true;
+      break;
+    case ScanRpcStatus::TABLET_NOT_FOUND:
+      // There was either a tablet configuration change or the table was
+      // deleted, since at the time of this writing we don't support splits.
+      // Force a re-fetch of the tablet metadata.
+      mark_locations_stale = true;
+      blacklist_location = true;
+      break;
+    default:
+      can_retry = false;
+      break;
   }
 
-  // Check for ERROR_SERVER_TOO_BUSY, which should result in a retry after a delay.
-  if (server_status.ok() &&
-      !rpc_status.ok() &&
-      controller_.error_response() &&
-      controller_.error_response()->code() == rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY) {
-    UpdateLastError(rpc_status);
+  if (mark_ts_failed) {
+    table_->client()->data_->meta_cache_->MarkTSFailed(ts_, err.status);
+    DCHECK(blacklist_location);
+  }
+
+  if (blacklist_location) {
+    blacklist->insert(ts_->permanent_uuid());
+  }
+
+  if (mark_locations_stale) {
+    remote_->MarkStale();
+  }
 
+  if (backoff) {
     // Exponential backoff with jitter anchored between 10ms and 20ms, and an
     // upper bound between 2.5s and 5s.
     MonoDelta sleep = MonoDelta::FromMilliseconds(
@@ -162,89 +191,69 @@ Status KuduScanner::Data::CanBeRetried(const bool isNewScan,
     now.AddDelta(sleep);
     if (deadline.ComesBefore(now)) {
       Status ret = Status::TimedOut("unable to retry before timeout",
-                                    rpc_status.ToString());
+                                    err.status.ToString());
       return last_error_.ok() ?
           ret : ret.CloneAndAppend(last_error_.ToString());
     }
-    LOG(INFO) << "Retrying scan to busy tablet server " << ts_->ToString()
-              << " after " << sleep.ToString() << "; attempt " << scan_attempts_;
+    LOG(INFO) << "Error scanning on server " << ts_->ToString() << ": "
+              << err.status.ToString() << ". Will retry after "
+              << sleep.ToString() << "; attempt " << scan_attempts_;
     SleepFor(sleep);
+  }
+  if (can_retry) {
     return Status::OK();
   }
+  return err.status;
+}
 
-  // Start by checking network errors.
-  if (!rpc_status.ok()) {
-    if (rpc_status.IsTimedOut() && actual_deadline.Equals(deadline)) {
-      // If we ended because of the overall deadline, we're done.
-      // We didn't wait a full RPC timeout though, so don't mark the tserver as failed.
-      LOG(INFO) << "Scan of tablet " << remote_->tablet_id() << " at "
-          << ts_->ToString() << " deadline expired.";
-      return last_error_.ok()
-          ? rpc_status : rpc_status.CloneAndAppend(last_error_.ToString());
-    } else {
-      // All other types of network errors are retriable, and also indicate the tserver is failed.
-      UpdateLastError(rpc_status);
-      table_->client()->data_->meta_cache_->MarkTSFailed(ts_, rpc_status);
-      blacklist->insert(ts_->permanent_uuid());
-    }
+ScanRpcStatus KuduScanner::Data::AnalyzeResponse(const Status& rpc_status,
+                                                 const MonoTime& overall_deadline,
+                                                 const MonoTime& deadline) {
+  if (rpc_status.ok() && !last_response_.has_error()) {
+    return ScanRpcStatus{ScanRpcStatus::OK, Status::OK()};
   }
 
-  // If we're in the middle of a batch and doing a non fault-tolerant scan, then
-  // we cannot retry. Non fault-tolerant scans can still be retried on a tablet
-  // boundary (i.e. an OpenTablet call).
-  if (!isNewScan && !is_fault_tolerant_) {
-    return !rpc_status.ok() ? rpc_status : server_status;
-  }
+  // Check for various RPC-level errors.
+  if (!rpc_status.ok()) {
+    // Handle various RPC-system level errors that came back from the server. These
+    // errors indicate that the TS is actually up.
+    if (rpc_status.IsRemoteError()) {
+      DCHECK(controller_.error_response());
+      switch (controller_.error_response()->code()) {
+        case rpc::ErrorStatusPB::ERROR_INVALID_REQUEST:
+          return ScanRpcStatus{ScanRpcStatus::INVALID_REQUEST, rpc_status};
+        case rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY:
+          return ScanRpcStatus{ScanRpcStatus::SERVER_BUSY, rpc_status};
+        default:
+          return ScanRpcStatus{ScanRpcStatus::RPC_ERROR, rpc_status};
+      }
+    }
 
-  // For retries, the correct action depends on the particular failure condition.
-  //
-  // On an RPC error, we retry at a different tablet server.
-  //
-  // If the server returned an error code, it depends:
-  //
-  //   - SCANNER_EXPIRED    : The scan can be retried at the same tablet server.
-  //
-  //   - TABLET_NOT_RUNNING : The scan can be retried at a different tablet server, subject
-  //                          to the client's specified selection criteria.
-  //
-  //   - TABLET_NOT_FOUND   : The scan can be retried at a different tablet server, subject
-  //                          to the client's specified selection criteria.
-  //                          The metadata for this tablet should be refreshed.
-  //
-  //   - Any other error    : Fatal. This indicates an unexpected error while processing the scan
-  //                          request.
-  if (rpc_status.ok() && !server_status.ok()) {
-    UpdateLastError(server_status);
-
-    const tserver::TabletServerErrorPB& error = last_response_.error();
-    switch (error.code()) {
-      case tserver::TabletServerErrorPB::SCANNER_EXPIRED:
-        VLOG(1) << "Got SCANNER_EXPIRED error code, non-fatal error.";
-        break;
-      case tserver::TabletServerErrorPB::TABLET_NOT_RUNNING:
-        VLOG(1) << "Got error code " << tserver::TabletServerErrorPB::Code_Name(error.code())
-            << ": temporarily blacklisting node " << ts_->permanent_uuid();
-        blacklist->insert(ts_->permanent_uuid());
-        break;
-      case tserver::TabletServerErrorPB::TABLET_NOT_FOUND: {
-        // There was either a tablet configuration change or the table was
-        // deleted, since at the time of this writing we don't support splits.
-        // Backoff, then force a re-fetch of the tablet metadata.
-        remote_->MarkStale();
-        // TODO: Only backoff on the second time we hit TABLET_NOT_FOUND on the
-        // same tablet (see KUDU-1314).
-        MonoDelta backoff_time = MonoDelta::FromMilliseconds((random() % 1000) + 500);
-        SleepFor(backoff_time);
-        break;
+    if (rpc_status.IsTimedOut()) {
+      if (overall_deadline.Equals(deadline)) {
+        return ScanRpcStatus{ScanRpcStatus::OVERALL_DEADLINE_EXCEEDED, rpc_status};
+      } else {
+        return ScanRpcStatus{ScanRpcStatus::RPC_DEADLINE_EXCEEDED, rpc_status};
       }
-      default:
-        // All other server errors are fatal. Usually indicates a malformed request, e.g. a bad scan
-        // specification.
-        return server_status;
     }
+    return ScanRpcStatus{ScanRpcStatus::RPC_ERROR, rpc_status};
   }
 
-  return Status::OK();
+  // If we got this far, it indicates that the tserver service actually handled the
+  // call, but it was an error for some reason.
+  Status server_status = StatusFromPB(last_response_.error().status());
+  DCHECK(!server_status.ok());
+  const tserver::TabletServerErrorPB& error = last_response_.error();
+  switch (error.code()) {
+    case tserver::TabletServerErrorPB::SCANNER_EXPIRED:
+      return ScanRpcStatus{ScanRpcStatus::SCANNER_EXPIRED, server_status};
+    case tserver::TabletServerErrorPB::TABLET_NOT_RUNNING:
+      return ScanRpcStatus{ScanRpcStatus::TABLET_NOT_RUNNING, server_status};
+    case tserver::TabletServerErrorPB::TABLET_NOT_FOUND:
+      return ScanRpcStatus{ScanRpcStatus::TABLET_NOT_FOUND, server_status};
+    default:
+      return ScanRpcStatus{ScanRpcStatus::OTHER_TS_ERROR, server_status};
+  }
 }
 
 Status KuduScanner::Data::OpenNextTablet(const MonoTime& deadline,
@@ -261,6 +270,34 @@ Status KuduScanner::Data::ReopenCurrentTablet(const MonoTime& deadline,
                     blacklist);
 }
 
+ScanRpcStatus KuduScanner::Data::SendScanRpc(const MonoTime& overall_deadline,
+                                             bool allow_time_for_failover) {
+  // The user has specified a timeout which should apply to the total time for each call
+  // to NextBatch(). However, for fault-tolerant scans, or for when we are first opening
+  // a scanner, it's preferable to set a shorter timeout (the "default RPC timeout") for
+  // each individual RPC call. This gives us time to fail over to a different server
+  // if the first server we try happens to be hung.
+  MonoTime rpc_deadline;
+  if (allow_time_for_failover) {
+    rpc_deadline = MonoTime::Now(MonoTime::FINE);
+    rpc_deadline.AddDelta(table_->client()->default_rpc_timeout());
+    rpc_deadline = MonoTime::Earliest(overall_deadline, rpc_deadline);
+  } else {
+    rpc_deadline = overall_deadline;
+  }
+
+  controller_.Reset();
+  controller_.set_deadline(rpc_deadline);
+  if (!spec_.predicates().empty()) {
+    controller_.RequireServerFeature(TabletServerFeatures::COLUMN_PREDICATES);
+  }
+  return AnalyzeResponse(
+      proxy_->Scan(next_req_,
+                   &last_response_,
+                   &controller_),
+      rpc_deadline, overall_deadline);
+}
+
 Status KuduScanner::Data::OpenTablet(const string& partition_key,
                                      const MonoTime& deadline,
                                      set<string>* blacklist) {
@@ -360,47 +397,19 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
       continue;
     }
     RETURN_NOT_OK(lookup_status);
-
-    MonoTime now = MonoTime::Now(MonoTime::FINE);
-    if (deadline.ComesBefore(now)) {
-      Status ret = Status::TimedOut("Scan timed out, deadline expired");
-      return last_error_.ok() ?
-          ret : ret.CloneAndAppend(last_error_.ToString());
-    }
-
-    // Recalculate the deadlines.
-    // If we have other replicas beyond this one to try, then we'll try to
-    // open the scanner with the default RPC timeout. That gives us time to
-    // try other replicas later. Otherwise, we open the scanner using the
-    // full remaining deadline for the user's call.
-    MonoTime rpc_deadline;
-    if (static_cast<int>(candidates.size()) - blacklist->size() > 1) {
-      rpc_deadline = now;
-      rpc_deadline.AddDelta(table_->client()->default_rpc_timeout());
-      rpc_deadline = MonoTime::Earliest(deadline, rpc_deadline);
-    } else {
-      rpc_deadline = deadline;
-    }
-
-    controller_.Reset();
-    controller_.set_deadline(rpc_deadline);
-
-    if (!spec_.predicates().empty()) {
-      controller_.RequireServerFeature(TabletServerFeatures::COLUMN_PREDICATES);
-    }
-
     CHECK(ts->proxy());
     ts_ = CHECK_NOTNULL(ts);
-    proxy_ = ts->proxy();
-    const Status rpc_status = proxy_->Scan(next_req_, &last_response_, &controller_);
-    const Status server_status = CheckForErrors();
-    if (rpc_status.ok() && server_status.ok()) {
+    proxy_ = ts_->proxy();
+
+    bool allow_time_for_failover = static_cast<int>(candidates.size()) - blacklist->size() > 1;
+    ScanRpcStatus scan_status = SendScanRpc(deadline, allow_time_for_failover);
+    if (scan_status.result == ScanRpcStatus::OK) {
+      last_error_ = Status::OK();
       scan_attempts_ = 0;
       break;
     }
     scan_attempts_++;
-    RETURN_NOT_OK(CanBeRetried(true, rpc_status, server_status, rpc_deadline, deadline,
-                               candidates, blacklist));
+    RETURN_NOT_OK(HandleError(scan_status, deadline, blacklist));
   }
 
   partition_pruner_.RemovePartitionKeyRange(remote_->partition().partition_key_end());

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/fd556b2f/src/kudu/client/scanner-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index e2a3f63..44dea45 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -33,31 +33,80 @@ namespace kudu {
 
 namespace client {
 
+// The result of KuduScanner::Data::AnalyzeResponse.
+//
+// This provides a more specific enum for handling the possible error conditions in a Scan
+// RPC.
+struct ScanRpcStatus {
+  enum Result {
+    OK,
+
+    // The request was malformed (e.g. bad schema, etc).
+    INVALID_REQUEST,
+
+    // The server was busy (e.g. RPC queue overflow).
+    SERVER_BUSY,
+
+    // The deadline for the whole batch was exceeded.
+    OVERALL_DEADLINE_EXCEEDED,
+
+    // The deadline for an individual RPC was exceeded, but we have more time left to try
+    // on other hosts.
+    RPC_DEADLINE_EXCEEDED,
+
+    // Another RPC-system error (e.g. NetworkError because the TS was down).
+    RPC_ERROR,
+
+    // The scanner on the server side expired.
+    SCANNER_EXPIRED,
+
+    // The destination tablet was not running (e.g. in the process of bootstrapping).
+    TABLET_NOT_RUNNING,
+
+    // The destination tablet does not exist (e.g. because the replica was deleted).
+    TABLET_NOT_FOUND,
+
+    // Some other unknown tablet server error. This indicates that the TS was running
+    // but some problem occurred other than the ones enumerated above.
+    OTHER_TS_ERROR
+  };
+
+  Result result;
+  Status status;
+};
+
 class KuduScanner::Data {
  public:
+
   explicit Data(KuduTable* table);
   ~Data();
 
-  Status CheckForErrors();
-
   // Copies a predicate lower or upper bound from 'bound_src' into
   // 'bound_dst'.
   void CopyPredicateBound(const ColumnSchema& col,
                           const void* bound_src, std::string* bound_dst);
 
+
+  // Calculates a deadline and sends the next RPC for this scanner. The deadline for the
+  // RPC is calculated based on whether 'allow_time_for_failover' is true. If true,
+  // the deadline used for the RPC will be shortened so that, on timeout, there will
+  // be enough time for another attempt to a different server. If false, then the RPC
+  // will use 'overall_deadline' as its deadline.
+  //
+  // The RPC and TS proxy should already have been prepared in next_req_, proxy_, etc.
+  ScanRpcStatus SendScanRpc(const MonoTime& overall_deadline, bool allow_time_for_failover);
+
   // Called when KuduScanner::NextBatch or KuduScanner::Data::OpenTablet result in an RPC or
-  // server error. Returns the error status if the call cannot be retried.
+  // server error.
+  //
+  // If the provided 'status' indicates the error was retryable, then returns Status::OK()
+  // and potentially inserts the current server into 'blacklist' if the retry should be
+  // made on a different replica.
   //
-  // The number of parameters reflects the complexity of handling retries.
-  // We must respect the overall scan 'deadline', as well as the 'blacklist' of servers
-  // experiencing transient failures. See the implementation for more details.
-  Status CanBeRetried(const bool isNewScan,
-                      const Status& rpc_status,
-                      const Status& server_status,
-                      const MonoTime& actual_deadline,
-                      const MonoTime& deadline,
-                      const std::vector<internal::RemoteTabletServer*>& candidates,
-                      std::set<std::string>* blacklist);
+  // This function may also sleep in case the error suggests that backoff is necessary.
+  Status HandleError(const ScanRpcStatus& status,
+                     const MonoTime& deadline,
+                     std::set<std::string>* blacklist);
 
   // Open the next tablet in the scan.
   // The deadline is the time budget for this operation.
@@ -178,6 +227,23 @@ class KuduScanner::Data {
   // TODO: This and the overall scan retry logic duplicates much of RpcRetrier.
   Status last_error_;
 
+ private:
+  // Analyze the response of the last Scan RPC made by this scanner.
+  //
+  // The error handling of a scan RPC is fairly complex, since we have to handle
+  // some errors which happen at the network layer, some which happen generically
+  // at the RPC layer, and some which are scanner-specific. This function consolidates
+  // the various different error situations into a single enum code and Status.
+  //
+  // 'rpc_status':       the Status directly returned by the RPC Scan() method.
+  // 'overall_deadline': the user-provided deadline for the scanner batch
+  // 'rpc_deadline':     the deadline that was used for this specific RPC, which
+  //                     might be earlier than the overall deadline, in order to
+  //                     leave more time for further retries on other hosts.
+  ScanRpcStatus AnalyzeResponse(const Status& rpc_status,
+                                const MonoTime& overall_deadline,
+                                const MonoTime& rpc_deadline);
+
   DISALLOW_COPY_AND_ASSIGN(Data);
 };