You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by dr...@apache.org on 2016/07/15 01:51:30 UTC

incubator-kudu git commit: Add integration tests for replay cache with writes

Repository: incubator-kudu
Updated Branches:
  refs/heads/master 59ab14b9a -> d0cff255f


Add integration tests for replay cache with writes

This adds a couple of new integration tests for replay cache with
writes. Both tests start multiple threads writing, independently, to
tablet servers simultaneously. The tests leverage the fact that followers
are also able to answer requests, once they are cached, and store all
responses, which are compared at the end of the test.

Some of the requests (1/3) are "empty" writes, so that we stress the
serialization point in transaction_driver.cc without relying on row
lock serialization.

This adds two different tests, one that stresses a lot of elections
and one that crashes nodes. This is in line with other tests we already
had in raft_consensus-itest.

This also adds a new fault injection point right after the leader sends
a request. We currently have one right _before_ the leader sends
a request, but having one for after the request is sent encourages
stressing the path where a newly elected leader has both incoming
client requests and ongoing replica transactions, which can possibly
race with each other if they correspond to the same write.

Finally this changes attempt_no in RequestIdPB to be an int64 instead
of just an int. While an int is more than enough in normal operation,
the new test generates many more attempts and we need a bigger number
to make sure all attempt numbers are unique.

I looped this about 1000 times, without related failures.

Change-Id: I35722eb1c83f97e886cfe9d6b03ed95bcd62429f
Reviewed-on: http://gerrit.cloudera.org:8080/3519
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/d0cff255
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/d0cff255
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/d0cff255

Branch: refs/heads/master
Commit: d0cff255f84e75b70c0c39ccd34a35f348e3c722
Parents: 59ab14b
Author: David Alves <da...@cloudera.com>
Authored: Mon Jul 4 15:04:01 2016 -0700
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Fri Jul 15 01:48:49 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_peers.cc           |   7 +
 src/kudu/integration-tests/CMakeLists.txt       |   1 +
 .../exactly_once_writes-itest.cc                | 286 +++++++++++++++++++
 .../integration-tests/raft_consensus-itest.cc   |  37 +--
 src/kudu/integration-tests/ts_itest-base.h      |  26 ++
 src/kudu/rpc/rpc_header.proto                   |   2 +-
 6 files changed, 326 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d0cff255/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index 0254237..5363dea 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -53,6 +53,11 @@ DECLARE_int32(raft_heartbeat_interval_ms);
 DEFINE_double(fault_crash_on_leader_request_fraction, 0.0,
               "Fraction of the time when the leader will crash just before sending an "
               "UpdateConsensus RPC. (For testing only!)");
+
+DEFINE_double(fault_crash_after_leader_request_fraction, 0.0,
+              "Fraction of the time when the leader will crash on getting a response for an "
+              "UpdateConsensus RPC. (For testing only!)");
+TAG_FLAG(fault_crash_after_leader_request_fraction, unsafe);
 TAG_FLAG(fault_crash_on_leader_request_fraction, unsafe);
 
 
