You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/12/05 01:58:21 UTC

[1/7] kudu git commit: [flaky tests] Fix "Already present" failures on raft_consensus-itest

Repository: kudu
Updated Branches:
  refs/heads/master 60aa54e21 -> 80ac8bae3


[flaky tests] Fix "Already present" failures on raft_consensus-itest

In the flaky tests dashboard TestSlowLeader fails with:

"Check failed: e->status().ok() Unexpected status: Already present: key already present"

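This happens because it's possible for TestWorkload to generate
identical random numbers on different threads. Each writer thread
created its own Random instance (seeded with its thread id), so even
though we use a multiplicative linear congruential PRNG, which only
guarantees unique numbers within a single period of a single
generator, the independent sequences of different threads can
overlap.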

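This patch changes TestWorkload to use a single ThreadSafeRandom
shared by all writer threads. We could also change the key type to
int64 and do something like
int64 key = static_cast<int64_t>(r.Next32()) << 32 | thread_index;
however, changing the type of the key is very invasive, as many
tests depend on it. The patch also makes set_write_pattern() allow
"Already present" errors only for the INSERT_WITH_MANY_DUP_KEYS
pattern, instead of for every write pattern.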

This patch also increases the timeout of the snapshot scan in
TestReplicaBehaviorViaRPC from 100ms to 1000ms, since the scan would
spuriously time out. It also doubles, from 30s to 60s, how long
TestCommitIndexFarBehindAfterLeaderElection waits for the servers to
agree, since the shorter wait caused spurious failures.

Results of running this on dist-test:
http://dist-test.cloudera.org//job?job_id=david.alves.1480656349.3518

To be fair, I've only seen the "Already present" failure outside of
the flaky dashboard once; it's probably rarer than 1000 loops could
reliably reproduce. However, it's apparently difficult to mimic the
exact conditions of the flaky dashboard tests: running
raft_consensus-itest with the stress option makes it incredibly
flaky, with different failures than the ones seen on the dashboard.
Not running it with stress makes it pass the large majority of
the time.

Change-Id: I35faf53cb9bb8585ec1c01d038b1cd64a0bb533e
Reviewed-on: http://gerrit.cloudera.org:8080/5319
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8b991410
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8b991410
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8b991410

Branch: refs/heads/master
Commit: 8b99141006770131ede015a546aa3bfe6ca752f9
Parents: 60aa54e
Author: David Alves <dr...@apache.org>
Authored: Thu Dec 1 14:39:06 2016 -0800
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Fri Dec 2 23:40:33 2016 +0000

----------------------------------------------------------------------
 .../integration-tests/raft_consensus-itest.cc     |  4 ++--
 src/kudu/integration-tests/test_workload.cc       | 13 +++++++------
 src/kudu/integration-tests/test_workload.h        | 18 ++++++++++++++----
 3 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/8b991410/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 8d016b0..1bd618d 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -1402,7 +1402,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
     ScanRequestPB req;
     ScanResponsePB resp;
     RpcController rpc;
-    rpc.set_timeout(MonoDelta::FromMilliseconds(100));
+    rpc.set_timeout(MonoDelta::FromMilliseconds(1000));
     NewScanRequestPB* scan = req.mutable_new_scan_request();
     scan->set_tablet_id(tablet_id_);
     scan->set_read_mode(READ_AT_SNAPSHOT);
@@ -2506,7 +2506,7 @@ TEST_F(RaftConsensusITest, TestCommitIndexFarBehindAfterLeaderElection) {
 
   TabletServerMap active_tservers = tablet_servers_;
   active_tservers.erase(only_vote_ts->uuid());
-  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30),
+  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(60),
                                   active_tservers,
                                   tablet_id_, 13));
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/8b991410/src/kudu/integration-tests/test_workload.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc
index 96ffb51..6989223 100644
--- a/src/kudu/integration-tests/test_workload.cc
+++ b/src/kudu/integration-tests/test_workload.cc
@@ -27,6 +27,7 @@
 #include "kudu/util/env.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/random.h"
+#include "kudu/util/test_util.h"
 #include "kudu/util/thread.h"
 
 namespace kudu {
@@ -44,13 +45,13 @@ const char* const TestWorkload::kDefaultTableName = "test-workload";
 
 TestWorkload::TestWorkload(MiniClusterBase* cluster)
   : cluster_(cluster),
+    rng_(SeedRandom()),
     payload_bytes_(11),
     num_write_threads_(4),
     write_batch_size_(50),
     write_timeout_millis_(20000),
     timeout_allowed_(false),
     not_found_allowed_(false),
-    already_present_allowed_(false),
     network_error_allowed_(false),
     num_replicas_(3),
     num_tablets_(1),
@@ -60,6 +61,8 @@ TestWorkload::TestWorkload(MiniClusterBase* cluster)
     rows_inserted_(0),
     batches_completed_(0),
     sequential_key_gen_(0) {
+  // Make the default write pattern random row inserts.
+  set_write_pattern(INSERT_RANDOM_ROWS);
 }
 
 TestWorkload::~TestWorkload() {
@@ -67,8 +70,6 @@ TestWorkload::~TestWorkload() {
 }
 
 void TestWorkload::WriteThread() {
-  Random r(Env::Default()->gettid());
-
   shared_ptr<KuduTable> table;
   // Loop trying to open up the table. In some tests we set up very
   // low RPC timeouts to test those behaviors, so this might fail and
@@ -104,7 +105,7 @@ void TestWorkload::WriteThread() {
         gscoped_ptr<KuduUpdate> update(table->NewUpdate());
         KuduPartialRow* row = update->mutable_row();
         CHECK_OK(row->SetInt32(0, 0));
-        CHECK_OK(row->SetInt32(1, r.Next()));
+        CHECK_OK(row->SetInt32(1, rng_.Next()));
         CHECK_OK(session->Apply(update.release()));
       } else {
         gscoped_ptr<KuduInsert> insert(table->NewInsert());
@@ -113,13 +114,13 @@ void TestWorkload::WriteThread() {
         if (write_pattern_ == INSERT_SEQUENTIAL_ROWS) {
           key = sequential_key_gen_.Increment();
         } else {
-          key = r.Next();
+          key = rng_.Next();
           if (write_pattern_ == INSERT_WITH_MANY_DUP_KEYS) {
             key %= kNumRowsForDuplicateKeyWorkload;
           }
         }
         CHECK_OK(row->SetInt32(0, key));
-        CHECK_OK(row->SetInt32(1, r.Next()));
+        CHECK_OK(row->SetInt32(1, rng_.Next()));
         string test_payload("hello world");
         if (payload_bytes_ != 11) {
           // We fill with zeros if you change the default.

http://git-wip-us.apache.org/repos/asf/kudu/blob/8b991410/src/kudu/integration-tests/test_workload.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h
index 8d88e1a..486c92a 100644
--- a/src/kudu/integration-tests/test_workload.h
+++ b/src/kudu/integration-tests/test_workload.h
@@ -26,6 +26,7 @@
 #include "kudu/util/atomic.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/random.h"
 
 namespace kudu {
 
@@ -130,9 +131,17 @@ class TestWorkload {
 
   void set_write_pattern(WritePattern pattern) {
     write_pattern_ = pattern;
-    // Since we're writing with dup keys we will get AlreadyPresent() errors on the response
-    // so allow it.
-    set_already_present_allowed(true);
+    switch (pattern) {
+      case INSERT_WITH_MANY_DUP_KEYS:
+        set_already_present_allowed(true);
+        break;
+      case INSERT_RANDOM_ROWS:
+      case UPDATE_ONE_ROW:
+      case INSERT_SEQUENTIAL_ROWS:
+        set_already_present_allowed(false);
+        break;
+      default: LOG(FATAL) << "Unsupported WritePattern.";
+    }
   }
 
   // Sets up the internal client and creates the table which will be used for
@@ -166,6 +175,7 @@ class TestWorkload {
   MiniClusterBase* cluster_;
   client::KuduClientBuilder client_builder_;
   client::sp::shared_ptr<client::KuduClient> client_;
+  ThreadSafeRandom rng_;
 
   int payload_bytes_;
   int num_write_threads_;
@@ -175,7 +185,7 @@ class TestWorkload {
   bool not_found_allowed_;
   bool already_present_allowed_;
   bool network_error_allowed_;
-  WritePattern write_pattern_ = INSERT_RANDOM_ROWS;
+  WritePattern write_pattern_;
 
   int num_replicas_;
   int num_tablets_;


[2/7] kudu git commit: Allow to get a MonoDelta from two Timestamps

Posted by to...@apache.org.
Allow to get a MonoDelta from two Timestamps

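This adds a new method to Clock:
MonoDelta GetPhysicalComponentDifference(Timestamp lhs, Timestamp rhs) const

The MonoDelta represents the difference between the physical
components of 'lhs' and 'rhs', specifically lhs - rhs. It is
negative when the physical component of 'lhs' is less than that of
'rhs'. Calling it on a clock whose timestamps have no physical
component (i.e. HasPhysicalComponent() returns false) crashes.

This also makes HasPhysicalComponent() a const method.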

Change-Id: I1e5bcd7cb2e9c2b5ce1458c5366d28c385864b0a
Reviewed-on: http://gerrit.cloudera.org:8080/5304
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fffe5765
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fffe5765
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fffe5765

Branch: refs/heads/master
Commit: fffe5765543d36ea9b62b52b7b657dd2c7f787fd
Parents: 8b99141
Author: David Alves <dr...@apache.org>
Authored: Thu Dec 1 04:01:57 2016 -0800
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Sat Dec 3 01:25:20 2016 +0000

----------------------------------------------------------------------
 src/kudu/server/clock.h              | 12 ++++++++++--
 src/kudu/server/hybrid_clock-test.cc | 10 ++++++++++
 src/kudu/server/hybrid_clock.cc      | 12 ++++++++----
 src/kudu/server/hybrid_clock.h       |  4 +++-
 4 files changed, 31 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fffe5765/src/kudu/server/clock.h
----------------------------------------------------------------------
diff --git a/src/kudu/server/clock.h b/src/kudu/server/clock.h
index 777667b..875b516 100644
--- a/src/kudu/server/clock.h
+++ b/src/kudu/server/clock.h
@@ -29,7 +29,6 @@
 namespace kudu {
 class faststring;
 class MetricEntity;
-class MonoDelta;
 class Slice;
 class Status;
 namespace server {
@@ -67,10 +66,19 @@ class Clock : public RefCountedThreadSafe<Clock> {
 
   // Indicates whether the clock has a physical component to its timestamps
   // (wallclock time).
-  virtual bool HasPhysicalComponent() {
+  virtual bool HasPhysicalComponent() const {
     return false;
   }
 
+  // Get a MonoDelta representing the physical component difference between two timestamps,
+  // specifically lhs - rhs.
+  //
+  // Requires that this clock's timestamps have a physical component, i.e.
+  // that HasPhysicalComponent() return true, otherwise it will crash.
+  virtual MonoDelta GetPhysicalComponentDifference(Timestamp /*lhs*/, Timestamp /*rhs*/) const {
+    LOG(FATAL) << "Clock's timestamps don't have a physical component.";
+  }
+
   // Update the clock with a transaction timestamp originating from
   // another server. For instance replicas can call this so that,
   // if elected leader, they are guaranteed to generate timestamps

http://git-wip-us.apache.org/repos/asf/kudu/blob/fffe5765/src/kudu/server/hybrid_clock-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/hybrid_clock-test.cc b/src/kudu/server/hybrid_clock-test.cc
index 6e67277..b53fe0b 100644
--- a/src/kudu/server/hybrid_clock-test.cc
+++ b/src/kudu/server/hybrid_clock-test.cc
@@ -279,5 +279,15 @@ TEST_F(HybridClockTest, TestClockDoesntGoBackwardsWithUpdates) {
   }
 }
 
+TEST_F(HybridClockTest, TestGetPhysicalComponentDifference) {
+  Timestamp now1 = HybridClock::TimestampFromMicrosecondsAndLogicalValue(100, 100);
+  SleepFor(MonoDelta::FromMilliseconds(1));
+  Timestamp now2 = HybridClock::TimestampFromMicrosecondsAndLogicalValue(200, 0);
+  MonoDelta delta = clock_->GetPhysicalComponentDifference(now2, now1);
+  MonoDelta negative_delta = clock_->GetPhysicalComponentDifference(now1, now2);
+  ASSERT_EQ(100, delta.ToMicroseconds());
+  ASSERT_EQ(-100, negative_delta.ToMicroseconds());
+}
+
 }  // namespace server
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/fffe5765/src/kudu/server/hybrid_clock.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/hybrid_clock.cc b/src/kudu/server/hybrid_clock.cc
index 8ac316a..976bdc0 100644
--- a/src/kudu/server/hybrid_clock.cc
+++ b/src/kudu/server/hybrid_clock.cc
@@ -301,11 +301,15 @@ bool HybridClock::SupportsExternalConsistencyMode(ExternalConsistencyMode mode)
   return true;
 }
 
-bool HybridClock::HasPhysicalComponent() {
+bool HybridClock::HasPhysicalComponent() const {
   return true;
 }
 
-Status HybridClock::WaitUntilAfter(const Timestamp& then_latest,
+MonoDelta HybridClock::GetPhysicalComponentDifference(Timestamp lhs, Timestamp rhs) const {
+  return MonoDelta::FromMicroseconds(GetPhysicalValueMicros(lhs) - GetPhysicalValueMicros(rhs));
+}
+
+Status HybridClock::WaitUntilAfter(const Timestamp& then,
                                    const MonoTime& deadline) {
   TRACE_EVENT0("clock", "HybridClock::WaitUntilAfter");
   Timestamp now;
@@ -317,7 +321,7 @@ Status HybridClock::WaitUntilAfter(const Timestamp& then_latest,
 
   // "unshift" the timestamps so that we can measure actual time
   uint64_t now_usec = GetPhysicalValueMicros(now);
-  uint64_t then_latest_usec = GetPhysicalValueMicros(then_latest);
+  uint64_t then_latest_usec = GetPhysicalValueMicros(then);
 
   uint64_t now_earliest_usec = now_usec - error;
 
@@ -326,7 +330,7 @@ Status HybridClock::WaitUntilAfter(const Timestamp& then_latest,
     return Status::OK();
   }
 
-  // Case 2 wait out until we are sure that then_latest has passed
+  // Case 2 wait out until we are sure that then has passed
 
   // We'll sleep then_latest_usec - now_earliest_usec so that the new
   // nw.earliest is higher than then.latest.

http://git-wip-us.apache.org/repos/asf/kudu/blob/fffe5765/src/kudu/server/hybrid_clock.h
----------------------------------------------------------------------
diff --git a/src/kudu/server/hybrid_clock.h b/src/kudu/server/hybrid_clock.h
index d712c82..c2e1e62 100644
--- a/src/kudu/server/hybrid_clock.h
+++ b/src/kudu/server/hybrid_clock.h
@@ -60,7 +60,9 @@ class HybridClock : public Clock {
   // HybridClock supports all external consistency modes.
   virtual bool SupportsExternalConsistencyMode(ExternalConsistencyMode mode) OVERRIDE;
 
-  virtual bool HasPhysicalComponent() OVERRIDE;
+  virtual bool HasPhysicalComponent() const OVERRIDE;
+
+  MonoDelta GetPhysicalComponentDifference(Timestamp lhs, Timestamp rhs) const OVERRIDE;
 
   // Blocks the caller thread until the true time is after 'then'.
   // In other words, waits until the HybridClock::Now() on _all_ nodes


[4/7] kudu git commit: raft_consensus-itest: add a test for a potential deadlock

Posted by to...@apache.org.
raft_consensus-itest: add a test for a potential deadlock

This adds a test case where multiple pending operations have lock
dependencies on each other, and the replica needs to replace one that's
stuck waiting in the prepare queue before it can make progress.

This catches a bug, not covered by existing test cases, that was found
in a revision of https://gerrit.cloudera.org/#/c/5294/.

Change-Id: Ie1143045780886958e45f667b1877b4e34e8b03e
Reviewed-on: http://gerrit.cloudera.org:8080/5341
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/44579480
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/44579480
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/44579480

Branch: refs/heads/master
Commit: 44579480490279ae0b7edbf6511f2b99b563e63b
Parents: 7e3071e
Author: Todd Lipcon <to...@apache.org>
Authored: Fri Dec 2 17:01:07 2016 -0800
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Sat Dec 3 01:57:39 2016 +0000

----------------------------------------------------------------------
 .../integration-tests/raft_consensus-itest.cc   | 74 +++++++++++++++++++-
 1 file changed, 72 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/44579480/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 1bd618d..8d9c8d3 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -173,6 +173,11 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
   // Add an Insert operation to the given consensus request.
   // The row to be inserted is generated based on the OpId.
   void AddOp(const OpId& id, ConsensusRequestPB* req);
+  void AddOpWithTypeAndKey(const OpId& id,
+                           RowOperationsPB::Type op_type,
+                           int32_t key,
+                           ConsensusRequestPB* req);
+
 
   string DumpToString(TServerDetails* leader,
                       const vector<string>& leader_results,
@@ -1174,6 +1179,14 @@ TEST_F(RaftConsensusITest, TestKUDU_597) {
 }
 
 void RaftConsensusITest::AddOp(const OpId& id, ConsensusRequestPB* req) {
+  AddOpWithTypeAndKey(id, RowOperationsPB::INSERT,
+                      id.index() * 10000 + id.term(), req);
+}
+
+void RaftConsensusITest::AddOpWithTypeAndKey(const OpId& id,
+                                             RowOperationsPB::Type op_type,
+                                             int32_t key,
+                                             ConsensusRequestPB* req) {
   ReplicateMsg* msg = req->add_ops();
   msg->mutable_id()->CopyFrom(id);
   msg->set_timestamp(id.index());
@@ -1181,8 +1194,7 @@ void RaftConsensusITest::AddOp(const OpId& id, ConsensusRequestPB* req) {
   WriteRequestPB* write_req = msg->mutable_write_request();
   CHECK_OK(SchemaToPB(schema_, write_req->mutable_schema()));
   write_req->set_tablet_id(tablet_id_);
-  int key = id.index() * 10000 + id.term();
-  AddTestRowToPB(RowOperationsPB::INSERT, schema_, key, id.term(),
+  AddTestRowToPB(op_type, schema_, key, id.term(),
                  id.ShortDebugString(), write_req->mutable_row_operations());
 }
 
@@ -1277,6 +1289,64 @@ TEST_F(RaftConsensusITest, TestLMPMismatchOnRestartedReplica) {
   EXPECT_EQ("2.2", OpIdToString(resp.status().last_received()));
 }
 
+// Test a scenario where a replica has pending operations with lock
+// dependencies on each other:
+//   2.2: UPSERT row 1
+//   2.3: UPSERT row 1
+//   2.4: UPSERT row 1
+// ...and a new leader tries to abort 2.4 in order to replace it with a new
+// operation. Because the operations have a lock dependency, operation 2.4
+// will be 'stuck' in the Prepare queue. This verifies that we can abort an
+// operation even if it's stuck in the queue.
+TEST_F(RaftConsensusITest, TestReplaceOperationStuckInPrepareQueue) {
+  TServerDetails* replica_ts;
+  NO_FATALS(SetupSingleReplicaTest(&replica_ts));
+
+  ConsensusServiceProxy* c_proxy = CHECK_NOTNULL(replica_ts->consensus_proxy.get());
+  ConsensusRequestPB req;
+  ConsensusResponsePB resp;
+  RpcController rpc;
+
+  req.set_tablet_id(tablet_id_);
+  req.set_dest_uuid(replica_ts->uuid());
+  req.set_caller_uuid("fake_caller");
+  req.set_caller_term(2);
+  req.set_all_replicated_index(0);
+  req.mutable_preceding_id()->CopyFrom(MakeOpId(1, 1));
+  AddOpWithTypeAndKey(MakeOpId(2, 2), RowOperationsPB::UPSERT, 1, &req);
+  AddOpWithTypeAndKey(MakeOpId(2, 3), RowOperationsPB::UPSERT, 1, &req);
+  AddOpWithTypeAndKey(MakeOpId(2, 4), RowOperationsPB::UPSERT, 1, &req);
+  req.set_committed_index(2);
+  rpc.Reset();
+  ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
+  ASSERT_FALSE(resp.has_error()) << resp.DebugString();
+
+  // Replace operation 2.4 with 3.4, add 3.5 (upsert of a new key)
+  req.set_caller_term(3);
+  req.mutable_preceding_id()->CopyFrom(MakeOpId(2, 3));
+  req.clear_ops();
+  AddOpWithTypeAndKey(MakeOpId(3, 4), RowOperationsPB::UPSERT, 1, &req);
+  AddOpWithTypeAndKey(MakeOpId(3, 5), RowOperationsPB::UPSERT, 2, &req);
+  rpc.Reset();
+  rpc.set_timeout(MonoDelta::FromSeconds(5));
+  ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
+  ASSERT_FALSE(resp.has_error()) << resp.DebugString();
+
+  // Commit all ops.
+  req.clear_ops();
+  req.set_committed_index(5);
+  req.mutable_preceding_id()->CopyFrom(MakeOpId(3, 5));
+  rpc.Reset();
+  ASSERT_OK(c_proxy->UpdateConsensus(req, &resp, &rpc));
+  ASSERT_FALSE(resp.has_error()) << resp.DebugString();
+
+  // Ensure we can read the data.
+  vector<string> results;
+  NO_FATALS(WaitForRowCount(replica_ts->tserver_proxy.get(), 2, &results));
+  ASSERT_EQ("(int32 key=1, int32 int_val=3, string string_val=\"term: 3 index: 4\")", results[0]);
+  ASSERT_EQ("(int32 key=2, int32 int_val=3, string string_val=\"term: 3 index: 5\")", results[1]);
+}
+
 // Regression test for KUDU-644:
 // Triggers some complicated scenarios on the replica involving aborting and
 // replacing transactions.


[7/7] kudu git commit: KUDU-1782. Fault injection crashes should exit with a specific exit code

Posted by to...@apache.org.
KUDU-1782. Fault injection crashes should exit with a specific exit code

This changes the fault injection "crash" to use exit(85) instead of
LOG(FATAL). This means that we can now distinguish between a process
exiting due to fault injection and a process exiting due to a crash or
a real FATAL-inducing issue.

The WaitForCrash method of ExternalDaemon (external_mini_cluster.h) is
now split into two methods:
- WaitForInjectedCrash, which checks that the process exited with the
  fault injection exit code (fault_injection::kExitStatus).
- WaitForFatal, which checks that the process was terminated by
  SIGABRT.
If the process exits with an unexpected status, Status::Aborted is
returned, typically resulting in a test failure.

I verified the new functionality by locally reverting the fault
injection code to use LOG(FATAL) and checking that ts_recovery-itest
reported that the process had crashed unexpectedly.

Change-Id: Ic8d9ae38efa65123ae69097a76c113b9709c3484
Reviewed-on: http://gerrit.cloudera.org:8080/5353
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/80ac8bae
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/80ac8bae
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/80ac8bae

Branch: refs/heads/master
Commit: 80ac8bae335b490c7b75351e6d4c321a58183c73
Parents: f9e5993
Author: Todd Lipcon <to...@apache.org>
Authored: Sun Dec 4 00:14:37 2016 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Dec 5 01:57:10 2016 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/delete_table-test.cc |  3 +-
 .../integration-tests/disk_reservation-itest.cc |  4 +-
 .../integration-tests/external_mini_cluster.cc  | 42 ++++++++++++++++++--
 .../integration-tests/external_mini_cluster.h   | 27 +++++++++++--
 .../integration-tests/raft_consensus-itest.cc   |  8 ++--
 src/kudu/integration-tests/ts_recovery-itest.cc | 17 ++++----
 src/kudu/util/fault_injection.cc                | 13 ++----
 src/kudu/util/fault_injection.h                 |  7 +++-
 8 files changed, 90 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/80ac8bae/src/kudu/integration-tests/delete_table-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/delete_table-test.cc b/src/kudu/integration-tests/delete_table-test.cc
index 33d2e51..7ad0a3c 100644
--- a/src/kudu/integration-tests/delete_table-test.cc
+++ b/src/kudu/integration-tests/delete_table-test.cc
@@ -190,7 +190,7 @@ void DeleteTableTest::WaitForTabletDeletedOnTS(int index,
 void DeleteTableTest::WaitForTSToCrash(int index) {
   auto ts = cluster_->tablet_server(index);
   SCOPED_TRACE(ts->instance_id().permanent_uuid());
-  ASSERT_OK(ts->WaitForCrash(MonoDelta::FromSeconds(60)));
+  ASSERT_OK(ts->WaitForInjectedCrash(MonoDelta::FromSeconds(60)));
 }
 
 void DeleteTableTest::WaitForAllTSToCrash() {
@@ -491,6 +491,7 @@ TEST_F(DeleteTableTest, TestAutoTombstoneAfterCrashDuringTabletCopy) {
       "--fault_crash_after_tc_files_fetched=1.0");
 
   // Restart TS 0 and add it to the config. It will crash when tablet copy starts.
+  ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
   string leader_uuid = GetLeaderUUID(cluster_->tablet_server(1)->uuid(), replicated_tablet_id);
   TServerDetails* leader = DCHECK_NOTNULL(ts_map_[leader_uuid]);
   TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];

http://git-wip-us.apache.org/repos/asf/kudu/blob/80ac8bae/src/kudu/integration-tests/disk_reservation-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/disk_reservation-itest.cc b/src/kudu/integration-tests/disk_reservation-itest.cc
index ae81d8c..5ca7810 100644
--- a/src/kudu/integration-tests/disk_reservation-itest.cc
+++ b/src/kudu/integration-tests/disk_reservation-itest.cc
@@ -104,7 +104,7 @@ TEST_F(DiskReservationITest, TestFillMultipleDisks) {
   // Wait for crash due to inability to flush or compact.
   Status s;
   for (int i = 0; i < 10; i++) {
-    s = cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(1));
+    s = cluster_->tablet_server(0)->WaitForFatal(MonoDelta::FromSeconds(1));
     if (s.ok()) break;
     LOG(INFO) << "Rows inserted: " << workload.rows_inserted();
   }
@@ -144,7 +144,7 @@ TEST_F(DiskReservationITest, TestWalWriteToFullDiskAborts) {
   ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(0),
                               "disk_reserved_bytes_free_for_testing", "10000001"));
 
-  ASSERT_OK(cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(10)));
+  ASSERT_OK(cluster_->tablet_server(0)->WaitForFatal(MonoDelta::FromSeconds(10)));
   workload.StopAndJoin();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/80ac8bae/src/kudu/integration-tests/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.cc b/src/kudu/integration-tests/external_mini_cluster.cc
index ab69957..9bf09b3 100644
--- a/src/kudu/integration-tests/external_mini_cluster.cc
+++ b/src/kudu/integration-tests/external_mini_cluster.cc
@@ -40,6 +40,7 @@
 #include "kudu/util/async_util.h"
 #include "kudu/util/curl_util.h"
 #include "kudu/util/env.h"
+#include "kudu/util/fault_injection.h"
 #include "kudu/util/jsonreader.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/net/sockaddr.h"
@@ -710,16 +711,49 @@ bool ExternalDaemon::IsProcessAlive() const {
   return s.IsTimedOut();
 }
 
-Status ExternalDaemon::WaitForCrash(const MonoDelta& timeout) const {
+Status ExternalDaemon::WaitForInjectedCrash(const MonoDelta& timeout) const {
+  return WaitForCrash(timeout, [](int status) {
+      return WIFEXITED(status) && WEXITSTATUS(status) == fault_injection::kExitStatus;
+    }, "fault injection");
+}
+
+Status ExternalDaemon::WaitForFatal(const MonoDelta& timeout) const {
+  return WaitForCrash(timeout, [](int status) {
+      return WIFSIGNALED(status) && WTERMSIG(status) == SIGABRT;
+    }, "FATAL crash");
+}
+
+
+Status ExternalDaemon::WaitForCrash(const MonoDelta& timeout,
+                                    const std::function<bool(int)>& wait_status_predicate,
+                                    const char* crash_type_str) const {
+  CHECK(process_) << "process not started";
   MonoTime deadline = MonoTime::Now() + timeout;
 
   int i = 1;
-  while (MonoTime::Now() < deadline) {
-    if (!IsProcessAlive()) return Status::OK();
+  while (IsProcessAlive() && MonoTime::Now() < deadline) {
     int sleep_ms = std::min(i++ * 10, 200);
     SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
   }
-  return Status::TimedOut(Substitute("Process did not crash within $0", timeout.ToString()));
+
+  if (IsProcessAlive()) {
+    return Status::TimedOut(Substitute("Process did not crash within $0",
+                                       timeout.ToString()));
+  }
+
+  // If the process has exited, make sure it exited with the expected status.
+  int wait_status;
+  RETURN_NOT_OK_PREPEND(process_->WaitNoBlock(&wait_status),
+                        "could not get wait status");
+
+  if (!wait_status_predicate(wait_status)) {
+    string info_str;
+    RETURN_NOT_OK_PREPEND(process_->GetExitStatus(nullptr, &info_str),
+                          "could not get description of exit");
+    return Status::Aborted(
+        Substitute("process exited, but not due to a $0: $1", crash_type_str, info_str));
+  }
+  return Status::OK();
 }
 
 pid_t ExternalDaemon::pid() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/80ac8bae/src/kudu/integration-tests/external_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.h b/src/kudu/integration-tests/external_mini_cluster.h
index f1dad8c..d7b0fe2 100644
--- a/src/kudu/integration-tests/external_mini_cluster.h
+++ b/src/kudu/integration-tests/external_mini_cluster.h
@@ -17,6 +17,7 @@
 #ifndef KUDU_INTEGRATION_TESTS_EXTERNAL_MINI_CLUSTER_H
 #define KUDU_INTEGRATION_TESTS_EXTERNAL_MINI_CLUSTER_H
 
+#include <functional>
 #include <map>
 #include <memory>
 #include <string>
@@ -348,9 +349,18 @@ class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
   // explicitly call Shutdown().
   bool IsProcessAlive() const;
 
-  // Wait for this process to crash, or the given timeout to
-  // elapse. If the process is already crashed, returns immediately.
-  Status WaitForCrash(const MonoDelta& timeout) const;
+  // Wait for this process to crash due to a configured fault
+  // injection, or the given timeout to elapse. If the process
+  // crashes for some reason other than an injected fault, returns
+  // Status::Aborted.
+  //
+  // If the process is already crashed, returns immediately.
+  Status WaitForInjectedCrash(const MonoDelta& timeout) const;
+
+  // Same as the above, but expects the process to crash due to a
+  // LOG(FATAL) or CHECK failure. In other words, waits for it to
+  // crash from SIGABRT.
+  Status WaitForFatal(const MonoDelta& timeout) const;
 
   virtual void Shutdown();
 
@@ -381,6 +391,17 @@ class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
 
   Status StartProcess(const std::vector<std::string>& user_flags);
 
+  // Wait for the process to exit, and then call 'wait_status_predicate'
+  // on the resulting exit status. NOTE: this is not the return code, but
+  // rather the value provided by waitpid(2): use WEXITSTATUS, etc.
+  //
+  // If the predicate matches, returns OK. Otherwise, returns an error.
+  // 'crash_type_str' should be a descriptive name for the type of crash,
+  // used in formatting the error message.
+  Status WaitForCrash(const MonoDelta& timeout,
+                      const std::function<bool(int)>& wait_status_predicate,
+                      const char* crash_type_str) const;
+
   // In a code-coverage build, try to flush the coverage data to disk.
   // In a non-coverage build, this does nothing.
   void FlushCoverage();

http://git-wip-us.apache.org/repos/asf/kudu/blob/80ac8bae/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 2bac732..995a9c4 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -2797,7 +2797,7 @@ TEST_F(RaftConsensusITest, Test_KUDU_1735) {
   // change operations due to the above fault injection.
   ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tservers[1], boost::none, kTimeout));
   for (int i = 1; i < cluster_->num_tablet_servers(); i++) {
-    ASSERT_OK(external_tservers[i]->WaitForCrash(kTimeout));
+    ASSERT_OK(external_tservers[i]->WaitForInjectedCrash(kTimeout));
   }
 
   // Delete the table, so that when we restart the crashed servers, they'll get RPCs to
@@ -2935,7 +2935,7 @@ TEST_F(RaftConsensusITest, TestLogIOErrorIsFatal) {
   }
   ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
   for (int i = 1; i <= 2; i++) {
-    ASSERT_OK(ext_tservers[i]->WaitForCrash(MonoDelta::FromSeconds(10)));
+    ASSERT_OK(ext_tservers[i]->WaitForFatal(MonoDelta::FromSeconds(10)));
   }
 
   // Now we know followers crash when they write to their log.
@@ -2954,7 +2954,7 @@ TEST_F(RaftConsensusITest, TestLogIOErrorIsFatal) {
   workload.Start();
 
   // Leader should crash as well.
-  ASSERT_OK(ext_tservers[0]->WaitForCrash(MonoDelta::FromSeconds(10)));
+  ASSERT_OK(ext_tservers[0]->WaitForFatal(MonoDelta::FromSeconds(10)));
   workload.StopAndJoin();
 
   LOG(INFO) << "Everything crashed!";
@@ -3003,7 +3003,7 @@ TEST_F(RaftConsensusITest, TestLogIOErrorIsFatal) {
     ASSERT_OK(ext_tservers[i]->Restart());
   }
   // Leader will crash.
-  ASSERT_OK(ext_tservers[0]->WaitForCrash(MonoDelta::FromSeconds(10)));
+  ASSERT_OK(ext_tservers[0]->WaitForFatal(MonoDelta::FromSeconds(10)));
 }
 
 }  // namespace tserver

http://git-wip-us.apache.org/repos/asf/kudu/blob/80ac8bae/src/kudu/integration-tests/ts_recovery-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc b/src/kudu/integration-tests/ts_recovery-itest.cc
index 3480cec..ec20120 100644
--- a/src/kudu/integration-tests/ts_recovery-itest.cc
+++ b/src/kudu/integration-tests/ts_recovery-itest.cc
@@ -86,7 +86,7 @@ TEST_F(TsRecoveryITest, TestRestartWithOrphanedReplicates) {
   work.Start();
 
   // Wait for the process to crash due to the injected fault.
-  ASSERT_OK(cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(30)));
+  ASSERT_OK(cluster_->tablet_server(0)->WaitForInjectedCrash(MonoDelta::FromSeconds(30)));
 
   // Stop the writers.
   work.StopAndJoin();
@@ -133,7 +133,7 @@ TEST_F(TsRecoveryITest, TestRestartWithPendingCommitFromFailedOp) {
   work.Start();
 
   // Wait for the process to crash due to the injected fault.
-  ASSERT_OK(cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(30)));
+  ASSERT_OK(cluster_->tablet_server(0)->WaitForInjectedCrash(MonoDelta::FromSeconds(30)));
 
   // Stop the writers.
   work.StopAndJoin();
@@ -172,11 +172,14 @@ TEST_F(TsRecoveryITest, TestCrashDuringLogReplay) {
   cluster_->tablet_server(0)->Shutdown();
 
   // Restart might crash very quickly and actually return a bad status, so we
-  // ignore the result.
-  ignore_result(cluster_->tablet_server(0)->Restart());
+  // have to check the result.
+  Status s = cluster_->tablet_server(0)->Restart();
 
-  // Wait for the process to crash during log replay.
-  ASSERT_OK(cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(30)));
+  // Wait for the process to crash during log replay (if it didn't already crash
+  // above while we were restarting it).
+  if (s.ok()) {
+    ASSERT_OK(cluster_->tablet_server(0)->WaitForInjectedCrash(MonoDelta::FromSeconds(30)));
+  }
 
   // Now remove the crash flag, so the next replay will complete, and restart
   // the server once more.
@@ -210,7 +213,7 @@ TEST_F(TsRecoveryITest, TestCrashBeforeWriteLogSegmentHeader) {
   work.Start();
 
   // Wait for the process to crash during log roll.
-  ASSERT_OK(cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(60)));
+  ASSERT_OK(cluster_->tablet_server(0)->WaitForInjectedCrash(MonoDelta::FromSeconds(60)));
   work.StopAndJoin();
 
   cluster_->tablet_server(0)->Shutdown();

http://git-wip-us.apache.org/repos/asf/kudu/blob/80ac8bae/src/kudu/util/fault_injection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/fault_injection.cc b/src/kudu/util/fault_injection.cc
index db67a24..6a359a6 100644
--- a/src/kudu/util/fault_injection.cc
+++ b/src/kudu/util/fault_injection.cc
@@ -50,18 +50,13 @@ void DoMaybeFault(const char* fault_str, double fraction) {
   if (PREDICT_TRUE(g_random->NextDoubleFraction() >= fraction)) {
     return;
   }
-
-  // Disable core dumps -- it's not useful to get a core dump when we're
-  // purposefully crashing, and some tests cause lots of server crashes
-  // in a loop. This avoids filling up the disk with useless cores.
-  DisableCoreDumps();
-
-  LOG(FATAL) << "Injected fault: " << fault_str;
+  LOG(ERROR) << "Injecting fault: " << fault_str << " (process will exit)";
+  exit(kExitStatus);
 }
 
-void DoInjectRandomLatency(double max_ms) {
+void DoInjectRandomLatency(double max_latency_ms) {
   GoogleOnceInit(&g_random_once, InitRandom);
-  SleepFor(MonoDelta::FromMilliseconds(g_random->NextDoubleFraction() * max_ms));
+  SleepFor(MonoDelta::FromMilliseconds(g_random->NextDoubleFraction() * max_latency_ms));
 }
 
 Status DoMaybeReturnFailure(double fraction,

http://git-wip-us.apache.org/repos/asf/kudu/blob/80ac8bae/src/kudu/util/fault_injection.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/fault_injection.h b/src/kudu/util/fault_injection.h
index 462908b..cc22bab 100644
--- a/src/kudu/util/fault_injection.h
+++ b/src/kudu/util/fault_injection.h
@@ -56,9 +56,14 @@
 namespace kudu {
 namespace fault_injection {
 
+// The exit status returned from a process exiting due to a fault.
+// The choice of value here is arbitrary: it just needs to be something that
+// wouldn't normally be returned by a non-fault-injection code path.
+constexpr int kExitStatus = 85;
+
 // Out-of-line implementation.
 void DoMaybeFault(const char* fault_str, double fraction);
-void DoInjectRandomLatency(double max_latency);
+void DoInjectRandomLatency(double max_latency_ms);
 Status DoMaybeReturnFailure(double fraction,
                             const Status& bad_status_to_return);
 


[3/7] kudu git commit: KUDU-625 - Make write transactions abort in mvcc, by default

Posted by to...@apache.org.
KUDU-625 - Make write transactions abort in mvcc, by default

This patch addresses a long-standing TODO where we would commit
transactions in the WriteTransactionState dtor instead of
aborting them, if they hadn't been explicitly
committed/aborted.

Besides changing WriteTransactionState::Reset() to call
CommitOrAbort(Transaction::ABORTED), this also required a change
to TabletBootstrap to explicitly call
tx_state.CommitOrAbort(Transaction::COMMITTED) in
PlayWriteRequest().

I ran raft_consensus-itest and exactly_once_writes-itest in slow
mode, asan, for 1000 loops on dist-test. No failures.
Results:

exactly_once_writes-itest: http://dist-test.cloudera.org//job?job_id=david.alves.1480657298.6678
raft_consensus-itest     : http://dist-test.cloudera.org//job?job_id=david.alves.1480657298.6678

Change-Id: I22be773081ca3f3b1eab029e847a21e3182c64c9
Reviewed-on: http://gerrit.cloudera.org:8080/5320
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7e3071e3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7e3071e3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7e3071e3

Branch: refs/heads/master
Commit: 7e3071e3ce92aa15ea4f21d891e1e366ff421b4e
Parents: fffe576
Author: David Alves <dr...@apache.org>
Authored: Thu Dec 1 16:24:15 2016 -0800
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Sat Dec 3 01:25:35 2016 +0000

----------------------------------------------------------------------
 src/kudu/tablet/tablet_bootstrap.cc               | 2 ++
 src/kudu/tablet/transactions/write_transaction.cc | 3 +--
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7e3071e3/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 55a76d2..95a167e 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -1274,6 +1274,8 @@ Status TabletBootstrap::PlayWriteRequest(ReplicateMsg* replicate_msg,
     tx_state.ReleaseTxResultPB(commit->mutable_result());
   }
 
+  tx_state.CommitOrAbort(Transaction::COMMITTED);
+
   RETURN_NOT_OK(log_->Append(&commit_entry));
 
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/7e3071e3/src/kudu/tablet/transactions/write_transaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/write_transaction.cc b/src/kudu/tablet/transactions/write_transaction.cc
index 6387fb1..d2207fc 100644
--- a/src/kudu/tablet/transactions/write_transaction.cc
+++ b/src/kudu/tablet/transactions/write_transaction.cc
@@ -325,8 +325,7 @@ WriteTransactionState::~WriteTransactionState() {
 }
 
 void WriteTransactionState::Reset() {
-  // We likely shouldn't Commit() here. See KUDU-625.
-  CommitOrAbort(Transaction::COMMITTED);
+  CommitOrAbort(Transaction::ABORTED);
   tx_metrics_.Reset();
   timestamp_ = Timestamp::kInvalidTimestamp;
   tablet_components_ = nullptr;


[5/7] kudu git commit: Add integration tests for duplicate keys

Posted by to...@apache.org.
Add integration tests for duplicate keys

This splits the "crashy nodes" and "churny elections" tests
of raft_consensus-itest into unique and duplicate key variants.
This change is meant to stress any possible deadlock scenarios
related to transaction commit/abort and two-phase locking, for
which we didn't have much coverage.

Additionally, this disallows timeouts on writes and requires
an exact count of the rows at the end. This is now possible
due to exactly-once semantics.

Finally, this changes the cluster verifier to use snapshot scans
and increases the timeout of another scan in that test. These
two changes deflaked the test from 27/1000 to 3/1000 failures with
ASAN, slow mode, and 1 stress thread (with more stress threads the
test becomes much more flaky, as before). Of the 3 failures, two
are unrelated (inability to start the webserver, and a timeout
in another, unrelated test). The one failure that is related
to this patch is a snapshot scan anomaly and should be solved
by the safe time patches.

The coverage of these kinds of workloads is now much better.
For example, revision 12 of [1] caused a deadlock while aborting
transactions out of order. Looping raft_consensus-itest in slow
mode, ASAN, with 1 stress thread previously did not fail with the
buggy code; now it fails 127/1000 times.

Results:
With master (3/1000 failures):
http://dist-test.cloudera.org//job?job_id=david.alves.1480767369.16205
With buggy patch[1] (127/1000 failures):
http://dist-test.cloudera.org//job?job_id=david.alves.1480805849.13039

[1] - https://gerrit.cloudera.org/#/c/5294/12

Change-Id: I6d018281d412ae034bd7b70c8311077a52b2795d
Reviewed-on: http://gerrit.cloudera.org:8080/5349
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/02a96c79
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/02a96c79
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/02a96c79

Branch: refs/heads/master
Commit: 02a96c7904801ecd8be088f44863d68fb9989fb2
Parents: 4457948
Author: David Alves <dr...@apache.org>
Authored: Fri Dec 2 20:23:21 2016 -0800
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Sun Dec 4 05:33:18 2016 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/cluster_verifier.cc  |   5 +-
 .../integration-tests/raft_consensus-itest.cc   | 196 ++++++++++++-------
 2 files changed, 126 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/02a96c79/src/kudu/integration-tests/cluster_verifier.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_verifier.cc b/src/kudu/integration-tests/cluster_verifier.cc
index fd448d6..d027ba5 100644
--- a/src/kudu/integration-tests/cluster_verifier.cc
+++ b/src/kudu/integration-tests/cluster_verifier.cc
@@ -123,7 +123,10 @@ Status ClusterVerifier::DoCheckRowCount(const std::string& table_name,
   RETURN_NOT_OK_PREPEND(client->OpenTable(table_name, &table),
                         "Unable to open table");
   client::KuduScanner scanner(table.get());
-  CHECK_OK(scanner.SetProjectedColumns(vector<string>()));
+  CHECK_OK(scanner.SetReadMode(client::KuduScanner::READ_AT_SNAPSHOT));
+  CHECK_OK(scanner.SetFaultTolerant());
+  CHECK_OK(scanner.SetTimeoutMillis(5000));
+  CHECK_OK(scanner.SetProjectedColumns({}));
   RETURN_NOT_OK_PREPEND(scanner.Open(), "Unable to open scanner");
   int count = 0;
   vector<client::KuduRowResult> rows;

http://git-wip-us.apache.org/repos/asf/kudu/blob/02a96c79/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 8d9c8d3..2bac732 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -356,9 +356,10 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
                                        bool* has_leader,
                                        master::TabletLocationsPB* tablet_locations);
 
-  static const bool WITH_NOTIFICATION_LATENCY = true;
-  static const bool WITHOUT_NOTIFICATION_LATENCY = false;
-  void DoTestChurnyElections(bool with_latency);
+  void CreateClusterForChurnyElectionsTests(const vector<string>& extra_ts_flags);
+  void DoTestChurnyElections(TestWorkload* workload, int max_rows_to_insert);
+  void CreateClusterForCrashyNodesTests();
+  void DoTestCrashyNodes(TestWorkload* workload, int max_rows_to_insert);
 
   // Prepare for a test where a single replica of a 3-server cluster is left
   // running as a follower.
@@ -830,23 +831,13 @@ TEST_F(RaftConsensusITest, TestFollowerFallsBehindLeaderGC) {
   }
 }
 
-// This test starts several tablet servers, and configures them with
-// fault injection so that the leaders frequently crash just before
-// sending RPCs to followers.
-//
-// This can result in various scenarios where leaders crash right after
-// being elected and never succeed in replicating their first operation.
-// For example, KUDU-783 reproduces from this test approximately 5% of the
-// time on a slow-test debug build.
-TEST_F(RaftConsensusITest, InsertWithCrashyNodes) {
-  int kCrashesToCause = 3;
+void RaftConsensusITest::CreateClusterForCrashyNodesTests() {
   if (AllowSlowTests()) {
     FLAGS_num_tablet_servers = 7;
     FLAGS_num_replicas = 7;
-    kCrashesToCause = 15;
   }
 
-  vector<string> ts_flags, master_flags;
+  vector<string> ts_flags;
 
   // Crash 5% of the time just before sending an RPC. With 7 servers,
   // this means we crash about 30% of the time before we've fully
@@ -859,8 +850,7 @@ TEST_F(RaftConsensusITest, InsertWithCrashyNodes) {
   ts_flags.push_back("--log_inject_latency_ms_mean=30");
   ts_flags.push_back("--log_inject_latency_ms_stddev=60");
 
-  // Make leader elections faster so we get through more cycles of
-  // leaders.
+  // Make leader elections faster so we get through more cycles of leaders.
   ts_flags.push_back("--raft_heartbeat_interval_ms=100");
   ts_flags.push_back("--leader_failure_monitor_check_mean_ms=50");
   ts_flags.push_back("--leader_failure_monitor_check_stddev_ms=25");
@@ -870,28 +860,33 @@ TEST_F(RaftConsensusITest, InsertWithCrashyNodes) {
   // log area.
   ts_flags.push_back("--log_preallocate_segments=false");
 
-  CreateCluster("raft_consensus-itest-cluster", ts_flags, master_flags);
+  CreateCluster("raft_consensus-itest-crashy-nodes-cluster", ts_flags, {});
+}
 
-  TestWorkload workload(cluster_.get());
-  workload.set_num_replicas(FLAGS_num_replicas);
-  workload.set_timeout_allowed(true);
-  workload.set_write_timeout_millis(10000);
-  workload.set_num_write_threads(10);
-  workload.set_write_batch_size(1);
-  workload.Setup();
-  workload.Start();
+void RaftConsensusITest::DoTestCrashyNodes(TestWorkload* workload, int max_rows_to_insert) {
+  int crashes_to_cause = 3;
+  if (AllowSlowTests()) {
+    crashes_to_cause = 15;
+  }
+
+  workload->set_num_replicas(FLAGS_num_replicas);
+  // Set a really high write timeout so that even in the presence of many failures we
+  // can verify an exact number of rows in the end, thanks to exactly once semantics.
+  workload->set_write_timeout_millis(60 * 1000 /* 60 seconds */);
+  workload->set_num_write_threads(10);
+  workload->Setup();
+  workload->Start();
 
   int num_crashes = 0;
-  while (num_crashes < kCrashesToCause &&
-         workload.rows_inserted() < 100) {
+  while (num_crashes < crashes_to_cause &&
+      workload->rows_inserted() < max_rows_to_insert) {
     num_crashes += RestartAnyCrashedTabletServers();
     SleepFor(MonoDelta::FromMilliseconds(10));
   }
 
-  workload.StopAndJoin();
-
-  // After we stop the writes, we can still get crashes because heartbeats could
-  // trigger the fault path. So, disable the faults and restart one more time.
+  // Writers are likely ongoing. To have some chance of completing all writes,
+  // restart the tablet servers; otherwise they'll keep crashing and the writes
+  // can never complete.
   for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
     ExternalTabletServer* ts = cluster_->tablet_server(i);
     vector<string>* flags = ts->mutable_flags();
@@ -906,35 +901,54 @@ TEST_F(RaftConsensusITest, InsertWithCrashyNodes) {
     ASSERT_TRUE(removed_flag) << "could not remove flag from TS " << i
                               << "\nFlags:\n" << *flags;
     ts->Shutdown();
-    CHECK_OK(ts->Restart());
+    ASSERT_OK(ts->Restart());
   }
 
+  workload->StopAndJoin();
+
   // Ensure that the replicas converge.
-  // We don't know exactly how many rows got inserted, since the writer
-  // probably saw many errors which left inserts in indeterminate state.
-  // But, we should have at least as many as we got confirmation for.
   ClusterVerifier v(cluster_.get());
   NO_FATALS(v.CheckCluster());
-  NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
-                            workload.rows_inserted()));
+  NO_FATALS(v.CheckRowCount(workload->table_name(),
+                            ClusterVerifier::EXACTLY,
+                            workload->rows_inserted()));
 }
 
-// This test sets all of the election timers to be very short, resulting
-// in a lot of churn. We expect to make some progress and not diverge or
-// crash, despite the frequent re-elections and races.
-TEST_F(RaftConsensusITest, TestChurnyElections) {
-  DoTestChurnyElections(WITHOUT_NOTIFICATION_LATENCY);
+// This test starts several tablet servers, and configures them with
+// fault injection so that the leaders frequently crash just before
+// sending RPCs to followers.
+//
+// This can result in various scenarios where leaders crash right after
+// being elected and never succeed in replicating their first operation.
+// For example, KUDU-783 reproduces from this test approximately 5% of the
+// time on a slow-test debug build.
+TEST_F(RaftConsensusITest, InsertUniqueKeysWithCrashyNodes) {
+  CreateClusterForCrashyNodesTests();
+
+  TestWorkload workload(cluster_.get());
+  workload.set_write_batch_size(1);
+
+  NO_FATALS(DoTestCrashyNodes(&workload, 100));
 }
 
-// The same test, except inject artificial latency when propagating notifications
-// from the queue back to consensus. This previously reproduced bugs like KUDU-1078 which
-// normally only appear under high load.
-TEST_F(RaftConsensusITest, TestChurnyElections_WithNotificationLatency) {
-  DoTestChurnyElections(WITH_NOTIFICATION_LATENCY);
+// The same crashy nodes test as above but inserts many duplicate keys.
+// This emulates cases where there are many duplicate keys which, due to two phase
+// locking, may cause deadlocks and other anomalies that cannot be observed when
+// keys are unique.
+TEST_F(RaftConsensusITest, InsertDuplicateKeysWithCrashyNodes) {
+  CreateClusterForCrashyNodesTests();
+
+  TestWorkload workload(cluster_.get());
+  workload.set_write_pattern(TestWorkload::INSERT_WITH_MANY_DUP_KEYS);
+  // Increase the number of rows per batch to get a higher chance of key collision.
+  workload.set_write_batch_size(3);
+
+  NO_FATALS(DoTestCrashyNodes(&workload, 300));
 }
 
-void RaftConsensusITest::DoTestChurnyElections(bool with_latency) {
-  vector<string> ts_flags, master_flags;
+void RaftConsensusITest::CreateClusterForChurnyElectionsTests(
+    const vector<string>& extra_ts_flags) {
+  vector<string> ts_flags;
 
 #ifdef THREAD_SANITIZER
   // On TSAN builds, we need to be a little bit less churny in order to make
@@ -946,46 +960,80 @@ void RaftConsensusITest::DoTestChurnyElections(bool with_latency) {
   ts_flags.push_back("--leader_failure_monitor_check_mean_ms=1");
   ts_flags.push_back("--leader_failure_monitor_check_stddev_ms=1");
   ts_flags.push_back("--never_fsync");
-  if (with_latency) {
-    ts_flags.push_back("--consensus_inject_latency_ms_in_notifications=50");
-  }
+  ts_flags.insert(ts_flags.end(), extra_ts_flags.cbegin(), extra_ts_flags.cend());
 
-  CreateCluster("raft_consensus-itest-cluster", ts_flags, master_flags);
+  CreateCluster("raft_consensus-itest-cluster", ts_flags, {});
+}
 
-  TestWorkload workload(cluster_.get());
-  workload.set_num_replicas(FLAGS_num_replicas);
-  workload.set_timeout_allowed(true);
-  workload.set_write_timeout_millis(100);
-  workload.set_num_write_threads(2);
-  workload.set_write_batch_size(1);
-  workload.Setup();
-  workload.Start();
+void RaftConsensusITest::DoTestChurnyElections(TestWorkload* workload, int max_rows_to_insert) {
+  workload->set_num_replicas(FLAGS_num_replicas);
+  // Set a really high write timeout so that even in the presence of many failures we
+  // can verify an exact number of rows in the end, thanks to exactly once semantics.
+  workload->set_write_timeout_millis(60 * 1000 /* 60 seconds */);
+  workload->set_num_write_threads(2);
+  workload->set_write_batch_size(1);
+  workload->Setup();
+  workload->Start();
 
   // Run for either a prescribed number of writes, or 30 seconds,
   // whichever comes first. This prevents test timeouts on slower
   // build machines, TSAN builds, etc.
   Stopwatch sw;
   sw.start();
-  const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
-  while (workload.rows_inserted() < kNumWrites &&
-         sw.elapsed().wall_seconds() < 30) {
+  while (workload->rows_inserted() < max_rows_to_insert &&
+      sw.elapsed().wall_seconds() < 30) {
     SleepFor(MonoDelta::FromMilliseconds(10));
-    NO_FATALS(AssertNoTabletServersCrashed());
+        NO_FATALS(AssertNoTabletServersCrashed());
   }
-  workload.StopAndJoin();
-  ASSERT_GT(workload.rows_inserted(), 0) << "No rows inserted";
+  workload->StopAndJoin();
+  ASSERT_GT(workload->rows_inserted(), 0) << "No rows inserted";
 
   // Ensure that the replicas converge.
-  // We don't know exactly how many rows got inserted, since the writer
-  // probably saw many errors which left inserts in indeterminate state.
-  // But, we should have at least as many as we got confirmation for.
+  // We expect an exact result due to exactly once semantics and snapshot scans.
   ClusterVerifier v(cluster_.get());
   NO_FATALS(v.CheckCluster());
-  NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
-                            workload.rows_inserted()));
+  NO_FATALS(v.CheckRowCount(workload->table_name(),
+                            ClusterVerifier::EXACTLY,
+                            workload->rows_inserted()));
   NO_FATALS(AssertNoTabletServersCrashed());
 }
 
+// This test sets all of the election timers to be very short, resulting
+// in a lot of churn. We expect to make some progress and not diverge or
+// crash, despite the frequent re-elections and races.
+TEST_F(RaftConsensusITest, TestChurnyElections) {
+  const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
+  CreateClusterForChurnyElectionsTests({});
+  TestWorkload workload(cluster_.get());
+  workload.set_write_batch_size(1);
+  DoTestChurnyElections(&workload, kNumWrites);
+}
+
+// The same test, except inject artificial latency when propagating notifications
+// from the queue back to consensus. This previously reproduced bugs like KUDU-1078 which
+// normally only appear under high load.
+TEST_F(RaftConsensusITest, TestChurnyElections_WithNotificationLatency) {
+  CreateClusterForChurnyElectionsTests({"--consensus_inject_latency_ms_in_notifications=50"});
+  const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
+  TestWorkload workload(cluster_.get());
+  workload.set_write_batch_size(1);
+  DoTestChurnyElections(&workload, kNumWrites);
+}
+
+// The same as TestChurnyElections except insert many duplicated rows.
+// This emulates cases where there are many duplicate keys which, due to two phase
+// locking, may cause deadlocks and other anomalies that cannot be observed when
+// keys are unique.
+TEST_F(RaftConsensusITest, TestChurnyElections_WithDuplicateKeys) {
+  CreateClusterForChurnyElectionsTests({});
+  const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
+  TestWorkload workload(cluster_.get());
+  workload.set_write_pattern(TestWorkload::INSERT_WITH_MANY_DUP_KEYS);
+  // Increase the number of rows per batch to get a higher chance of key collision.
+  workload.set_write_batch_size(3);
+  DoTestChurnyElections(&workload, kNumWrites);
+}
+
 TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
   int kNumElections = FLAGS_num_replicas;
 
@@ -1472,7 +1520,7 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
     ScanRequestPB req;
     ScanResponsePB resp;
     RpcController rpc;
-    rpc.set_timeout(MonoDelta::FromMilliseconds(1000));
+    rpc.set_timeout(MonoDelta::FromMilliseconds(5000));
     NewScanRequestPB* scan = req.mutable_new_scan_request();
     scan->set_tablet_id(tablet_id_);
     scan->set_read_mode(READ_AT_SNAPSHOT);


[6/7] kudu git commit: KUDU-798 (part 3) Replica transactions must start/abort on the consensus update thread

Posted by to...@apache.org.
KUDU-798 (part 3) Replica transactions must start/abort on the consensus update thread

In order for a consensus replica to safely move "safe" time, it needs
to know that all transactions that come before it have at least
started. One way to do this is to call transaction_->Start() in
transaction_driver_->Init(). We know Init() is called by the thread
updating consensus for non-leader transactions. This should be OK, as
the leader has already correctly serialized the transactions.

However, if transactions are now started in Init(), then in the case
where they abort (i.e. when transaction_driver_->ReplicationFinished() is
called with a non-OK status, again by the thread updating
consensus), we must make sure that the transaction is actually
removed from mvcc before that method returns. Otherwise consensus
might call Start() on another transaction with the same timestamp
before the failed transaction is removed from mvcc.

To do this, this patch adds a way to release only the mvcc txn,
since it's the only thing we care about and Prepare() might still
be acquiring row locks.

I ran the new raft_consensus-itest that includes unique/duplicate
key workloads and exactly once semantics in dist-test.

Results of 1000-loop runs of raft_consensus-itest on dist-test,
in slow mode with 1 stress thread:

TSAN-prev (16/1000 failures): http://dist-test.cloudera.org//job?job_id=david.alves.1480814757.31031
TSAN-this (10/1000 failures): http://dist-test.cloudera.org//job?job_id=david.alves.1480812541.27015

I inspected the TSAN failures and they were mostly ~DnsResolver()
races in the test code itself. The tests themselves passed for the
most part. Some ended up timing out, with archives that are too
big to download. This also happens with the previous patch.

ASAN-prev (02/1000 failures): http://dist-test.cloudera.org//job?job_id=david.alves.1480817418.1140
ASAN-this (03/1000 failures): http://dist-test.cloudera.org//job?job_id=david.alves.1480811163.22558

I inspected the ASAN failures. Two of them are test-only flakes.
One of them is worrying, as the consensus queue remains full,
suggesting a deadlock, but this also happens with the previous
patch. Example:

https://kudu-test-results.s3.amazonaws.com/david.alves.1480817418.1140.36139843995986a54aaf9f533a00111f384e85a6.145.0-artifacts.zip?Signature=PjjGYVvmWns4t0M7j6%2FAZQYVj34%3D&Expires=1480904781&AWSAccessKeyId=AKIAJ2NR2VXMAHTVLMRA

Change-Id: Ie360e597eea86551c453717d7a1a000848027f4c
Reviewed-on: http://gerrit.cloudera.org:8080/5294
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f9e5993c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f9e5993c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f9e5993c

Branch: refs/heads/master
Commit: f9e5993c9390d56700d7ae29d418035b152f6858
Parents: 02a96c7
Author: David Alves <dr...@apache.org>
Authored: Wed Nov 30 16:28:14 2016 -0800
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Sun Dec 4 05:33:32 2016 +0000

----------------------------------------------------------------------
 src/kudu/tablet/transactions/transaction.h      |  3 ++
 .../tablet/transactions/transaction_driver.cc   | 37 +++++++++++++-------
 .../tablet/transactions/transaction_driver.h    | 25 ++++++-------
 .../tablet/transactions/write_transaction.cc    | 26 +++++++++-----
 .../tablet/transactions/write_transaction.h     |  9 ++---
 5 files changed, 60 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/f9e5993c/src/kudu/tablet/transactions/transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction.h b/src/kudu/tablet/transactions/transaction.h
index 29fcbee..4d8b4d5 100644
--- a/src/kudu/tablet/transactions/transaction.h
+++ b/src/kudu/tablet/transactions/transaction.h
@@ -96,6 +96,9 @@ class Transaction {
   // side-effects.
   virtual Status Prepare() = 0;
 
+  // Aborts the prepare phase.
+  virtual void AbortPrepare() {}
+
   // Actually starts a transaction, assigning a timestamp to the transaction.
   // LEADER replicas execute this in or right after Prepare(), while FOLLOWER/LEARNER
   // replicas execute this right before the Apply() phase as the transaction's

http://git-wip-us.apache.org/repos/asf/kudu/blob/f9e5993c/src/kudu/tablet/transactions/transaction_driver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index 23beaad..ab0606d 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -114,6 +114,13 @@ Status TransactionDriver::Init(gscoped_ptr<Transaction> transaction,
     DCHECK(op_id_copy_.IsInitialized());
     replication_state_ = REPLICATING;
     replication_start_time_ = MonoTime::Now();
+    // Start the replica transaction in the thread that is updating consensus, for non-leader
+    // transactions.
+    // Replica transactions were already assigned a timestamp so we don't need to acquire locks
+    // before calling Start(). Starting the transaction here gives a strong guarantee
+    // to consensus that the transaction is on mvcc when it moves "safe" time so that we don't
+    // risk marking a timestamp "safe" before all in-flight transactions preceding it are on mvcc.
+    RETURN_NOT_OK(transaction_->Start());
     if (state()->are_results_tracked()) {
       // If this is a follower transaction, make sure to set the transaction completion callback
       // before the transaction has a chance to fail.
@@ -138,7 +145,6 @@ Status TransactionDriver::Init(gscoped_ptr<Transaction> transaction,
   }
 
   RETURN_NOT_OK(txn_tracker_->Add(this));
-
   return Status::OK();
 }
 
@@ -189,7 +195,7 @@ Status TransactionDriver::ExecuteAsync() {
 
   if (s.ok()) {
     s = prepare_pool_->SubmitClosure(
-      Bind(&TransactionDriver::PrepareAndStartTask, Unretained(this)));
+      Bind(&TransactionDriver::PrepareTask, Unretained(this)));
   }
 
   if (!s.ok()) {
@@ -200,9 +206,9 @@ Status TransactionDriver::ExecuteAsync() {
   return Status::OK();
 }
 
-void TransactionDriver::PrepareAndStartTask() {
-  TRACE_EVENT_FLOW_END0("txn", "PrepareAndStartTask", this);
-  Status prepare_status = PrepareAndStart();
+void TransactionDriver::PrepareTask() {
+  TRACE_EVENT_FLOW_END0("txn", "PrepareTask", this);
+  Status prepare_status = Prepare();
   if (PREDICT_FALSE(!prepare_status.ok())) {
     HandleFailure(prepare_status);
   }
@@ -236,9 +242,10 @@ void TransactionDriver::RegisterFollowerTransactionOnResultTracker() {
   }
 }
 
-Status TransactionDriver::PrepareAndStart() {
-  TRACE_EVENT1("txn", "PrepareAndStart", "txn", this);
-  VLOG_WITH_PREFIX(4) << "PrepareAndStart()";
+Status TransactionDriver::Prepare() {
+  TRACE_EVENT1("txn", "Prepare", "txn", this);
+  VLOG_WITH_PREFIX(4) << "Prepare()";
+
   // Actually prepare and start the transaction.
   prepare_physical_timestamp_ = GetMonoTimeMicros();
 
@@ -260,7 +267,6 @@ Status TransactionDriver::PrepareAndStart() {
     // preempted after the state is prepared apply can be triggered by another thread without the
     // rpc being registered.
     if (transaction_->type() == consensus::REPLICA) {
-      RETURN_NOT_OK(transaction_->Start());
       RegisterFollowerTransactionOnResultTracker();
     // ... else we're a client-started transaction. Make sure we're still the driver of the
     // RPC and give up if we aren't.
@@ -295,9 +301,8 @@ Status TransactionDriver::PrepareAndStart() {
           transaction_->state()->timestamp().ToUint64());
 
       RETURN_NOT_OK(transaction_->Start());
-
-      VLOG_WITH_PREFIX(4) << "Triggering consensus repl";
-      // Trigger the consensus replication.
+      VLOG_WITH_PREFIX(4) << "Triggering consensus replication.";
+      // Trigger consensus replication.
       {
         std::lock_guard<simple_spinlock> lock(lock_);
         replication_state_ = REPLICATING;
@@ -384,6 +389,13 @@ void TransactionDriver::ReplicationFinished(const Status& status) {
     mutable_state()->mutable_op_id()->CopyFrom(op_id_copy_);
   }
 
+  // If we're going to abort, do so before changing the state below. This avoids a race with
+  // the prepare thread, which would race with the thread calling this method to release the driver
+  // while we're aborting, if we were to do it afterwards.
+  if (!status.ok()) {
+    transaction_->AbortPrepare();
+  }
+
   MonoDelta replication_duration;
   PrepareState prepare_state_copy;
   {
@@ -399,7 +411,6 @@ void TransactionDriver::ReplicationFinished(const Status& status) {
     replication_duration = replication_finished_time - replication_start_time_;
   }
 
-
   TRACE_COUNTER_INCREMENT("replication_time_us", replication_duration.ToMicroseconds());
 
   // If we have prepared and replicated, we're ready

http://git-wip-us.apache.org/repos/asf/kudu/blob/f9e5993c/src/kudu/tablet/transactions/transaction_driver.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.h b/src/kudu/tablet/transactions/transaction_driver.h
index 1072703..df62a20 100644
--- a/src/kudu/tablet/transactions/transaction_driver.h
+++ b/src/kudu/tablet/transactions/transaction_driver.h
@@ -53,22 +53,23 @@ class TransactionTracker;
 //  1 - Init() is called on a newly created driver object.
 //      If the driver is instantiated from a REPLICA, then we know that
 //      the operation is already "REPLICATING" (and thus we don't need to
-//      trigger replication ourself later on).
+//      trigger replication ourselves later on). In this case the transaction has already
+//      been serialized by the leader, so we also call transaction_->Start().
 //
-//  2 - ExecuteAsync() is called. This submits PrepareAndStartTask() to prepare_pool_
+//  2 - ExecuteAsync() is called. This submits PrepareTask() to prepare_pool_
 //      and returns immediately.
 //
-//  3 - PrepareAndStartTask() calls Prepare() and Start() on the transaction.
+//  3 - PrepareTask() calls Prepare() on the transaction.
 //
 //      Once successfully prepared, if we have not yet replicated (i.e we are leader),
-//      also triggers consensus->Replicate() and changes the replication state to
-//      REPLICATING.
+//      assign a timestamp and call transaction_->Start(). Finally call consensus->Replicate()
+//      and change the replication state to REPLICATING.
 //
 //      On the other hand, if we have already successfully replicated (eg we are the
-//      follower and ConsensusCommitted() has already been called, then we can move
+//      follower and ReplicationFinished() has already been called), then we can move
 //      on to ApplyAsync().
 //
-//  4 - The Consensus implementation calls ConsensusCommitted()
+//  4 - The Consensus implementation calls ReplicationFinished()
 //
 //      This is triggered by consensus when the commit index moves past our own
 //      OpId. On followers, this can happen before Prepare() finishes, and thus
@@ -297,12 +298,12 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
 
   ~TransactionDriver() {}
 
-  // The task submitted to the prepare threadpool to prepare and start
-  // the transaction. If PrepareAndStart() fails, calls HandleFailure.
-  void PrepareAndStartTask();
+  // The task submitted to the prepare threadpool to prepare the transaction. If Prepare() fails,
+  // calls HandleFailure.
+  void PrepareTask();
 
-  // Actually prepare and start.
-  Status PrepareAndStart();
+  // Actually prepare.
+  Status Prepare();
 
   // Submits ApplyTask to the apply pool.
   Status ApplyAsync();

http://git-wip-us.apache.org/repos/asf/kudu/blob/f9e5993c/src/kudu/tablet/transactions/write_transaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/write_transaction.cc b/src/kudu/tablet/transactions/write_transaction.cc
index d2207fc..d7dce19 100644
--- a/src/kudu/tablet/transactions/write_transaction.cc
+++ b/src/kudu/tablet/transactions/write_transaction.cc
@@ -101,6 +101,10 @@ Status WriteTransaction::Prepare() {
   return Status::OK();
 }
 
+void WriteTransaction::AbortPrepare() {
+  state()->ReleaseMvccTxn(TransactionResult::ABORTED);
+}
+
 Status WriteTransaction::Start() {
   TRACE_EVENT0("txn", "WriteTransaction::Start");
   TRACE("Start()");
@@ -255,6 +259,19 @@ void WriteTransactionState::StartApplying() {
 }
 
 void WriteTransactionState::CommitOrAbort(Transaction::TransactionResult result) {
+  ReleaseMvccTxn(result);
+
+  TRACE("Releasing row and schema locks");
+  ReleaseRowLocks();
+  ReleaseSchemaLock();
+
+  // After committing, if there is an RPC going on, the driver will respond to it.
+  // That will delete the RPC request and response objects. So, NULL them here
+  // so we don't read them again after they're deleted.
+  ResetRpcFields();
+}
+
+void WriteTransactionState::ReleaseMvccTxn(Transaction::TransactionResult result) {
   if (mvcc_tx_.get() != nullptr) {
     // Commit the transaction.
     switch (result) {
@@ -267,15 +284,6 @@ void WriteTransactionState::CommitOrAbort(Transaction::TransactionResult result)
     }
   }
   mvcc_tx_.reset();
-
-  TRACE("Releasing row and schema locks");
-  ReleaseRowLocks();
-  ReleaseSchemaLock();
-
-  // After committing, if there is an RPC going on, the driver will respond to it.
-  // That will delete the RPC request and response objects. So, NULL them here
-  // so we don't read them again after they're deleted.
-  ResetRpcFields();
 }
 
 void WriteTransactionState::ReleaseTxResultPB(TxResultPB* result) const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/f9e5993c/src/kudu/tablet/transactions/write_transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/write_transaction.h b/src/kudu/tablet/transactions/write_transaction.h
index 05f84ff..5cb8f23 100644
--- a/src/kudu/tablet/transactions/write_transaction.h
+++ b/src/kudu/tablet/transactions/write_transaction.h
@@ -120,6 +120,7 @@ class WriteTransactionState : public TransactionState {
   // Release the already-acquired schema lock.
   void ReleaseSchemaLock();
 
+  void ReleaseMvccTxn(Transaction::TransactionResult result);
 
   void set_schema_at_decode_time(const Schema* schema) {
     std::lock_guard<simple_spinlock> l(txn_state_lock_);
@@ -152,12 +153,6 @@ class WriteTransactionState : public TransactionState {
   // Note: request_ and response_ are set to NULL after this method returns.
   void CommitOrAbort(Transaction::TransactionResult result);
 
-  // Aborts the mvcc transaction and releases the component lock.
-  // Only one of Commit() or Abort() should be called.
-  //
-  // REQUIRES: StartApplying() must never have been called.
-  void Abort();
-
   // Returns all the prepared row writes for this transaction. Usually called
   // on the apply phase to actually make changes to the tablet.
   const std::vector<RowOp*>& row_ops() const {
@@ -236,6 +231,8 @@ class WriteTransaction : public Transaction {
   // into the WriteTransactionState.
   virtual Status Prepare() OVERRIDE;
 
+  virtual void AbortPrepare() OVERRIDE;
+
   // Actually starts the Mvcc transaction and assigns a timestamp to this transaction.
   virtual Status Start() OVERRIDE;