Posted to commits@kudu.apache.org by to...@apache.org on 2017/11/16 22:39:40 UTC

[1/2] kudu git commit: docs: clarify at-rest encryption known limitation

Repository: kudu
Updated Branches:
  refs/heads/master bb90de0ed -> 1e4db3148


docs: clarify at-rest encryption known limitation

Some users have been confused by the current documentation of at-rest
encryption not being supported. This just clarifies that device-level
encryption can be used to encrypt Kudu data.

Change-Id: I6d070716fe04572032c5516826593f7f6528671e
Reviewed-on: http://gerrit.cloudera.org:8080/8556
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/48bc4054
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/48bc4054
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/48bc4054

Branch: refs/heads/master
Commit: 48bc4054846f1809d7c8945c4222772beedf9cbb
Parents: bb90de0
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Nov 15 12:53:14 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Nov 15 22:30:36 2017 +0000

----------------------------------------------------------------------
 docs/known_issues.adoc | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/48bc4054/docs/known_issues.adoc
----------------------------------------------------------------------
diff --git a/docs/known_issues.adoc b/docs/known_issues.adoc
index 8577531..1702e8c 100644
--- a/docs/known_issues.adoc
+++ b/docs/known_issues.adoc
@@ -158,8 +158,9 @@
 * Authorization is only available at a system-wide, coarse-grained level. Table-level,
   column-level, and row-level authorization features are not available.
 
-* Data encryption at rest is not built in. Kudu has been reported to run correctly
-  on systems using local block device encryption (e.g. `dmcrypt`).
+* Data encryption at rest is not directly built into Kudu. Encryption of
+  Kudu data at rest can be achieved through the use of local block device
+  encryption software such as `dmcrypt`.
 
 * Kudu server Kerberos principals must follow the pattern `kudu/<HOST>@DEFAULT.REALM`.
   Configuring an alternate Kerberos principal is not supported.


[2/2] kudu git commit: KUDU-2048. consensus: only evict unresponsive nodes if remaining voters are viable

Posted by to...@apache.org.
KUDU-2048. consensus: only evict unresponsive nodes if remaining voters are viable

This adds a heuristic to the leader when it is deciding whether to evict
an unresponsive node. It will now only do so if the remaining nodes
appear viable (i.e. they have recently had a successful communication
with the leader, are part of the most recent majority, etc.).
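
As a rough illustration of the heuristic described above, here is a minimal,
self-contained sketch. It is not the code from this commit: the PeerView
struct, the SafeToEvictSketch function, and its parameters are hypothetical
stand-ins for the queue's real per-peer state, and the three viability
conditions are taken from the description above and the diff that follows.

```cpp
#include <chrono>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, simplified view of what the leader tracks for each peer.
struct PeerView {
  std::string uuid;
  bool is_voter;
  bool last_exchange_ok;        // Last exchange with the peer succeeded.
  int64_t last_received_index;  // Index of the last op the peer acknowledged.
  std::chrono::milliseconds since_last_communication;
};

// Smallest number of voters that forms a majority.
inline int MajoritySizeSketch(int num_voters) { return num_voters / 2 + 1; }

// Returns true if evicting 'evict_uuid' would still leave enough viable
// voters to form a majority of the remaining voters.
bool SafeToEvictSketch(const std::vector<PeerView>& peers,
                       const std::string& evict_uuid,
                       int64_t majority_replicated_index,
                       std::chrono::milliseconds rpc_timeout) {
  int remaining_voters = 0;
  int remaining_viable_voters = 0;
  for (const PeerView& p : peers) {
    if (!p.is_voter || p.uuid == evict_uuid) continue;
    ++remaining_voters;
    // A voter is viable only if its last exchange succeeded, it is caught up
    // with the latest majority, and we have heard from it recently.
    const bool viable = p.last_exchange_ok &&
                        p.last_received_index >= majority_replicated_index &&
                        p.since_last_communication < rpc_timeout;
    if (viable) ++remaining_viable_voters;
  }
  // Never drop to a single remaining voter automatically.
  if (remaining_voters <= 1) return false;
  return remaining_viable_voters >= MajoritySizeSketch(remaining_voters);
}
```

With three voters A, B and C, where only C is unresponsive, this sketch allows
evicting C but refuses to evict B, because C alone could not help A form a
majority.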

Along the way, this cleans up various bits of the per-peer state
tracking and error handling code:

- removes the 'is_new', 'is_last_exchange_successful', and
  'needs_tablet_copy' booleans and replaces them with an enum tracking
  the status of the last exchange with each peer. This makes the code a
  little clearer since it resolves ambiguity about what a "successful"
  exchange is.

- renames 'last_successful_communication_time' to
  'last_communication_time' and clarifies the doc on this field. We
  actually update this time on any communication with the remote tablet
  server, even in cases where the remote replica is not in a good state
  (e.g. it needs a tablet copy or the tablet doesn't exist). We still
  update the field in this case, but the naming is now more
  representative.

- cleans up the code in Peer::ProcessResponse so that each response case
  is more clearly delineated. This clean-up exposed the fact that we
  have no test coverage of the 'CANNOT_PREPARE' case. Added a TODO about
  this.
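
To make the first two clean-ups above concrete, the following sketch shows
how a single status enum can answer the questions the three removed booleans
used to answer. The enum values mirror the PeerStatus values in the diff
below; the ExchangeStatus name and the helper functions are hypothetical and
exist only for this illustration.

```cpp
// Hypothetical mirror of the status enum introduced by this change.
enum class ExchangeStatus {
  NEW,
  OK,
  REMOTE_ERROR,
  RPC_LAYER_ERROR,
  TABLET_FAILED,
  TABLET_NOT_FOUND,
  INVALID_TERM,
  CANNOT_PREPARE,
  LMP_MISMATCH,
};

// Replaces the old 'is_new' flag: no round of communication has happened yet.
inline bool IsNewPeer(ExchangeStatus s) { return s == ExchangeStatus::NEW; }

// Replaces 'is_last_exchange_successful': only OK counts as success.
inline bool LastExchangeSucceeded(ExchangeStatus s) {
  return s == ExchangeStatus::OK;
}

// Replaces 'needs_tablet_copy': the remote reported that the tablet is missing.
inline bool NeedsTabletCopy(ExchangeStatus s) {
  return s == ExchangeStatus::TABLET_NOT_FOUND;
}

// Any response from the remote server counts as a communication, even an
// error response. Only an RPC-layer failure (for example a connection error)
// suggests that the host itself may be down.
inline bool CountsAsCommunication(ExchangeStatus s) {
  return s != ExchangeStatus::RPC_LAYER_ERROR;
}
```

Because the four predicates are derived from one field, they cannot disagree
with each other the way independently updated booleans can.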

Change-Id: I673f5b8a58b3954ea28066ecb334b3fdd60e7adb
Reviewed-on: http://gerrit.cloudera.org:8080/8245
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1e4db314
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1e4db314
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1e4db314

Branch: refs/heads/master
Commit: 1e4db3148a1cb4e340aa96edaea85c733cfdbf5a
Parents: 48bc405
Author: Todd Lipcon <to...@apache.org>
Authored: Mon Oct 9 17:37:12 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Nov 16 22:30:13 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_peers.cc           |  63 +++---
 src/kudu/consensus/consensus_queue-test.cc      |  10 +-
 src/kudu/consensus/consensus_queue.cc           | 202 ++++++++++++++-----
 src/kudu/consensus/consensus_queue.h            |  83 ++++++--
 .../tablet_replacement-itest.cc                 |  73 +++++++
 5 files changed, 320 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1e4db314/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index 15b472e..9019e60 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -15,11 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "kudu/consensus/consensus.proxy.h"
+#include "kudu/consensus/consensus_peers.h"
 
 #include <algorithm>
 #include <cstdlib>
+#include <memory>
 #include <mutex>
+#include <ostream>
 #include <string>
 #include <type_traits>
 #include <vector>
@@ -31,13 +33,19 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
-#include "kudu/consensus/consensus_peers.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus.proxy.h"
 #include "kudu/consensus/consensus_queue.h"
+#include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/opid_util.h"
+#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/move.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/periodic.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/rpc_controller.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
@@ -46,6 +54,7 @@
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
 #include "kudu/util/threadpool.h"
 
 DEFINE_int32(consensus_rpc_timeout_ms, 30000,
@@ -281,39 +290,41 @@ void Peer::ProcessResponse() {
 
   MAYBE_FAULT(FLAGS_fault_crash_after_leader_request_fraction);
 
+  // Process RpcController errors.
   if (!controller_.status().ok()) {
-    if (controller_.status().IsRemoteError()) {
-      // Most controller errors are caused by network issues or corner cases
-      // like shutdown and failure to serialize a protobuf. Therefore, we
-      // generally consider these errors to indicate an unreachable peer.
-      // However, a RemoteError wraps some other error propagated from the
-      // remote peer, so we know the remote is alive. Therefore, we will let
-      // the queue know that the remote is responsive.
-      queue_->NotifyPeerIsResponsive(peer_pb_.permanent_uuid());
-    }
+    auto ps = controller_.status().IsRemoteError() ?
+        PeerStatus::REMOTE_ERROR : PeerStatus::RPC_LAYER_ERROR;
+    queue_->UpdatePeerStatus(peer_pb_.permanent_uuid(), ps, controller_.status());
     ProcessResponseError(controller_.status());
     return;
   }
 
-  // Notify consensus that the peer has failed.
-  if (response_.has_error() && response_.error().code() == TabletServerErrorPB::TABLET_FAILED) {
+  // Process CANNOT_PREPARE.
+  // TODO(todd): there is no integration test coverage of this code path. Likely a bug in
+  // this path is responsible for KUDU-1779.
+  if (response_.status().has_error() &&
+      response_.status().error().code() == consensus::ConsensusErrorPB::CANNOT_PREPARE) {
     Status response_status = StatusFromPB(response_.error().status());
-    queue_->NotifyPeerHasFailed(peer_pb_.permanent_uuid(),
-                                response_status.ToString());
+    queue_->UpdatePeerStatus(peer_pb_.permanent_uuid(), PeerStatus::CANNOT_PREPARE,
+                             response_status);
     ProcessResponseError(response_status);
     return;
   }
 
-  // Pass through errors we can respond to, like not found, since in that case
-  // we will need to start a Tablet Copy. TODO: Handle DELETED response once implemented.
-  if ((response_.has_error() &&
-      response_.error().code() != TabletServerErrorPB::TABLET_NOT_FOUND) ||
-      (response_.status().has_error() &&
-          response_.status().error().code() == consensus::ConsensusErrorPB::CANNOT_PREPARE)) {
-    // Again, let the queue know that the remote is still responsive, since we
-    // will not be sending this error response through to the queue.
-    queue_->NotifyPeerIsResponsive(peer_pb_.permanent_uuid());
-    ProcessResponseError(StatusFromPB(response_.error().status()));
+  // Process tserver-level errors.
+  if (response_.has_error()) {
+    Status response_status = StatusFromPB(response_.error().status());
+    PeerStatus ps;
+    if (response_.error().code() == TabletServerErrorPB::TABLET_FAILED) {
+      ps = PeerStatus::TABLET_FAILED;
+    } else if (response_.error().code() == TabletServerErrorPB::TABLET_NOT_FOUND) {
+      ps = PeerStatus::TABLET_NOT_FOUND;
+    } else {
+      // Unknown kind of error.
+      ps = PeerStatus::REMOTE_ERROR;
+    }
+    queue_->UpdatePeerStatus(peer_pb_.permanent_uuid(), ps, response_status);
+    ProcessResponseError(response_status);
     return;
   }
 
@@ -388,7 +399,7 @@ void Peer::ProcessTabletCopyResponse() {
 
   if (success) {
     lock.unlock();
-    queue_->NotifyPeerIsResponsive(peer_pb_.permanent_uuid());
+    queue_->UpdatePeerStatus(peer_pb_.permanent_uuid(), PeerStatus::OK, Status::OK());
   } else if (!tc_response_.has_error() ||
               tc_response_.error().code() != TabletServerErrorPB::TabletServerErrorPB::THROTTLED) {
     // THROTTLED is a common response after a tserver with many replicas fails;

http://git-wip-us.apache.org/repos/asf/kudu/blob/1e4db314/src/kudu/consensus/consensus_queue-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc
index 8722dde..62a2345 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -49,7 +49,6 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
@@ -830,13 +829,8 @@ TEST_F(ConsensusQueueTest, TestTriggerTabletCopyIfTabletNotFound) {
   ASSERT_FALSE(needs_tablet_copy);
 
   // Peer responds with tablet not found.
-  response.mutable_error()->set_code(tserver::TabletServerErrorPB::TABLET_NOT_FOUND);
-  StatusToPB(Status::NotFound("No such tablet"), response.mutable_error()->mutable_status());
-  bool more_pending = false;
-  queue_->ResponseFromPeer(kPeerUuid, response, &more_pending);
-
-  // If the peer needs Tablet Copy, more_pending should be set to true.
-  ASSERT_TRUE(more_pending);
+  queue_->UpdatePeerStatus(kPeerUuid, PeerStatus::TABLET_NOT_FOUND,
+                           Status::NotFound("No such tablet"));
 
   // On the next request, we should find out that the queue wants us to initiate Tablet Copy.
   request.Clear();

http://git-wip-us.apache.org/repos/asf/kudu/blob/1e4db314/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 3bfeb2b..0bf75a7 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -43,7 +43,6 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
@@ -69,6 +68,7 @@ DEFINE_int32(consensus_inject_latency_ms_in_notifications, 0,
 TAG_FLAG(consensus_inject_latency_ms_in_notifications, hidden);
 TAG_FLAG(consensus_inject_latency_ms_in_notifications, unsafe);
 
+DECLARE_int32(consensus_rpc_timeout_ms);
 DECLARE_bool(safe_time_advancement_without_writes);
 
 using kudu::log::Log;
@@ -94,14 +94,29 @@ METRIC_DEFINE_gauge_int64(tablet, ops_behind_leader, "Operations Behind Leader",
                           MetricUnit::kOperations,
                           "Number of operations this server believes it is behind the leader.");
 
+const char* PeerStatusToString(PeerStatus p) {
+  switch (p) {
+    case PeerStatus::OK: return "OK";
+    case PeerStatus::REMOTE_ERROR: return "REMOTE_ERROR";
+    case PeerStatus::RPC_LAYER_ERROR: return "RPC_LAYER_ERROR";
+    case PeerStatus::TABLET_FAILED: return "TABLET_FAILED";
+    case PeerStatus::TABLET_NOT_FOUND: return "TABLET_NOT_FOUND";
+    case PeerStatus::INVALID_TERM: return "INVALID_TERM";
+    case PeerStatus::LMP_MISMATCH: return "LMP_MISMATCH";
+    case PeerStatus::CANNOT_PREPARE: return "CANNOT_PREPARE";
+    case PeerStatus::NEW: return "NEW";
+  }
+  DCHECK(false);
+  return "<unknown>";
+}
+
 std::string PeerMessageQueue::TrackedPeer::ToString() const {
-  return Substitute("Peer: $0, Is new: $1, Last received: $2, Next index: $3, "
-                    "Last known committed idx: $4, Last exchange result: $5, "
-                    "Needs tablet copy: $6",
-                    uuid, is_new, OpIdToString(last_received), next_index,
-                    last_known_committed_index,
-                    is_last_exchange_successful ? "SUCCESS" : "ERROR",
-                    needs_tablet_copy);
+  return Substitute("Peer: $0, Status: $1, Last received: $2, Next index: $3, "
+                    "Last known committed idx: $4",
+                    uuid,
+                    PeerStatusToString(last_exchange_status),
+                    OpIdToString(last_received), next_index,
+                    last_known_committed_index);
 }
 
 #define INSTANTIATE_METRIC(x) \
@@ -174,7 +189,7 @@ void PeerMessageQueue::SetLeaderMode(int64_t committed_index,
   // failure timeout.
   MonoTime now(MonoTime::Now());
   for (const PeersMap::value_type& entry : peers_map_) {
-    entry.second->last_successful_communication_time = now;
+    entry.second->last_communication_time = now;
   }
   time_manager_->SetLeaderMode();
 }
@@ -354,6 +369,65 @@ OpId PeerMessageQueue::GetNextOpId() const {
                   queue_state_.last_appended.index() + 1);
 }
 
+bool PeerMessageQueue::SafeToEvict(const string& evict_uuid) {
+  auto now = MonoTime::Now();
+
+  std::lock_guard<simple_spinlock> lock(queue_lock_);
+  int remaining_voters = 0;
+  int remaining_viable_voters = 0;
+
+  for (const auto& e : peers_map_) {
+    const auto& uuid = e.first;
+    const auto& peer = e.second;
+    if (!IsRaftConfigVoter(uuid, *queue_state_.active_config)) {
+      continue;
+    }
+    if (uuid == evict_uuid) {
+      continue;
+    }
+    remaining_voters++;
+
+    bool viable = true;
+
+    // Only consider a peer to be a viable voter if...
+    // ...its last exchange was successful
+    viable &= peer->last_exchange_status == PeerStatus::OK;
+
+    // ...the peer is up to date with the latest majority.
+    //
+    //    This indicates that it's actively participating in majorities and likely to
+    //    replicate a config change immediately when we propose it.
+    viable &= peer->last_received.index() >= queue_state_.majority_replicated_index;
+
+    // ...we have communicated successfully with it recently.
+    //
+    //    This handles the case where the tablet has had no recent writes and therefore
+    //    even a replica that is down would have participated in the latest majority.
+    auto unreachable_time = now - peer->last_communication_time;
+    viable &= unreachable_time.ToMilliseconds() < FLAGS_consensus_rpc_timeout_ms;
+
+    if (viable) {
+      remaining_viable_voters++;
+    }
+  }
+
+  // We never drop from 2 to 1 automatically, at least for now. We may want
+  // to revisit this later, we're just being cautious with this.
+  if (remaining_voters <= 1) {
+    VLOG(2) << LogPrefixUnlocked() << "Not evicting P " << evict_uuid << " (only one voter would remain)";
+    return false;
+  }
+  // If the remaining number of viable voters is not enough to form a majority
+  // of the remaining voters, don't evict anything.
+  if (remaining_viable_voters < MajoritySize(remaining_voters)) {
+    VLOG(2) << LogPrefixUnlocked() << Substitute(
+        "Not evicting P $0 (only $1/$2 remaining voters appear viable)",
+        evict_uuid, remaining_viable_voters, remaining_voters);
+    return false;
+  }
+
+  return true;
+}
 
 Status PeerMessageQueue::RequestForPeer(const string& uuid,
                                         ConsensusRequestPB* request,
@@ -361,7 +435,6 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
                                         bool* needs_tablet_copy) {
   // Maintain a thread-safe copy of necessary members.
   OpId preceding_id;
-  int num_voters;
   int64_t current_term;
   TrackedPeer peer;
   MonoDelta unreachable_time;
@@ -382,19 +455,16 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
     // This is initialized to the queue's last appended op but gets set to the id of the
     // log entry preceding the first one in 'messages' if messages are found for the peer.
     preceding_id = queue_state_.last_appended;
-    num_voters = CountVoters(*queue_state_.active_config);
     current_term = queue_state_.current_term;
 
     request->set_committed_index(queue_state_.committed_index);
     request->set_all_replicated_index(queue_state_.all_replicated_index);
     request->set_last_idx_appended_to_leader(queue_state_.last_appended.index());
     request->set_caller_term(current_term);
-    unreachable_time = MonoTime::Now() - peer.last_successful_communication_time;
+    unreachable_time = MonoTime::Now() - peer.last_communication_time;
   }
   if (unreachable_time.ToSeconds() > FLAGS_follower_unavailable_considered_failed_sec) {
-    if (num_voters > 2) {
-      // We never drop from 2 to 1 automatically, at least for now. We may want
-      // to revisit this later, we're just being cautious with this.
+    if (SafeToEvict(uuid)) {
       string msg = Substitute("Leader has been unable to successfully communicate "
                               "with Peer $0 for more than $1 seconds ($2)",
                               uuid,
@@ -403,7 +473,7 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
       NotifyObserversOfFailedFollower(uuid, current_term, msg);
     }
   }
-  if (PREDICT_FALSE(peer.needs_tablet_copy)) {
+  if (peer.last_exchange_status == PeerStatus::TABLET_NOT_FOUND) {
     VLOG(3) << LogPrefixUnlocked() << "Peer " << uuid << " needs tablet copy" << THROTTLE_MSG;
     *needs_tablet_copy = true;
     return Status::OK();
@@ -413,7 +483,7 @@ Status PeerMessageQueue::RequestForPeer(const string& uuid,
   // If we've never communicated with the peer, we don't know what messages to
   // send, so we'll send a status-only request. Otherwise, we grab requests
   // from the log starting at the last_received point.
-  if (!peer.is_new) {
+  if (peer.last_exchange_status != PeerStatus::NEW) {
 
     // The batch of messages to send to the peer.
     vector<ReplicateRefPtr> messages;
@@ -511,10 +581,9 @@ Status PeerMessageQueue::GetTabletCopyRequestForPeer(const string& uuid,
     if (PREDICT_FALSE(peer == nullptr || queue_state_.mode == NON_LEADER)) {
       return Status::NotFound("Peer not tracked or queue not in leader mode.");
     }
-    if (PREDICT_FALSE(!peer->needs_tablet_copy)) {
+    if (PREDICT_FALSE(peer->last_exchange_status != PeerStatus::TABLET_NOT_FOUND)) {
       return Status::IllegalState("Peer does not need to initiate Tablet Copy", uuid);
     }
-    peer->needs_tablet_copy = false;
   }
   req->Clear();
   req->set_dest_uuid(uuid);
@@ -555,7 +624,7 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
     // - remote peer A has replicated opid 100
     // - remote peer B has replication opid 10 and is catching up
     // - remote peer A goes down
-    // Here we'd start getting 'is_last_exchange_successful == false' for peer A.
+    // Here we'd start getting a non-OK last_exchange_status for peer A.
     // In that case, the 'all_replicated_watermark', which requires 3 peers, would not
     // be updateable, even once we've replicated peer 'B' up to opid 100. It would
     // get "stuck" at 10. In fact, in this case, the 'majority_replicated_watermark' would
@@ -568,7 +637,7 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
     // was an error (LMP mismatch, for example), the 'last_received' is _not_ usable
     // for watermark calculation. This could be fixed by separately storing the
     // 'match_index' on a per-peer basis and using that for watermark calculation.
-    if (peer.second->is_last_exchange_successful) {
+    if (peer.second->last_exchange_status == PeerStatus::OK) {
       watermarks.push_back(peer.second->last_received.index());
     }
   }
@@ -616,22 +685,59 @@ void PeerMessageQueue::UpdateLastIndexAppendedToLeader(int64_t last_idx_appended
   UpdateLagMetricsUnlocked();
 }
 
-void PeerMessageQueue::NotifyPeerIsResponsive(const string& peer_uuid) {
-  std::lock_guard<simple_spinlock> l(queue_lock_);
+void PeerMessageQueue::UpdatePeerStatus(const string& peer_uuid,
+                                        PeerStatus ps,
+                                        const Status& status) {
+  std::unique_lock<simple_spinlock> l(queue_lock_);
   TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
   if (!peer) return;
-  peer->last_successful_communication_time = MonoTime::Now();
-}
+  peer->last_exchange_status = ps;
 
-void PeerMessageQueue::NotifyPeerHasFailed(const string& peer_uuid, const string& reason) {
-  std::unique_lock<simple_spinlock> l(queue_lock_);
-  TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
-  if (peer) {
-    // Use the current term to ensure the peer will be evicted, otherwise this
-    // notification may be ignored.
-    int64_t current_term = queue_state_.current_term;
-    l.unlock();
-    NotifyObserversOfFailedFollower(peer_uuid, current_term, reason);
+  if (ps != PeerStatus::RPC_LAYER_ERROR) {
+    // So long as we got _any_ response from the follower, we consider it a 'communication'.
+    // RPC_LAYER_ERROR indicates something like a connection failure, indicating that the
+    // host itself is likely down.
+    //
+    // This indicates that the node is at least online.
+    peer->last_communication_time = MonoTime::Now();
+  }
+
+  switch (ps) {
+    case PeerStatus::NEW:
+      LOG_WITH_PREFIX_UNLOCKED(DFATAL) << "Should not update an existing peer to 'NEW' state";
+      break;
+
+    case PeerStatus::RPC_LAYER_ERROR:
+      // Most controller errors are caused by network issues or corner cases
+      // like shutdown and failure to deserialize a protobuf. Therefore, we
+      // generally consider these errors to indicate an unreachable peer.
+      DCHECK(!status.ok());
+      break;
+
+    case PeerStatus::TABLET_NOT_FOUND:
+      VLOG_WITH_PREFIX_UNLOCKED(1) << "Peer needs tablet copy: " << peer->ToString();
+      break;
+
+    case PeerStatus::TABLET_FAILED: {
+      // Use the current term to ensure the peer will be evicted, otherwise this
+      // notification may be ignored.
+      int64_t current_term = queue_state_.current_term;
+      l.unlock();
+      NotifyObserversOfFailedFollower(peer_uuid, current_term, status.ToString());
+      return;
+    }
+
+    case PeerStatus::REMOTE_ERROR:
+    case PeerStatus::INVALID_TERM:
+    case PeerStatus::LMP_MISMATCH:
+    case PeerStatus::CANNOT_PREPARE:
+      // No special handling here for these - we assume that we'll just retry until
+      // we make progress.
+      break;
+
+    case PeerStatus::OK:
+      DCHECK(status.ok());
+      break;
   }
 }
 
@@ -640,6 +746,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
                                         bool* more_pending) {
   DCHECK(response.IsInitialized()) << "Error: Uninitialized: "
       << response.InitializationErrorString() << ". Response: " << SecureShortDebugString(response);
+  CHECK(!response.has_error());
 
   boost::optional<int64_t> updated_commit_index;
   Mode mode_copy;
@@ -654,20 +761,6 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
       return;
     }
 
-    // Initiate Tablet Copy on the peer if the tablet is not found or deleted.
-    if (response.has_error()) {
-      // We only let special types of errors through to this point from the peer.
-      CHECK_EQ(tserver::TabletServerErrorPB::TABLET_NOT_FOUND, response.error().code())
-          << SecureShortDebugString(response);
-
-      peer->needs_tablet_copy = true;
-      VLOG_WITH_PREFIX_UNLOCKED(1) << "Marked peer as needing tablet copy: "
-                                   << peer->ToString();
-
-      *more_pending = true;
-      return;
-    }
-
     // Sanity checks.
     // Some of these can be eventually removed, but they are handy for now.
     DCHECK(response.status().IsInitialized())
@@ -692,9 +785,9 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
     TrackedPeer previous = *peer;
 
     // Update the peer status based on the response.
-    peer->is_new = false;
+    peer->last_exchange_status = PeerStatus::OK;
     peer->last_known_committed_index = status.last_committed_idx();
-    peer->last_successful_communication_time = MonoTime::Now();
+    peer->last_communication_time = MonoTime::Now();
 
     // If the reported last-received op for the replica is in our local log,
     // then resume sending entries from that point onward. Otherwise, resume
@@ -732,12 +825,11 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
     }
 
     if (PREDICT_FALSE(status.has_error())) {
-      peer->is_last_exchange_successful = false;
       switch (status.error().code()) {
         case ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH: {
+          peer->last_exchange_status = PeerStatus::LMP_MISMATCH;
           DCHECK(status.has_last_received());
-          if (previous.is_new) {
-            // That's currently how we can detect that we able to connect to a peer.
+          if (previous.last_exchange_status == PeerStatus::NEW) {
             LOG_WITH_PREFIX_UNLOCKED(INFO) << "Connected to new peer: " << peer->ToString();
           } else {
             LOG_WITH_PREFIX_UNLOCKED(INFO) << "Got LMP mismatch error from peer: "
@@ -747,6 +839,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
           return;
         }
         case ConsensusErrorPB::INVALID_TERM: {
+          peer->last_exchange_status = PeerStatus::INVALID_TERM;
           CHECK(response.has_responder_term());
           LOG_WITH_PREFIX_UNLOCKED(INFO) << "Peer responded invalid term: " << peer->ToString();
           NotifyObserversOfTermChange(response.responder_term());
@@ -754,6 +847,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
           return;
         }
         default: {
+          peer->last_exchange_status = PeerStatus::REMOTE_ERROR;
           LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Unexpected consensus error. Code: "
               << ConsensusErrorPB::Code_Name(status.error().code()) << ". Response: "
               << SecureShortDebugString(response);
@@ -761,8 +855,6 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
       }
     }
 
-    peer->is_last_exchange_successful = true;
-
     if (response.has_responder_term()) {
       // The peer must have responded with a term that is greater than or equal to
       // the last known term for that peer.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1e4db314/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index d977eb3..c3cf620 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -62,6 +62,44 @@ class StartTabletCopyRequestPB;
 // The id for the server-wide consensus queue MemTracker.
 extern const char kConsensusQueueParentTrackerId[];
 
+// State enum for the last known status of a peer tracked by the
+// ConsensusQueue.
+enum class PeerStatus {
+  // The peer has not yet had a round of communication.
+  NEW,
+
+  // The last exchange with the peer was successful. We transmitted
+  // an update to the peer and it accepted it.
+  OK,
+
+  // Some tserver-level or consensus-level error occurred that didn't
+  // fall into any of the below buckets.
+  REMOTE_ERROR,
+
+  // Some RPC-layer level error occurred. For example, a network error or timeout
+  // occurred while attempting to send the RPC.
+  RPC_LAYER_ERROR,
+
+  // The remote tablet server indicated that the tablet was in a FAILED state.
+  TABLET_FAILED,
+
+  // The remote tablet server indicated that the tablet was in a NOT_FOUND state.
+  TABLET_NOT_FOUND,
+
+  // The remote tablet server indicated that the term of this leader was older
+  // than its latest seen term.
+  INVALID_TERM,
+
+  // The remote tablet server was unable to prepare any operations in the most recent
+  // batch.
+  CANNOT_PREPARE,
+
+  // The remote tablet server's log was divergent from the leader's log.
+  LMP_MISMATCH,
+};
+
+const char* PeerStatusToString(PeerStatus p);
+
 // Tracks the state of the peers and which transactions they have replicated.
 // Owns the LogCache which actually holds the replicate messages which are
 // en route to the various peers.
@@ -77,13 +115,11 @@ class PeerMessageQueue {
   struct TrackedPeer {
     explicit TrackedPeer(std::string uuid)
         : uuid(std::move(uuid)),
-          is_new(true),
           next_index(kInvalidOpIdIndex),
           last_received(MinimumOpId()),
           last_known_committed_index(MinimumOpId().index()),
-          is_last_exchange_successful(false),
-          last_successful_communication_time(MonoTime::Now()),
-          needs_tablet_copy(false),
+          last_exchange_status(PeerStatus::NEW),
+          last_communication_time(MonoTime::Now()),
           last_seen_term_(0) {}
 
     TrackedPeer() = default;
@@ -103,9 +139,6 @@ class PeerMessageQueue {
     // UUID of the peer.
     std::string uuid;
 
-    // Whether this is a newly tracked peer.
-    bool is_new;
-
     // Next index to send to the peer.
     // This corresponds to "nextIndex" as specified in Raft.
     int64_t next_index;
@@ -117,16 +150,20 @@ class PeerMessageQueue {
     // The last committed index this peer knows about.
     int64_t last_known_committed_index;
 
-    // Whether the last exchange with this peer was successful.
-    bool is_last_exchange_successful;
+    // The status after our last attempt to communicate with the peer.
+    // See the comments within the PeerStatus enum above for details.
+    PeerStatus last_exchange_status;
 
     // The time of the last communication with the peer.
+    //
+    // NOTE: this does not indicate that the peer successfully made progress at the
+    // given time. It only indicates that we received some sign that the tablet
+    // server process was alive; the tablet may not have been found, for example.
+    // Consult last_exchange_status for details.
+    //
     // Defaults to the time of construction, so does not necessarily mean that
     // successful communication ever took place.
-    MonoTime last_successful_communication_time;
-
-    // Whether the follower was detected to need tablet copy.
-    bool needs_tablet_copy;
+    MonoTime last_communication_time;
 
     // Throttler for how often we will log status messages pertaining to this
     // peer (eg when it is lagging, etc).
@@ -232,16 +269,14 @@ class PeerMessageQueue {
   Status GetTabletCopyRequestForPeer(const std::string& uuid,
                                      StartTabletCopyRequestPB* req);
 
-  // Update the last successful communication timestamp for the given peer to
-  // the current time.
-  //
-  // This should be called when the peer responds with a message indicating that
-  // it is alive and making progress.
-  void NotifyPeerIsResponsive(const std::string& peer_uuid);
+  // Inform the queue of a new status known for one of its peers.
+  // 'ps' classifies the outcome of the exchange, while 'status'
+  // may carry a more specific error message when 'ps' is one of
+  // the error statuses.
+  void UpdatePeerStatus(const std::string& peer_uuid,
+                        PeerStatus ps,
+                        const Status& status);
 
-  // Notify consensus that the given peer has failed.
-  void NotifyPeerHasFailed(const std::string& peer_uuid,
-                           const std::string& reason);
 
   // Updates the request queue with the latest response of a peer, returns
   // whether this peer has more requests pending.
@@ -382,6 +417,10 @@ class PeerMessageQueue {
   // fatal error.
   bool IsOpInLog(const OpId& desired_op) const;
 
+  // Return true if it would be safe to evict the peer 'evict_uuid' at this
+  // point in time.
+  bool SafeToEvict(const std::string& evict_uuid);
+
   void NotifyObserversOfCommitIndexChange(int64_t new_commit_index);
   void NotifyObserversOfCommitIndexChangeTask(int64_t new_commit_index);
 

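Note on the SafeToEvict() declaration above: its body is not part of this header diff. The following is only a minimal sketch of the kind of check the comment and the KUDU-2048 regression test describe, namely refusing an eviction when the remaining config would lack a healthy majority. SketchPeerStatus, SafeToEvictSketch, and the OK and UNREACHABLE values are hypothetical names chosen for illustration; the real implementation may use different criteria.

```cpp
#include <map>
#include <string>

// Hypothetical stand-in for the PeerStatus enum in the diff. OK and
// UNREACHABLE are assumed names for this sketch; NEW, INVALID_TERM,
// CANNOT_PREPARE and LMP_MISMATCH appear in the diff itself.
enum class SketchPeerStatus {
  OK,
  UNREACHABLE,
  NEW,
  INVALID_TERM,
  CANNOT_PREPARE,
  LMP_MISMATCH
};

// Returns true if, once 'evict_uuid' is removed, a strict majority of the
// remaining voters last reported OK. 'peers' maps each voter's UUID
// (the leader included) to the status of its last exchange.
bool SafeToEvictSketch(const std::map<std::string, SketchPeerStatus>& peers,
                       const std::string& evict_uuid) {
  int remaining = 0;
  int healthy = 0;
  for (const auto& entry : peers) {
    if (entry.first == evict_uuid) continue;  // Skip the eviction candidate.
    ++remaining;
    if (entry.second == SketchPeerStatus::OK) ++healthy;
  }
  // With two voters left, both must be healthy; with three, at least two.
  return remaining > 0 && healthy > remaining / 2;
}
```

Under these assumptions, a three-voter config whose two followers are both unreachable yields false for either follower, which matches the behavior the new integration test asserts (no pending config change on the leader).
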
http://git-wip-us.apache.org/repos/asf/kudu/blob/1e4db314/src/kudu/integration-tests/tablet_replacement-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_replacement-itest.cc b/src/kudu/integration-tests/tablet_replacement-itest.cc
index 29ead3c..c012f02 100644
--- a/src/kudu/integration-tests/tablet_replacement-itest.cc
+++ b/src/kudu/integration-tests/tablet_replacement-itest.cc
@@ -65,7 +65,14 @@ using strings::Substitute;
 
 namespace kudu {
 
+enum InstabilityType {
+  NODE_DOWN,
+  NODE_STOPPED
+};
+
 class TabletReplacementITest : public ExternalMiniClusterITestBase {
+ protected:
+  void TestDontEvictIfRemainingConfigIsUnstable(InstabilityType type);
 };
 
 // Test that the Master will tombstone a newly-evicted replica.
@@ -317,6 +324,72 @@ TEST_F(TabletReplacementITest, TestEvictAndReplaceDeadFollower) {
   ASSERT_OK(cluster_->tablet_server(kFollowerIndex)->Restart());
 }
 
+void TabletReplacementITest::TestDontEvictIfRemainingConfigIsUnstable(InstabilityType type) {
+  if (!AllowSlowTests()) {
+    LOG(INFO) << "Skipping test in fast-test mode.";
+    return;
+  }
+
+  MonoDelta timeout = MonoDelta::FromSeconds(30);
+  vector<string> ts_flags = { "--enable_leader_failure_detection=false",
+                              "--follower_unavailable_considered_failed_sec=5" };
+  vector<string> master_flags = { "--catalog_manager_wait_for_new_tablets_to_elect_leader=false" };
+  NO_FATALS(StartCluster(ts_flags, master_flags));
+
+  TestWorkload workload(cluster_.get());
+  workload.Setup(); // Easy way to create a new tablet.
+
+  const int kLeaderIndex = 0;
+  TServerDetails* leader_ts = ts_map_[cluster_->tablet_server(kLeaderIndex)->uuid()];
+  const int kFollower1Index = 1;
+  const int kFollower2Index = 2;
+
+  // Figure out the tablet id of the created tablet.
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(itest::WaitForNumTabletsOnTS(leader_ts, 1, timeout, &tablets));
+  string tablet_id = tablets[0].tablet_status().tablet_id();
+
+  // Wait until all replicas are up and running.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+                                            tablet_id, timeout));
+  }
+
+  // Elect a leader (TS 0).
+  ASSERT_OK(itest::StartElection(leader_ts, tablet_id, timeout));
+  ASSERT_OK(itest::WaitForServersToAgree(timeout, ts_map_, tablet_id, 1)); // Wait for NO_OP.
+
+  // Shut down or pause both followers (per 'type') and wait long enough that the leader
+  // considers them unresponsive. It should not trigger a config change to evict either one.
+  switch (type) {
+    case NODE_DOWN:
+      cluster_->tablet_server(kFollower1Index)->Shutdown();
+      cluster_->tablet_server(kFollower2Index)->Shutdown();
+      break;
+    case NODE_STOPPED:
+      cluster_->tablet_server(kFollower1Index)->Pause();
+      cluster_->tablet_server(kFollower2Index)->Pause();
+      break;
+  }
+
+  SleepFor(MonoDelta::FromSeconds(10));
+  consensus::ConsensusStatePB cstate;
+  ASSERT_OK(GetConsensusState(leader_ts, tablet_id, MonoDelta::FromSeconds(10), &cstate));
+  SCOPED_TRACE(cstate.DebugString());
+  ASSERT_FALSE(cstate.has_pending_config())
+      << "Leader should not have issued any config change";
+}
+
+// Regression test for KUDU-2048. If a majority of followers are unresponsive, the
+// leader should not evict any of them.
+TEST_F(TabletReplacementITest, TestDontEvictIfRemainingConfigIsUnstable_NodesDown) {
+  TestDontEvictIfRemainingConfigIsUnstable(NODE_DOWN);
+}
+
+TEST_F(TabletReplacementITest, TestDontEvictIfRemainingConfigIsUnstable_NodesStopped) {
+  TestDontEvictIfRemainingConfigIsUnstable(NODE_STOPPED);
+}
+
 // Regression test for KUDU-1233. This test creates a situation in which tablet
 // bootstrap will attempt to replay committed (and applied) config change
 // operations. This is achieved by delaying application of a write at the