Posted to commits@kudu.apache.org by mp...@apache.org on 2018/09/21 20:03:57 UTC

kudu git commit: KUDU-2463 pt 1: adjust MVCC when replaying no-ops

Repository: kudu
Updated Branches:
  refs/heads/master 34e88d3da -> 6129467d9


KUDU-2463 pt 1: adjust MVCC when replaying no-ops

Previously, during tablet bootstrap, a tablet replica would only update
its MVCC safetime based on write and alter schema messages, as the
timestamps in these messages are guaranteed to be serialized with
respect to one another, by virtue of being assigned in a single thread
(the prepare thread) on the leader replica.

From this, we conclude that timestamps for write and alter schema
operations are monotonically increasing in unison with opid. Because
no-op and change config operations are not serialized through the
prepare thread, they don't necessarily provide the same timestamp
monotonicity guarantees.

However, in the case of no-ops replicated to assert leadership, we get
the same timestamp monotonicity guarantee due to a different mechanism.
This patch takes advantage of the fact that our Raft implementation
ensures the following sequence of events:

1. replica A becomes leader of Term N
2. leader A assigns a timestamp t1 to its no-op
3. leader A replicates the no-op to replicas B and C, asserting its
   leadership for Term N
4. leader A prepares a write and assigns it a timestamp t2 > t1
5. leader A replicates the write to replicas B and C, checking that it
   is still the leader for Term N

Given the above series of operations, for a given term, the no-op used
to assert leadership is always assigned a timestamp lower than that of
any other op in that term. As such, the timestamps assigned to
no-ops can and should be used to bump MVCC safe time. This patch does so
at bootstrap time with committed no-ops found in the WAL.
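
As a rough illustration of the replay rule described above, here is a
minimal, self-contained sketch. It is not the code in this patch: the
ReplayedOp struct, the OpType enum, and both functions are hypothetical
stand-ins for the real replicate messages and bootstrap logic, and the
COMMIT_WAIT adjustment for writes is deliberately left out.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical stand-ins for replicated op types (not Kudu's real enum).
enum class OpType { kWrite, kAlterSchema, kNoOp, kChangeConfig };

// Hypothetical, simplified view of a committed op found in the WAL.
struct ReplayedOp {
  OpType type;
  uint64_t timestamp;
  // Models an optional boolean field on the no-op request:
  // 'has_order_flag' says whether it was set, 'order_flag' holds its value.
  bool has_order_flag;
  bool order_flag;
};

// Returns true if the op's timestamp is assumed to increase with its opid,
// and so may be used to advance safe time.
bool TimestampAssignedInOpIdOrder(const ReplayedOp& op) {
  switch (op.type) {
    case OpType::kChangeConfig:
      // Not serialized through the prepare thread: no ordering guarantee.
      return false;
    case OpType::kNoOp:
      // An unset flag is treated as true (leader-election no-op).
      return op.has_order_flag ? op.order_flag : true;
    default:
      // Writes and alter schema ops are timestamped by a single thread.
      return true;
  }
}

// Walks committed ops in opid order and returns the highest timestamp among
// the ops that are allowed to advance safe time (0 if there are none).
uint64_t ReplaySafeTime(const std::vector<ReplayedOp>& committed_ops) {
  uint64_t safe_time = 0;
  for (const ReplayedOp& op : committed_ops) {
    if (!TimestampAssignedInOpIdOrder(op)) continue;
    if (op.timestamp > safe_time) safe_time = op.timestamp;
  }
  return safe_time;
}
```

With a WAL that holds only election no-ops, this sketch still yields a
non-zero safe time, which is the behavior the new integration test
checks for after a restart.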

This alone isn't enough to always prevent KUDU-2463, but it dramatically
reduces the likelihood of it occurring.

Change-Id: I26deff32da8c990cb8a2ba220bb81858ddd6d73f
Reviewed-on: http://gerrit.cloudera.org:8080/11142
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6129467d
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6129467d
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6129467d

Branch: refs/heads/master
Commit: 6129467d964a073f3e68474200452a8c3f639996
Parents: 34e88d3
Author: Andrew Wong <aw...@cloudera.com>
Authored: Tue Sep 11 12:09:58 2018 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Sep 21 19:55:23 2018 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus.proto              |   6 +
 src/kudu/integration-tests/CMakeLists.txt       |   1 +
 .../integration-tests/raft_consensus-itest.cc   |   6 +-
 .../timestamp_advancement-itest.cc              | 236 +++++++++++++++++++
 src/kudu/tablet/tablet_bootstrap-test.cc        |   1 +
 src/kudu/tablet/tablet_bootstrap.cc             |  35 ++-
 6 files changed, 276 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/6129467d/src/kudu/consensus/consensus.proto
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.proto b/src/kudu/consensus/consensus.proto
index 5eeecdd..68877ca 100644
--- a/src/kudu/consensus/consensus.proto
+++ b/src/kudu/consensus/consensus.proto
@@ -229,6 +229,12 @@ message CommitMsg {
 message NoOpRequestPB {
  // Allows to set a dummy payload, for tests.
  optional bytes payload_for_tests = 1;
+
+ // Set to true if the op id for this request is expected to be monotonically
+ // increasing with the assigned timestamp. For no-ops that are sent by a
+ // leader marking a successful Raft election, this is true. If not set, it is
+ // assumed to be true.
+ optional bool timestamp_in_opid_order = 2;
 }
 
 // Status message received in the peer responses.

http://git-wip-us.apache.org/repos/asf/kudu/blob/6129467d/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index ad814e3..512a454 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -109,6 +109,7 @@ ADD_KUDU_TEST(tablet_copy-itest NUM_SHARDS 6 PROCESSORS 4)
 ADD_KUDU_TEST(tablet_copy_client_session-itest)
 ADD_KUDU_TEST(tablet_history_gc-itest)
 ADD_KUDU_TEST(tablet_replacement-itest)
+ADD_KUDU_TEST(timestamp_advancement-itest)
 ADD_KUDU_TEST(tombstoned_voting-imc-itest)
 ADD_KUDU_TEST(tombstoned_voting-itest)
 ADD_KUDU_TEST(tombstoned_voting-stress-test RUN_SERIAL true)

http://git-wip-us.apache.org/repos/asf/kudu/blob/6129467d/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index f7be807..20c3c7b 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -256,7 +256,11 @@ void RaftConsensusITest::AddOpWithTypeAndKey(const OpId& id,
                                              ConsensusRequestPB* req) {
   ReplicateMsg* msg = req->add_ops();
   msg->mutable_id()->CopyFrom(id);
-  msg->set_timestamp(id.index());
+  // Set a somewhat realistic timestamp such that it is monotonically
+  // increasing per op and starts off higher than 1. This is required, as some
+  // test cases test the scenario where the WAL is replayed and no-ops and
+  // writes are expected to have monotonically increasing timestamps.
+  msg->set_timestamp(id.index() * 10000 + id.term());
   msg->set_op_type(consensus::WRITE_OP);
   WriteRequestPB* write_req = msg->mutable_write_request();
   CHECK_OK(SchemaToPB(schema_, write_req->mutable_schema()));

http://git-wip-us.apache.org/repos/asf/kudu/blob/6129467d/src/kudu/integration-tests/timestamp_advancement-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/timestamp_advancement-itest.cc b/src/kudu/integration-tests/timestamp_advancement-itest.cc
new file mode 100644
index 0000000..88bfff5
--- /dev/null
+++ b/src/kudu/integration-tests/timestamp_advancement-itest.cc
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <stdint.h>
+
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/timestamp.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/log.h"
+#include "kudu/consensus/log.pb.h"
+#include "kudu/consensus/log_index.h"
+#include "kudu/consensus/log_reader.h"
+#include "kudu/consensus/log_util.h"
+#include "kudu/consensus/raft_consensus.h"
+#include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/internal_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/mini-cluster/mini_cluster.h"
+#include "kudu/tablet/mvcc.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(enable_maintenance_manager);
+DECLARE_bool(log_preallocate_segments);
+DECLARE_bool(log_async_preallocate_segments);
+DECLARE_bool(raft_enable_pre_election);
+DECLARE_double(leader_failure_max_missed_heartbeat_periods);
+DECLARE_int32(consensus_inject_latency_ms_in_notifications);
+DECLARE_int32(raft_heartbeat_interval_ms);
+
+namespace kudu {
+
+using cluster::InternalMiniCluster;
+using cluster::InternalMiniClusterOptions;
+using log::LogReader;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using tablet::TabletReplica;
+using tserver::MiniTabletServer;
+
+namespace itest {
+
+class TimestampAdvancementITest : public MiniClusterITestBase {
+ public:
+  // Sets up a cluster and returns the tablet replica on 'ts' that has written
+  // to its WAL. 'replica' will write further messages to a new WAL segment.
+  void SetupClusterWithWritesInWAL(int ts, scoped_refptr<TabletReplica>* replica) {
+    NO_FATALS(StartCluster(3));
+
+    // Write some rows to the cluster.
+    TestWorkload write(cluster_.get());;
+
+    // Set a low batch size so we have finer-grained control over flushing of
+    // the WAL. Too large, and the WAL may end up flushing in the background.
+    write.set_write_batch_size(1);
+    write.Setup();
+    write.Start();
+    while (write.rows_inserted() < 10) {
+      SleepFor(MonoDelta::FromMilliseconds(1));
+    }
+    write.StopAndJoin();
+
+    // Ensure that the replicas eventually get to a point where all of them
+    // have all the rows. This will allow us to GC the WAL, as they will not
+    // need to retain them if fully-replicated.
+    scoped_refptr<TabletReplica> tablet_replica = tablet_replica_on_ts(ts);
+    ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30),
+          ts_map_, tablet_replica->tablet_id(), write.batches_completed()));
+
+    // Flush the current log batch and roll over to get a fresh WAL segment.
+    ASSERT_OK(tablet_replica->log()->WaitUntilAllFlushed());
+    ASSERT_OK(tablet_replica->log()->AllocateSegmentAndRollOver());
+
+    // Also flush the MRS so we're free to GC the WAL segment we just wrote.
+    ASSERT_OK(tablet_replica->tablet()->Flush());
+    *replica = std::move(tablet_replica);
+  }
+
+  // Returns the tablet server 'ts'.
+  MiniTabletServer* tserver(int ts) const {
+    DCHECK(cluster_);
+    return cluster_->mini_tablet_server(ts);
+  }
+
+  // Get the tablet replica on the tablet server 'ts'.
+  scoped_refptr<TabletReplica> tablet_replica_on_ts(int ts) const {
+    vector<scoped_refptr<TabletReplica>> replicas;
+    tserver(ts)->server()->tablet_manager()->GetTabletReplicas(&replicas);
+    DCHECK_EQ(1, replicas.size());
+    return replicas[0];
+  }
+
+  // Returns true if there are any write replicate messages in the WALs of
+  // 'tablet_id' on 'ts'.
+  Status CheckForWriteReplicatesInLog(MiniTabletServer* ts, const string& tablet_id,
+                                      bool* has_write_replicates) const {
+    shared_ptr<LogReader> reader;
+    RETURN_NOT_OK(LogReader::Open(env_,
+                  ts->server()->fs_manager()->GetTabletWalDir(tablet_id),
+                  scoped_refptr<log::LogIndex>(), tablet_id,
+                  scoped_refptr<MetricEntity>(), &reader));
+    log::SegmentSequence segs;
+    RETURN_NOT_OK(reader->GetSegmentsSnapshot(&segs));
+    unique_ptr<log::LogEntryPB> entry;
+    for (const auto& seg : segs) {
+      log::LogEntryReader reader(seg.get());
+      while (true) {
+        Status s = reader.ReadNextEntry(&entry);
+        if (s.IsEndOfFile()) break;
+        RETURN_NOT_OK(s);
+        if (entry->type() == log::REPLICATE &&
+            entry->replicate().op_type() == consensus::WRITE_OP) {
+          *has_write_replicates = true;
+          return Status::OK();
+        }
+      }
+    }
+    *has_write_replicates = false;
+    return Status::OK();
+  }
+};
+
+// Test that bootstrapping a Raft no-op from the WAL will advance the replica's
+// MVCC safe time timestamps.
+TEST_F(TimestampAdvancementITest, TestNoOpAdvancesMvccSafeTimeOnBootstrap) {
+  // Set a low Raft heartbeat and encourage frequent elections so that we can
+  // fill up the WAL with no-op entries naturally.
+  FLAGS_raft_heartbeat_interval_ms = 100;
+  FLAGS_raft_enable_pre_election = false;
+
+  // Prevent preallocation of WAL segments in order to prevent races between
+  // the WAL allocation thread and our manual rolling over of the WAL.
+  FLAGS_log_preallocate_segments = false;
+  FLAGS_log_async_preallocate_segments = false;
+
+  // We're going to manually trigger maintenance ops, so disable maintenance op
+  // scheduling.
+  FLAGS_enable_maintenance_manager = false;
+
+  // Setup a cluster with some writes and a new WAL segment.
+  const int kTserver = 0;
+  scoped_refptr<TabletReplica> replica;
+  NO_FATALS(SetupClusterWithWritesInWAL(kTserver, &replica));
+  MiniTabletServer* ts = tserver(kTserver);
+  const string tablet_id = replica->tablet_id();
+
+  // Now that we're on a new WAL segment, inject latency to consensus so we
+  // trigger elections, and wait for some terms to pass.
+  FLAGS_leader_failure_max_missed_heartbeat_periods = 1.0;
+  FLAGS_consensus_inject_latency_ms_in_notifications = 100;
+  const int64_t kNumExtraTerms = 10;
+  int64_t initial_raft_term = replica->consensus()->CurrentTerm();
+  int64_t raft_term = initial_raft_term;
+  while (raft_term < initial_raft_term + kNumExtraTerms) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+    raft_term = replica->consensus()->CurrentTerm();
+  }
+
+  // Reduce election churn so we can achieve a stable quorum and get to a point
+  // where we can GC our logs. Note: we won't GC if there are replicas that
+  // need to be caught up.
+  FLAGS_consensus_inject_latency_ms_in_notifications = 0;
+  ASSERT_EVENTUALLY([&] {
+    LOG(INFO) << "GCing logs...";
+    int64_t gcable_size;
+    ASSERT_OK(replica->GetGCableDataSize(&gcable_size));
+    ASSERT_GT(gcable_size, 0);
+    ASSERT_OK(replica->RunLogGC());
+
+    // Ensure that we have no writes in our WALs.
+    bool has_write_replicates;
+    ASSERT_OK(CheckForWriteReplicatesInLog(ts, tablet_id, &has_write_replicates));
+    ASSERT_FALSE(has_write_replicates);
+  });
+
+  // Note: We shut down tservers individually rather than using
+  // ClusterNodes::TS, since the latter would invalidate our reference to 'ts'.
+  replica.reset();
+  LOG(INFO) << "Shutting down the cluster...";
+  cluster_->ShutdownNodes(cluster::ClusterNodes::MASTERS_ONLY);
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    cluster_->mini_tablet_server(i)->Shutdown();
+  }
+
+  // Now prevent elections to reduce consensus logging on the server.
+  LOG(INFO) << "Restarting single tablet server...";
+  ASSERT_OK(ts->Restart());
+  TServerDetails* ts_details = FindOrDie(ts_map_, ts->uuid());
+
+  // Despite there being no writes, there are no-ops, with which we can advance
+  // MVCC's timestamps.
+  ASSERT_OK(WaitUntilTabletRunning(ts_details, tablet_id, MonoDelta::FromSeconds(30)));
+  replica = tablet_replica_on_ts(kTserver);
+  Timestamp cleantime = replica->tablet()->mvcc_manager()->GetCleanTimestamp();
+  ASSERT_NE(cleantime, Timestamp::kInitialTimestamp);
+}
+
+}  // namespace itest
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/6129467d/src/kudu/tablet/tablet_bootstrap-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap-test.cc b/src/kudu/tablet/tablet_bootstrap-test.cc
index 05a55a0..dc2b83f 100644
--- a/src/kudu/tablet/tablet_bootstrap-test.cc
+++ b/src/kudu/tablet/tablet_bootstrap-test.cc
@@ -629,6 +629,7 @@ TEST_F(BootstrapTest, TestConsensusOnlyOperationOutOfOrderTimestamp) {
   noop_replicate->get()->set_op_type(consensus::NO_OP);
   *noop_replicate->get()->mutable_id() = MakeOpId(1, 1);
   noop_replicate->get()->set_timestamp(2);
+  noop_replicate->get()->mutable_noop_request()->set_timestamp_in_opid_order(false);
 
   ASSERT_OK(AppendReplicateBatch(noop_replicate));
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/6129467d/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index e70372a..d32316c 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -1093,21 +1093,40 @@ Status TabletBootstrap::HandleEntryPair(const IOContext* io_context, LogEntryPB*
 
 #undef RETURN_NOT_OK_REPLAY
 
-  // Non-tablet operations should not advance the safe time, because they are
-  // not started serially and so may have timestamps that are out of order.
-  if (op_type == NO_OP || op_type == CHANGE_CONFIG_OP) {
+  // We should only advance MVCC's safe time based on a specific set of
+  // operations: those whose timestamps are guaranteed to be monotonically
+  // increasing with respect to their entries in the write-ahead log.
+  bool timestamp_assigned_in_opid_order = true;
+  switch (op_type) {
+    case CHANGE_CONFIG_OP:
+      timestamp_assigned_in_opid_order = false;
+      break;
+    case NO_OP: {
+      const auto& req = replicate->noop_request();
+      if (req.has_timestamp_in_opid_order()) {
+        timestamp_assigned_in_opid_order = req.timestamp_in_opid_order();
+      }
+      break;
+    }
+    default:
+      break;
+  }
+  if (!timestamp_assigned_in_opid_order) {
     return Status::OK();
   }
 
   // Handle safe time advancement:
   //
-  // If this operation has an external consistency mode other than COMMIT_WAIT, we know that no
-  // future transaction will have a timestamp that is lower than it, so we can just advance the
-  // safe timestamp to this operation's timestamp.
+  // If this message is a Raft election no-op, or is a transaction op that
+  // has an external consistency mode other than COMMIT_WAIT, we know that no
+  // future transaction will have a timestamp that is lower than it, so we can
+  // just advance the safe timestamp to the message's timestamp.
   //
-  // If the hybrid clock is disabled, all transactions will fall into this category.
+  // If the hybrid clock is disabled, all transactions will fall into this
+  // category.
   Timestamp safe_time;
-  if (replicate->write_request().external_consistency_mode() != COMMIT_WAIT) {
+  if (replicate->op_type() != consensus::WRITE_OP ||
+      replicate->write_request().external_consistency_mode() != COMMIT_WAIT) {
     safe_time = Timestamp(replicate->timestamp());
   // ... else we set the safe timestamp to be the transaction's timestamp minus the maximum clock
   // error. This opens the door for problems if the flags changed across reboots, but this is