@@ -229,6 +234,8 @@ void Peer::ProcessResponse() {
   DCHECK_EQ(0, sem_.GetValue())
     << "Got a response when nothing was pending";
 
+  MAYBE_FAULT(FLAGS_fault_crash_after_leader_request_fraction);
+
   if (!controller_.status().ok()) {
     if (controller_.status().IsRemoteError()) {
       // Most controller errors are caused by network issues or corner cases

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d0cff255/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 9b1d88d..b2fefae 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -64,6 +64,7 @@ ADD_KUDU_TEST(tablet_replacement-itest)
 ADD_KUDU_TEST(create-table-itest)
 ADD_KUDU_TEST(fuzz-itest)
 ADD_KUDU_TEST(write_throttling-itest)
+ADD_KUDU_TEST(exactly_once_writes-itest)
 
 # Some tests have additional dependencies
 set(KUDU_TEST_LINK_LIBS kudu_client kudu_tools_util ${KUDU_TEST_LINK_LIBS})

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d0cff255/src/kudu/integration-tests/exactly_once_writes-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/exactly_once_writes-itest.cc b/src/kudu/integration-tests/exactly_once_writes-itest.cc
new file mode 100644
index 0000000..657604b
--- /dev/null
+++ b/src/kudu/integration-tests/exactly_once_writes-itest.cc
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/integration-tests/ts_itest-base.h"
+#include "kudu/util/barrier.h"
+#include "kudu/util/logging.h"
+
+namespace kudu {
+namespace tserver {
+
+static const int kConsensusRpcTimeoutForTests = 50;  // Assigned to consensus_rpc_timeout_ms.
+static const int kNumDifferentRows = 1000;  // Row keys are generated modulo this value.
+
+class ExactlyOnceSemanticsITest : public TabletServerIntegrationTestBase {
+ public:
+  ExactlyOnceSemanticsITest() : seed_(SeedRandom()) {}
+
+  void SetUp() override {
+    TabletServerIntegrationTestBase::SetUp();
+    FLAGS_consensus_rpc_timeout_ms = kConsensusRpcTimeoutForTests;
+  }
+
+  // Sends 'num_batches' write requests of at most 'batch_size' rows to the tablet server chosen
+  // by 'thread_idx' and appends each successful response to 'responses'; failed requests are
+  // retried. Waits on 'barrier' before every batch and seeds its Random with 'seed_', so that
+  // all threads generate the same sequence of requests.
+  void WriteRowsAndCollectResponses(int thread_idx,
+                                    int num_batches,
+                                    int batch_size,
+                                    Barrier* barrier,
+                                    vector<WriteResponsePB>* responses);
+
+  // Runs the concurrent write workload and fails the test if the threads' responses differ.
+  void DoTestWritesWithExactlyOnceSemantics(const vector<string>& ts_flags,
+                                            const vector<string>& master_flags,
+                                            int num_batches,
+                                            bool allow_crashes);
+
+ protected:
+  // Seed used by every writer thread, taken from SeedRandom() at construction.
+  int seed_;
+};
+
+void ExactlyOnceSemanticsITest::WriteRowsAndCollectResponses(int thread_idx,
+                                                             int num_batches,
+                                                             int batch_size,
+                                                             Barrier* barrier,
+                                                             vector<WriteResponsePB>* responses) {
+  // Retry budget per batch; attempt numbers stay unique only while num_attempts < kMaxAttempts.
+  const int64_t kMaxAttempts = 100000;
+  // Set the same seed in all threads so that they generate the same requests.
+  Random random(seed_);
+  Sockaddr address = cluster_.get()->tablet_server(
+      thread_idx % FLAGS_num_replicas)->bound_rpc_addr();
+
+  RpcController controller;
+
+  const Schema schema = GetSimpleTestSchema();
+
+  std::shared_ptr<rpc::Messenger> client_messenger;
+  rpc::MessengerBuilder bld("Client");
+  ASSERT_OK(bld.Build(&client_messenger));
+
+  std::unique_ptr<TabletServerServiceProxy> proxy(new TabletServerServiceProxy(client_messenger,
+                                                                               address));
+  for (int i = 0; i < num_batches; i++) {
+    barrier->Wait();
+    WriteRequestPB request;
+    request.set_tablet_id(tablet_id_);
+    SchemaToPB(schema, request.mutable_schema());
+
+    // For 1/3 of the batches perform an empty write. This will make sure that we also stress
+    // the path where writes aren't serialized by row locks.
+    if (i % 3 != 0) {
+      for (int j = 0; j < batch_size; j++) {
+        int row_key = random.Next() % kNumDifferentRows;
+        AddTestRowToPB(RowOperationsPB::INSERT, schema, row_key, row_key, "",
+                       request.mutable_row_operations());
+      }
+    }
+    // Give each (thread, batch) pair its own range of kMaxAttempts attempt numbers.
+    int64_t num_attempts = 0;
+    int64_t base_attempt_idx = thread_idx * num_batches + i;
+    while (true) {
+      controller.Reset();
+      WriteResponsePB response;
+
+      std::unique_ptr<rpc::RequestIdPB> request_id(new rpc::RequestIdPB());
+      request_id->set_client_id("test_client");
+      request_id->set_seq_no(i);
+      request_id->set_attempt_no(base_attempt_idx * kMaxAttempts + num_attempts);
+      request_id->set_first_incomplete_seq_no(rpc::RequestTracker::NO_SEQ_NO);
+
+      controller.SetRequestIdPB(std::move(request_id));
+
+      Status status = proxy->Write(request, &response, &controller);
+      if (status.ok() && response.has_error()) {
+        status = StatusFromPB(response.error().status());
+      }
+      // If there was no error, store the response.
+      if (status.ok()) {
+        responses->push_back(response);
+        break;
+      }
+
+      KLOG_EVERY_N(INFO, 100) << "[" << thread_idx << "] Couldn't write batch [" << i << "/"
+          << num_batches << "]. Status: " << status.ToString();
+      num_attempts++;
+      SleepFor(MonoDelta::FromMilliseconds(2));
+      if (num_attempts >= kMaxAttempts) {
+        FAIL() << "Couldn't write request to tablet server @ " << address.ToString()
+                   << " Status: " << status.ToString();
+      }
+    }
+  }
+}
+
+// Starts the cluster, has kNumThreadsPerReplica threads per replica write the same batches
+// concurrently and fails the test if, for any batch, two threads got different responses.
+void ExactlyOnceSemanticsITest::DoTestWritesWithExactlyOnceSemantics(
+    const vector<string>& ts_flags,
+    const vector<string>& master_flags,
+    int num_batches,
+    bool allow_crashes) {
+  const int kBatchSize = 10;
+  const int kNumThreadsPerReplica = 2;
+
+  BuildAndStart(ts_flags, master_flags);
+
+  vector<scoped_refptr<kudu::Thread>> threads;
+
+  const int num_threads = FLAGS_num_replicas * kNumThreadsPerReplica;
+  vector<vector<WriteResponsePB>> responses(num_threads);
+  Barrier barrier(num_threads);
+
+  // Create kNumThreadsPerReplica write threads per replica.
+  for (int thread_idx = 0; thread_idx < num_threads; thread_idx++) {
+    int ts_idx = thread_idx % FLAGS_num_replicas;
+    scoped_refptr<kudu::Thread> thread;
+    string worker_name = strings::Substitute(
+        "writer-$0-$1", thread_idx,
+        cluster_.get()->tablet_server(ts_idx)->bound_rpc_addr().ToString());
+
+    // The barrier expects num_threads writers, so fail hard if one of them can't be started.
+    CHECK_OK(kudu::Thread::Create(
+        "TestWritesWithExactlyOnceSemantics",
+        worker_name,
+        &ExactlyOnceSemanticsITest::WriteRowsAndCollectResponses,
+        this,
+        thread_idx,
+        num_batches,
+        kBatchSize,
+        &barrier,
+        &responses[thread_idx],
+        &thread));
+    threads.push_back(thread);
+  }
+
+  // Poll until no zero-timeout join is aborted any more, checking the tablet servers meanwhile.
+  bool done = false;
+  while (!done) {
+    done = true;
+    for (auto& thread : threads) {
+      if (ThreadJoiner(thread.get()).give_up_after_ms(0).Join().IsAborted()) {
+        done = false;
+        break;
+      }
+    }
+    if (allow_crashes) {
+      RestartAnyCrashedTabletServers();
+    } else {
+      AssertNoTabletServersCrashed();
+    }
+
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  // Make sure we've received the same responses, for the same operations, on all threads.
+  bool mismatched = false;
+  for (int i = 0; i < num_batches; i++) {
+    for (int j = 0; j < num_threads; j++) {
+      string expected_response = responses[j][i].ShortDebugString();
+      string expected_ts = strings::Substitute(
+          "T:$0 TSidx:$1 TSuuid:$2", j, j % FLAGS_num_replicas,
+          cluster_.get()->tablet_server(j % FLAGS_num_replicas)->instance_id().permanent_uuid());
+      for (int k = 0; k < num_threads; k++) {
+        string got_response = responses[k][i].ShortDebugString();
+        string got_ts = strings::Substitute(
+            "T:$0 TSidx:$1 TSuuid:$2", k, k % FLAGS_num_replicas,
+            cluster_.get()->tablet_server(k % FLAGS_num_replicas)->instance_id().permanent_uuid());
+        if (expected_response != got_response) {
+          mismatched = true;
+          LOG(ERROR) << "Responses mismatched. Expected[" << expected_ts << "]: "
+              << expected_response << " Got[" << got_ts << "]: " << got_response;
+        }
+      }
+    }
+  }
+  if (mismatched) {
+    FAIL() << "Got mismatched responses";
+  }
+}
+
+// This tests exactly once semantics by starting a cluster with multiple replicas and attempting
+// to write in all the replicas at the same time.
+// The write workload purposefully uses repeated rows so that we can make sure that the same
+// response is obtained from all the replicas (responses without errors are trivially equal).
+// Finally this crashes nodes and uses a very small election timeout to trigger rare paths that
+// only happen on leader change.
+TEST_F(ExactlyOnceSemanticsITest, TestWritesWithExactlyOnceSemanticsWithCrashyNodes) {
+  const vector<string> master_flags;  // No extra master flags.
+  const vector<string> ts_flags = {
+    // Inject crashes with probability 0.025 on the leader after it has sent a request. This
+    // stresses the path where a transaction gets duplicate handlers because the leader died
+    // right after sending requests to followers.
+    "--fault_crash_after_leader_request_fraction=0.025",
+
+    // Speed up leader elections so that the test goes through more leaders.
+    "--raft_heartbeat_interval_ms=200",
+    "--leader_failure_monitor_check_mean_ms=100",
+    "--leader_failure_monitor_check_stddev_ms=50",
+
+    // Don't preallocate log segments: bootstrap is slightly faster when it doesn't have to
+    // scan forward through the preallocated log area.
+    "--log_preallocate_segments=false",
+  };
+
+  // With slow tests enabled, run more batches against a bigger cluster.
+  const bool slow = AllowSlowTests();
+  if (slow) {
+    FLAGS_num_tablet_servers = 7;
+    FLAGS_num_replicas = 7;
+  }
+  const int batch_count = slow ? 500 : 50;
+
+  DoTestWritesWithExactlyOnceSemantics(ts_flags, master_flags,
+                                       batch_count,
+                                       true /* Allow crashes */);
+}
+
+// Like the test above but instead of crashing nodes makes sure elections are churny.
+TEST_F(ExactlyOnceSemanticsITest, TestWritesWithExactlyOnceSemanticsWithChurnyElections) {
+#if defined(THREAD_SANITIZER) || defined(ADDRESS_SANITIZER)
+  // Sanitizer (TSAN/ASAN) builds need slightly less election churn, or the cluster makes no
+  // progress at all.
+  const char* const kHeartbeatFlag = "--raft_heartbeat_interval_ms=5";
+#else
+  const char* const kHeartbeatFlag = "--raft_heartbeat_interval_ms=2";
+#endif
+  const vector<string> master_flags;  // No extra master flags.
+  const vector<string> ts_flags = {
+    kHeartbeatFlag,
+    "--leader_failure_monitor_check_mean_ms=2",
+    "--leader_failure_monitor_check_stddev_ms=1",
+    "--never_fsync",
+  };
+
+  const bool slow = AllowSlowTests();
+  if (slow) {
+    // Go up to 5 replicas only for slow tests; otherwise the Jenkins machines get overwhelmed,
+    // elections run forever and the test never completes.
+    FLAGS_num_tablet_servers = 5;
+    FLAGS_num_replicas = 5;
+  }
+  const int batch_count = slow ? 5000 : 1000;
+
+  DoTestWritesWithExactlyOnceSemantics(ts_flags, master_flags, batch_count,
+                                       false /* No crashes */);
+}
+
+} // namespace tserver
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d0cff255/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index fbb7577..2124440 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -241,8 +241,11 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
         bool overflow;
         session->GetPendingErrors(&errors, &overflow);
         CHECK(!overflow);
-        for (const client::KuduError* e : errors) {
-          CHECK(e->status().IsAlreadyPresent()) << "Unexpected error: " << e->status().ToString();
+        if (!errors.empty()) {
+          for (const client::KuduError* e : errors) {
+            LOG(ERROR) << "Unexpected error: " << e->status().ToString();
+          }
+          FAIL() << "Found errors while inserting.";
         }
         inserted -= errors.size();
       }
@@ -342,14 +345,6 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
   // has a payload of around 128KB. Causes a gtest failure on error.
   void Write128KOpsToLeader(int num_writes);
 
-  // Check for and restart any TS that have crashed.
-  // Returns the number of servers restarted.
-  int RestartAnyCrashedTabletServers();
-
-  // Assert that no tablet servers have crashed.
-  // Tablet servers that have been manually Shutdown() are allowed.
-  void AssertNoTabletServersCrashed();
-
   // Ensure that a majority of servers is required for elections and writes.
   // This is done by pausing a majority and asserting that writes and elections fail,
   // then unpausing the majority and asserting that elections and writes succeed.
@@ -789,28 +784,6 @@ TEST_F(RaftConsensusITest, TestFollowerFallsBehindLeaderGC) {
   }
 }
 
-int RaftConsensusITest::RestartAnyCrashedTabletServers() {
-  int restarted = 0;
-  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-    if (!cluster_->tablet_server(i)->IsProcessAlive()) {
-      LOG(INFO) << "TS " << i << " appears to have crashed. Restarting.";
-      cluster_->tablet_server(i)->Shutdown();
-      CHECK_OK(cluster_->tablet_server(i)->Restart());
-      restarted++;
-    }
-  }
-  return restarted;
-}
-
-void RaftConsensusITest::AssertNoTabletServersCrashed() {
-  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-    if (cluster_->tablet_server(i)->IsShutdown()) continue;
-
-    ASSERT_TRUE(cluster_->tablet_server(i)->IsProcessAlive())
-      << "Tablet server " << i << " crashed";
-  }
-}
-
 // This test starts several tablet servers, and configures them with
 // fault injection so that the leaders frequently crash just before
 // sending RPCs to followers.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d0cff255/src/kudu/integration-tests/ts_itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_itest-base.h b/src/kudu/integration-tests/ts_itest-base.h
index 225c942..2810930 100644
--- a/src/kudu/integration-tests/ts_itest-base.h
+++ b/src/kudu/integration-tests/ts_itest-base.h
@@ -473,6 +473,32 @@ class TabletServerIntegrationTestBase : public TabletServerTestBase {
     NO_FATALS(v.CheckRowCount(kTableId, ClusterVerifier::EXACTLY, expected_result_count));
   }
 
+  // Check for and restart any TS that have crashed.
+  // Returns the number of servers restarted.
+  int RestartAnyCrashedTabletServers() {
+    int num_restarted = 0;
+    for (int idx = 0; idx < cluster_->num_tablet_servers(); idx++) {
+      auto* ts = cluster_->tablet_server(idx);
+      if (ts->IsProcessAlive()) continue;  // Still running, nothing to do.
+      LOG(INFO) << "TS " << idx << " appears to have crashed. Restarting.";
+      ts->Shutdown();
+      CHECK_OK(ts->Restart());
+      num_restarted++;
+    }
+    return num_restarted;
+  }
+
+  // Assert that no tablet servers have crashed.
+  // Tablet servers that have been manually Shutdown() are allowed.
+  void AssertNoTabletServersCrashed() {
+    for (int idx = 0; idx < cluster_->num_tablet_servers(); idx++) {
+      auto* ts = cluster_->tablet_server(idx);
+      // Servers that were deliberately shut down don't count as crashed.
+      if (ts->IsShutdown()) continue;
+      ASSERT_TRUE(ts->IsProcessAlive()) << "Tablet server " << idx << " crashed";
+    }
+  }
+
  protected:
   gscoped_ptr<ExternalMiniCluster> cluster_;
   gscoped_ptr<itest::ExternalMiniClusterFsInspector> inspect_;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/d0cff255/src/kudu/rpc/rpc_header.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_header.proto b/src/kudu/rpc/rpc_header.proto
index 73ed070..f394fbe 100644
--- a/src/kudu/rpc/rpc_header.proto
+++ b/src/kudu/rpc/rpc_header.proto
@@ -127,7 +127,7 @@ message RequestIdPB {
 
   // The number of times this RPC has been tried.
   // Set to 1 in the first attempt.
-  required int32 attempt_no = 4;
+  required int64 attempt_no = 4;
 }
 
 // The header for the RPC request frame.