You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/06/23 02:07:27 UTC

[5/5] incubator-kudu git commit: KUDU-1469. Fix handling of fully-deduped requests after a leader change

KUDU-1469. Fix handling of fully-deduped requests after a leader change

This fixes KUDU-1469, a bug in which the leader and follower could get
into a tight loop of RPCs making no progress replicating operations.

The newly included test fails without the bug fix, and passes with it.
See its comments for details on the bug itself.

Change-Id: Iced21ae1b69c1079efc9aa9cf23e2fa592b8bebd
Reviewed-on: http://gerrit.cloudera.org:8080/3228
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>
(cherry picked from commit d685747426472d428b1d071df00d112d9f775117)
Reviewed-on: http://gerrit.cloudera.org:8080/3455
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/3cbd76d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/3cbd76d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/3cbd76d2

Branch: refs/heads/branch-0.9.x
Commit: 3cbd76d29ba531293c02af461de313d4263db1eb
Parents: 6542265
Author: Todd Lipcon <to...@apache.org>
Authored: Sun Jun 5 23:14:09 2016 +0100
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Jun 23 02:07:07 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus.proto              |   6 +
 src/kudu/consensus/consensus_queue.cc           |   5 +
 src/kudu/consensus/log_cache.cc                 |   6 +-
 src/kudu/consensus/raft_consensus-test.cc       |   8 +-
 src/kudu/consensus/raft_consensus.cc            |  12 +-
 src/kudu/consensus/raft_consensus_state.cc      |  13 +-
 src/kudu/consensus/raft_consensus_state.h       |   5 +-
 .../integration-tests/raft_consensus-itest.cc   | 127 +++++++++++++++++++
 8 files changed, 158 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cbd76d2/src/kudu/consensus/consensus.proto
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.proto b/src/kudu/consensus/consensus.proto
index 517d06b..3439d5b 100644
--- a/src/kudu/consensus/consensus.proto
+++ b/src/kudu/consensus/consensus.proto
@@ -229,6 +229,12 @@ message ConsensusStatusPB {
   // (PRECEDING_ENTRY_DIDNT_MATCH), this field is important and may still be
   // set, since the leader queue uses this field in conjuction with
   // last_received to decide on the next id to send to the follower.
+  //
+  // NOTE: it might seem that the leader itself could track this based on knowing
+  // which batches were successfully sent. However, the follower is free to
+  // truncate the batch if an operation in the middle of the batch fails
+  // to prepare (e.g. due to memory limiting). In that case, the leader
+  // will get a success response but still need to re-send some operations.
   optional OpId last_received_current_leader = 4;
 
   // The last committed index that is known to the peer.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cbd76d2/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 040a5c2..b170018 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -560,6 +560,11 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
       // the hope that doing so will result in a faster catch-up process.
       DCHECK_GE(peer->last_known_committed_idx, 0);
       peer->next_index = peer->last_known_committed_idx + 1;
+      LOG_WITH_PREFIX_UNLOCKED(INFO)
+          << "Peer " << peer_uuid << " log is divergent from this leader: "
+          << "its last log entry " << OpIdToString(status.last_received()) << " is not in "
+          << "this leader's log and it has not received anything from this leader yet. "
+          << "Falling back to committed index " << peer->last_known_committed_idx;
     }
 
     if (PREDICT_FALSE(status.has_error())) {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cbd76d2/src/kudu/consensus/log_cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_cache.cc b/src/kudu/consensus/log_cache.cc
index 605eae6..ae82a47 100644
--- a/src/kudu/consensus/log_cache.cc
+++ b/src/kudu/consensus/log_cache.cc
@@ -303,8 +303,10 @@ Status LogCache::ReadOps(int64_t after_op_index,
           next_index, up_to, remaining_space, &raw_replicate_ptrs),
         Substitute("Failed to read ops $0..$1", next_index, up_to));
       l.lock();
-      LOG_WITH_PREFIX_UNLOCKED(INFO) << "Successfully read " << raw_replicate_ptrs.size() << " ops "
-                            << "from disk.";
+      LOG_WITH_PREFIX_UNLOCKED(INFO)
+          << "Successfully read " << raw_replicate_ptrs.size() << " ops "
+          << "from disk (" << next_index << ".."
+          << (next_index + raw_replicate_ptrs.size() - 1) << ")";
 
       for (ReplicateMsg* msg : raw_replicate_ptrs) {
         CHECK_EQ(next_index, msg->id().index());

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cbd76d2/src/kudu/consensus/raft_consensus-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus-test.cc b/src/kudu/consensus/raft_consensus-test.cc
index e874d5d..52dfa97 100644
--- a/src/kudu/consensus/raft_consensus-test.cc
+++ b/src/kudu/consensus/raft_consensus-test.cc
@@ -685,8 +685,10 @@ TEST_F(RaftConsensusTest, TestResetRcvdFromCurrentLeaderOnNewTerm) {
   ASSERT_OPID_EQ(response.status().last_received_current_leader(),  noop_opid);
 
   // New leader heartbeat. Term increase to 2.
-  // Expect current term replicated to be nothing (MinimumOpId) but log
-  // replicated to be everything sent so far.
+  // The preceding_opid is the no-op replicated above. This will match on the
+  // follower side, so it can update its last_received_current_leader to
+  // the same operation (indicating to the queue that it doesn't need to re-replicate
+  // this operation).
   caller_term = 2;
   caller_uuid = config_.peers(1).permanent_uuid();
   preceding_opid = noop_opid;
@@ -696,7 +698,7 @@ TEST_F(RaftConsensusTest, TestResetRcvdFromCurrentLeaderOnNewTerm) {
   ASSERT_FALSE(response.status().has_error()) << response.ShortDebugString();
   ASSERT_EQ(caller_term, response.responder_term());
   ASSERT_OPID_EQ(response.status().last_received(), preceding_opid);
-  ASSERT_OPID_EQ(response.status().last_received_current_leader(), MinimumOpId());
+  ASSERT_OPID_EQ(response.status().last_received_current_leader(), preceding_opid);
 
   // Append a no-op.
   noop_opid = MakeOpId(caller_term, ++log_index);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cbd76d2/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index b23087a..d990a0e 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -1188,17 +1188,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     // If any messages failed to be started locally, then we already have removed them
     // from 'deduped_req' at this point. So, we can simply update our last-received
     // watermark to the last message that remains in 'deduped_req'.
-    //
-    // It's possible that the leader didn't send us any new data -- it might be a completely
-    // duplicate request. In that case, we don't need to update LastReceived at all.
-    if (!deduped_req.messages.empty()) {
-      OpId last_appended = deduped_req.messages.back()->get()->id();
-      TRACE(Substitute("Updating last received op as $0", last_appended.ShortDebugString()));
-      state_->UpdateLastReceivedOpIdUnlocked(last_appended);
-    } else {
-      DCHECK_GE(state_->GetLastReceivedOpIdUnlocked().index(),
-                deduped_req.preceding_opid->index());
-    }
+    state_->UpdateLastReceivedOpIdUnlocked(last_from_leader);
 
     // Fill the response with the current state. We will not mutate anymore state until
     // we actually reply to the leader, we'll just wait for the messages to be durable.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cbd76d2/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index 4a70efb..f142090 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -645,13 +645,12 @@ Status ReplicaState::CheckHasCommittedOpInCurrentTermUnlocked() const {
 
 void ReplicaState::UpdateLastReceivedOpIdUnlocked(const OpId& op_id) {
   DCHECK(update_lock_.is_locked());
-  DCHECK_LE(OpIdCompare(last_received_op_id_, op_id), 0)
-    << "Previously received OpId: " << last_received_op_id_.ShortDebugString()
-    << ", updated OpId: " << op_id.ShortDebugString()
-    << ", Trace:" << std::endl << Trace::CurrentTrace()->DumpToString();
-  last_received_op_id_ = op_id;
-  last_received_op_id_current_leader_ = last_received_op_id_;
-  next_index_ = op_id.index() + 1;
+  if (OpIdCompare(op_id, last_received_op_id_) > 0) {
+    TRACE("Updating last received op as $0", OpIdToString(op_id));
+    last_received_op_id_ = op_id;
+    next_index_ = op_id.index() + 1;
+  }
+  last_received_op_id_current_leader_ = op_id;
 }
 
 const OpId& ReplicaState::GetLastReceivedOpIdUnlocked() const {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cbd76d2/src/kudu/consensus/raft_consensus_state.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.h b/src/kudu/consensus/raft_consensus_state.h
index a849cfc..3485bfb 100644
--- a/src/kudu/consensus/raft_consensus_state.h
+++ b/src/kudu/consensus/raft_consensus_state.h
@@ -270,7 +270,10 @@ class ReplicaState {
   // Returns OK iff an op from the current term has been committed.
   Status CheckHasCommittedOpInCurrentTermUnlocked() const;
 
-  // Updates the last received operation.
+  // Updates the last received operation, if 'op_id' compares higher than
+  // the previous last received. Also updates 'last_received_op_id_current_leader_'
+  // regardless of whether it is higher or lower than the prior value.
+  //
   // This must be called under a lock.
   void UpdateLastReceivedOpIdUnlocked(const OpId& op_id);
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cbd76d2/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 62f8f82..8c00cf9 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -198,6 +198,11 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
     return ret;
   }
 
+  // Insert 'num_rows' rows starting with row key 'start_row'.
+  // Each row will have size 'payload_size'. A short (100ms) timeout is
+  // used. If the flush generates any errors they will be ignored.
+  void InsertPayloadIgnoreErrors(int start_row, int num_rows, int payload_size);
+
   void InsertTestRowsRemoteThread(uint64_t first_row,
                                   uint64_t count,
                                   uint64_t num_batches,
@@ -2291,6 +2296,128 @@ TEST_F(RaftConsensusITest, TestSlowLeader) {
   SleepFor(MonoDelta::FromSeconds(60));
 }
 
+void RaftConsensusITest::InsertPayloadIgnoreErrors(int start_row, int num_rows, int payload_size) {
+  shared_ptr<KuduTable> table;
+  CHECK_OK(client_->OpenTable(kTableId, &table));
+  shared_ptr<KuduSession> session = client_->NewSession();
+  session->SetTimeoutMillis(100);
+  CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+  string payload(payload_size, 'x');
+  for (int i = 0; i < num_rows; i++) {
+    gscoped_ptr<KuduInsert> insert(table->NewInsert());
+    KuduPartialRow* row = insert->mutable_row();
+    CHECK_OK(row->SetInt32(0, i + start_row));
+    CHECK_OK(row->SetInt32(1, 0));
+    CHECK_OK(row->SetStringCopy(2, payload));
+    CHECK_OK(session->Apply(insert.release()));
+    ignore_result(session->Flush());
+  }
+}
+
+// Regression test for KUDU-1469, a case in which a leader and follower could get "stuck"
+// in a tight RPC loop, in which the leader would repeatedly send a batch of ops that the
+// follower already had, the follower would fully de-dupe them, and yet the leader would
+// never advance to the next batch.
+//
+// The 'perfect storm' reproduced here consists of:
+// - the commit index has fallen far behind due to a slow log on the leader
+//   and one of the three replicas being inaccessible
+// - the other replica elects itself
+// - before the old leader notices it has been ousted, it writes at least one more
+//   operation to its local log.
+// - before the replica can replicate anything to the old leader, it receives
+//   more writes, such that the first batch's preceding_op_id is ahead of
+//   the old leader's last written operation
+//
+// See the detailed comments below for more details.
+TEST_F(RaftConsensusITest, TestCommitIndexFarBehindAfterLeaderElection) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+
+  if (!AllowSlowTests()) return;
+
+  // Set the batch size low so that, after the new leader takes
+  // over below, the ops required to catch up from the committed index
+  // to the newly replicated index don't fit into a single batch.
+  BuildAndStart({"--consensus_max_batch_size_bytes=50000"});
+
+  // Get the leader and the two replica tablet servers.
+  // These will have the following roles in this test:
+  // 1) 'first_leader_ts' is the initial leader.
+  // 2) 'second_leader_ts' will be forced to be elected as the second leader
+  // 3) 'only_vote_ts' will simulate a heavily overloaded (or corrupted) TS
+  //     which is far enough behind (or failed) such that it only participates
+  //     by voting.
+  TServerDetails* leader;
+  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
+  ExternalTabletServer* first_leader_ts = cluster_->tablet_server_by_uuid(leader->uuid());
+  ExternalTabletServer* second_leader_ts = nullptr;
+  ExternalTabletServer* only_vote_ts = nullptr;
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ExternalTabletServer* ts = cluster_->tablet_server(i);
+    if (ts->instance_id().permanent_uuid() != leader->uuid()) {
+      if (second_leader_ts == nullptr) {
+        second_leader_ts = ts;
+      } else {
+        only_vote_ts = ts;
+      }
+    }
+  }
+
+  // The 'only_vote' tablet server doesn't participate in replication.
+  ASSERT_OK(cluster_->SetFlag(only_vote_ts, "follower_reject_update_consensus_requests", "true"));
+
+  // Inject a long delay in the log of the first leader, and write 10 operations.
+  // This delay ensures that it will replicate them to both itself and its follower,
+  // but due to its log sync not completing, it won't know that it is safe to advance its
+  // commit index until long after it has lost its leadership.
+  ASSERT_OK(cluster_->SetFlag(first_leader_ts, "log_inject_latency_ms_mean", "6000"));
+  ASSERT_OK(cluster_->SetFlag(first_leader_ts, "log_inject_latency", "true"));
+  InsertPayloadIgnoreErrors(0, 10, 10000);
+
+  // Write one more operation to the leader, but disable consensus on the follower so that
+  // it doesn't get replicated.
+  ASSERT_OK(cluster_->SetFlag(
+      second_leader_ts, "follower_reject_update_consensus_requests", "true"));
+  InsertPayloadIgnoreErrors(10, 1, 10000);
+
+  // Pause the initial leader and wait for the replica to elect itself. The third TS participates
+  // here by voting.
+  first_leader_ts->Pause();
+  ASSERT_OK(WaitUntilLeader(tablet_servers_[second_leader_ts->uuid()], tablet_id_, kTimeout));
+
+  // The voter TS has done its duty. Shut it down to avoid log spam where it tries to run
+  // elections.
+  only_vote_ts->Shutdown();
+
+  // Perform one insert on the new leader. The new leader has not yet replicated its NO_OP to
+  // the old leader, since the old leader is still paused.
+  NO_FATALS(CreateClient(&client_));
+  InsertPayloadIgnoreErrors(13, 1, 10000);
+
+  // Now we expect to have the following logs:
+  //
+  // first_leader_ts    second_leader_ts
+  // ---------------    ----------------
+  // 1.1  NO_OP         1.1  NO_OP
+  // 1.2  WRITE_OP      1.2  WRITE_OP
+  // ...................................
+  // 1.11 WRITE_OP      1.11 WRITE_OP
+  // 1.12 WRITE_OP      2.12 NO_OP
+  //                    2.13 WRITE_OP
+  //
+  // Both servers should have a committed_idx of 1.1 since the log was delayed.
+
+  // Now, when we resume the original leader, we expect them to recover properly.
+  // Previously this triggered KUDU-1469.
+  first_leader_ts->Resume();
+
+  TabletServerMap active_tservers = tablet_servers_;
+  active_tservers.erase(only_vote_ts->uuid());
+  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30),
+                                  active_tservers,
+                                  tablet_id_, 13));
+}
+
 // Run a regular workload with one follower that's writing to its WAL slowly.
 TEST_F(RaftConsensusITest, TestSlowFollower) {
   if (!AllowSlowTests()) return;