Posted to commits@kudu.apache.org by to...@apache.org on 2016/12/06 01:54:21 UTC

kudu git commit: KUDU-1622. result_tracker: respond to RPCs outside of the lock

Repository: kudu
Updated Branches:
  refs/heads/master 44c73d5b6 -> e98e4b6d8


KUDU-1622. result_tracker: respond to RPCs outside of the lock

This patch started with the goal of addressing the flakiness of
ExactlyOnceRpcTest.TestExactlyOnceSemanticsAfterRpcCompleted. The issue
was that the MemTracker updates resulting from an RPC response occurred
asynchronously after the RPC had been responded to, and in two separate
increments. As a result, the test could record a partially-applied
update and then fail a later assertion against that stale recorded
value.
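
To make the race concrete, here is a toy illustration (not Kudu code) of
a value that is updated in two increments on another thread and can be
observed mid-update by a reader that doesn't retry:

  #include <atomic>
  #include <cassert>
  #include <thread>

  // Toy illustration only: 'consumption' stands in for the MemTracker
  // counter, which was updated in two increments after the response.
  std::atomic<int> consumption{0};

  int main() {
    std::thread updater([] {
      consumption += 100;  // first increment
      consumption += 100;  // second increment, applied slightly later
    });

    // A reader here may observe 0, 100, or 200. Recording such a partial
    // value and asserting against it later is exactly the flake the test
    // papered over with AssertEventually() before this patch.
    int observed = consumption.load();
    (void)observed;

    updater.join();
    assert(consumption.load() == 200);
    return 0;
  }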

To fix this, I ended up at a solution that both fixes the flakiness and
reduces lock contention in ResultTracker. We now respond to RPCs only
after we've done the required MemTracker updates, which also means that
we respond to RPCs after dropping the ResultTracker locks. This lock has
been a high-contention point in stress tests, so shortening the critical
section is a nice side benefit.
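
Roughly, the change moves from responding to each RPC while holding the
spinlock to collecting the pending responses under the lock and invoking
them after it is dropped. A minimal sketch of that pattern (hypothetical
names, not Kudu's actual types):

  #include <functional>
  #include <mutex>
  #include <utility>
  #include <vector>

  // Minimal sketch, not Kudu code: callbacks registered under a lock
  // are moved out and invoked only after the lock has been released.
  class Tracker {
   public:
    void AddPending(std::function<void()> respond_cb) {
      std::lock_guard<std::mutex> l(lock_);
      pending_.push_back(std::move(respond_cb));
    }

    void CompleteAndRespond() {
      std::vector<std::function<void()>> to_respond;
      {
        std::lock_guard<std::mutex> l(lock_);
        // Update shared state (e.g. memory accounting) while still
        // holding the lock, then move the callbacks out instead of
        // invoking them in place.
        to_respond.swap(pending_);
      }
      // Lock released: invoking the callbacks no longer serializes other
      // tracker operations, and all bookkeeping is visible before any
      // response fires.
      for (auto& respond : to_respond) {
        respond();
      }
    }

   private:
    std::mutex lock_;
    std::vector<std::function<void()>> pending_;
  };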

Fixing the flakiness also meant that other 'AssertEventually'
workarounds from this test case could now be removed.

I looped ExactlyOnceRpcTest.TestExactlyOnceSemanticsAfterRpcCompleted
500 times with 4 stress threads and it passed 100% of the time[1]. Prior
to this patch, it would reliably fail at least a couple of runs out of
every 500[2].

I also looped exactly_once_writes-itest 500 times with this patch,
again with 100% success[3].

[1] http://dist-test.cloudera.org//job?job_id=todd.1480928777.29435
[2] http://dist-test.cloudera.org//job?job_id=todd.1480929041.29742
[3] http://dist-test.cloudera.org//job?job_id=todd.1480929367.31010

Change-Id: I065b9e40bc1af81e0871220a0a01461ea35143b5
Reviewed-on: http://gerrit.cloudera.org:8080/5359
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e98e4b6d
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e98e4b6d
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e98e4b6d

Branch: refs/heads/master
Commit: e98e4b6d826c8a76d532e85e254981ba5298e991
Parents: 44c73d5
Author: Todd Lipcon <to...@apache.org>
Authored: Mon Dec 5 17:05:21 2016 +0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Dec 6 01:53:54 2016 +0000

----------------------------------------------------------------------
 src/kudu/rpc/exactly_once_rpc-test.cc |  20 +---
 src/kudu/rpc/result_tracker.cc        | 175 ++++++++++++++++-------------
 2 files changed, 102 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e98e4b6d/src/kudu/rpc/exactly_once_rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/exactly_once_rpc-test.cc b/src/kudu/rpc/exactly_once_rpc-test.cc
index fc1f07f..097e838 100644
--- a/src/kudu/rpc/exactly_once_rpc-test.cc
+++ b/src/kudu/rpc/exactly_once_rpc-test.cc
@@ -355,13 +355,8 @@ TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsAfterRpcCompleted) {
     // plus some fixed overhead for the client-tracking structure.
     int expected_incremental_usage = original_resp.SpaceUsed() + 200;
 
-    // The consumption isn't immediately updated, since the MemTracker update
-    // happens after we call 'Respond' on the RPC.
-    int mem_consumption_after;
-    AssertEventually([&]() {
-        mem_consumption_after = mem_tracker_->consumption();
-        ASSERT_GT(mem_consumption_after - mem_consumption, expected_incremental_usage);
-      });
+    int mem_consumption_after = mem_tracker_->consumption();
+    ASSERT_GT(mem_consumption_after - mem_consumption, expected_incremental_usage);
     mem_consumption = mem_consumption_after;
   }
 
@@ -396,9 +391,7 @@ TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsAfterRpcCompleted) {
 
     // Send the first request for this new client.
     ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
-    AssertEventually([&]() {
-        ASSERT_EQ(mem_tracker_->consumption(), mem_consumption * 2);
-      });
+    ASSERT_EQ(mem_tracker_->consumption(), mem_consumption * 2);
   }
 }
 
@@ -478,14 +471,11 @@ TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsWithConcurrentUpdaters) {
       }
     }
 
-    // Wait for the MemTracker to be updated.
     // After all adders have finished, we should see at least the size of one more response.
     // The actual size depends on multiple factors, for instance, how many calls were
     // "attached" (which is timing dependent), so we can't be more precise than this.
-    AssertEventually([&]() {
-        ASSERT_GT(mem_tracker_->consumption(),
-                  memory_consumption_initial + single_response_size * i);
-      });
+    ASSERT_GT(mem_tracker_->consumption(),
+              memory_consumption_initial + single_response_size * i);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/e98e4b6d/src/kudu/rpc/result_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/result_tracker.cc b/src/kudu/rpc/result_tracker.cc
index 0f22d66..f7ac154 100644
--- a/src/kudu/rpc/result_tracker.cc
+++ b/src/kudu/rpc/result_tracker.cc
@@ -297,99 +297,118 @@ ResultTracker::FindCompletionRecordOrNullUnlocked(const RequestIdPB& request_id)
 
 void ResultTracker::RecordCompletionAndRespond(const RequestIdPB& request_id,
                                                const Message* response) {
-  lock_guard<simple_spinlock> l(lock_);
-
-  CompletionRecord* completion_record = FindCompletionRecordOrDieUnlocked(request_id);
-  ScopedMemTrackerUpdater<CompletionRecord> updater(mem_tracker_.get(), completion_record);
-
-  CHECK_EQ(completion_record->driver_attempt_no, request_id.attempt_no())
-    << "Called RecordCompletionAndRespond() from an executor identified with an attempt number that"
-    << " was not marked as the driver for the RPC. RequestId: " << request_id.ShortDebugString()
-    << "\nTracker state:\n " << ToStringUnlocked();
-  DCHECK_EQ(completion_record->state, RpcState::IN_PROGRESS);
-  completion_record->response.reset(DCHECK_NOTNULL(response)->New());
-  completion_record->response->CopyFrom(*response);
-  completion_record->state = RpcState::COMPLETED;
-  completion_record->last_updated = MonoTime::Now();
-
-  CHECK_EQ(completion_record->driver_attempt_no, request_id.attempt_no());
-
-  int64_t handler_attempt_no = request_id.attempt_no();
-
-  // Go through the ongoing RPCs and reply to each one.
-  for (auto orpc_iter = completion_record->ongoing_rpcs.rbegin();
-       orpc_iter != completion_record->ongoing_rpcs.rend();) {
-
-    const OnGoingRpcInfo& ongoing_rpc = *orpc_iter;
-    if (MustHandleRpc(handler_attempt_no, completion_record, ongoing_rpc)) {
-      if (ongoing_rpc.context != nullptr) {
-        if (PREDICT_FALSE(ongoing_rpc.response != response)) {
-          ongoing_rpc.response->CopyFrom(*completion_record->response);
+  vector<OnGoingRpcInfo> to_respond;
+  {
+    lock_guard<simple_spinlock> l(lock_);
+
+    CompletionRecord* completion_record = FindCompletionRecordOrDieUnlocked(request_id);
+    ScopedMemTrackerUpdater<CompletionRecord> updater(mem_tracker_.get(), completion_record);
+
+    CHECK_EQ(completion_record->driver_attempt_no, request_id.attempt_no())
+        << "Called RecordCompletionAndRespond() from an executor identified with an "
+        << "attempt number that was not marked as the driver for the RPC. RequestId: "
+        << request_id.ShortDebugString() << "\nTracker state:\n " << ToStringUnlocked();
+    DCHECK_EQ(completion_record->state, RpcState::IN_PROGRESS);
+    completion_record->response.reset(DCHECK_NOTNULL(response)->New());
+    completion_record->response->CopyFrom(*response);
+    completion_record->state = RpcState::COMPLETED;
+    completion_record->last_updated = MonoTime::Now();
+
+    CHECK_EQ(completion_record->driver_attempt_no, request_id.attempt_no());
+
+    int64_t handler_attempt_no = request_id.attempt_no();
+
+    // Go through the ongoing RPCs and reply to each one.
+    for (auto orpc_iter = completion_record->ongoing_rpcs.rbegin();
+         orpc_iter != completion_record->ongoing_rpcs.rend();) {
+
+      const OnGoingRpcInfo& ongoing_rpc = *orpc_iter;
+      if (MustHandleRpc(handler_attempt_no, completion_record, ongoing_rpc)) {
+        if (ongoing_rpc.context != nullptr) {
+          to_respond.push_back(ongoing_rpc);
         }
-        LogAndTraceAndRespondSuccess(ongoing_rpc.context, *ongoing_rpc.response);
+        ++orpc_iter;
+        orpc_iter = std::vector<OnGoingRpcInfo>::reverse_iterator(
+            completion_record->ongoing_rpcs.erase(orpc_iter.base()));
+      } else {
+        ++orpc_iter;
       }
-      ++orpc_iter;
-      orpc_iter = std::vector<OnGoingRpcInfo>::reverse_iterator(
-          completion_record->ongoing_rpcs.erase(orpc_iter.base()));
-    } else {
-      ++orpc_iter;
     }
   }
+
+  // Respond outside of holding the lock. This reduces lock contention and also
+  // means that we will have fully updated our memory tracking before responding,
+  // which makes testing easier.
+  for (auto& ongoing_rpc : to_respond) {
+    if (PREDICT_FALSE(ongoing_rpc.response != response)) {
+      ongoing_rpc.response->CopyFrom(*response);
+    }
+    LogAndTraceAndRespondSuccess(ongoing_rpc.context, *ongoing_rpc.response);
+  }
 }
 
 void ResultTracker::FailAndRespondInternal(const RequestIdPB& request_id,
                                            HandleOngoingRpcFunc func) {
-  lock_guard<simple_spinlock> l(lock_);
-  auto state_and_record = FindClientStateAndCompletionRecordOrNullUnlocked(request_id);
-  if (PREDICT_FALSE(state_and_record.first == nullptr)) {
-    LOG(FATAL) << "Couldn't find ClientState for request: " << request_id.ShortDebugString()
-        << ". \nTracker state:\n" << ToStringUnlocked();
-  }
-
-  CompletionRecord* completion_record = state_and_record.second;
-
-  // It is possible for this method to be called for an RPC that was never actually tracked (though
-  // RecordCompletionAndRespond() can't). One such case is when a follower transaction fails
-  // on the TransactionManager, for some reason, before it was tracked. The CompletionCallback still
-  // calls this method. In this case, do nothing.
-  if (completion_record == nullptr) {
-    return;
-  }
-
-  ScopedMemTrackerUpdater<CompletionRecord> cr_updater(mem_tracker_.get(), completion_record);
-  completion_record->last_updated = MonoTime::Now();
+  vector<OnGoingRpcInfo> to_handle;
+  {
+    lock_guard<simple_spinlock> l(lock_);
+    auto state_and_record = FindClientStateAndCompletionRecordOrNullUnlocked(request_id);
+    if (PREDICT_FALSE(state_and_record.first == nullptr)) {
+      LOG(FATAL) << "Couldn't find ClientState for request: " << request_id.ShortDebugString()
+                 << ". \nTracker state:\n" << ToStringUnlocked();
+    }
 
-  int64_t seq_no = request_id.seq_no();
-  int64_t handler_attempt_no = request_id.attempt_no();
+    CompletionRecord* completion_record = state_and_record.second;
 
-  // If we're copying from a client originated response we need to take care to reply
-  // to that call last, otherwise we'll lose 'response', before we go through all the
-  // CompletionRecords.
-  for (auto orpc_iter = completion_record->ongoing_rpcs.rbegin();
-       orpc_iter != completion_record->ongoing_rpcs.rend();) {
+    // It is possible for this method to be called for an RPC that was never actually
+    // tracked (though RecordCompletionAndRespond() can't). One such case is when a
+    // follower transaction fails on the TransactionManager, for some reason, before it
+    // was tracked. The CompletionCallback still calls this method. In this case, do
+    // nothing.
+    if (completion_record == nullptr) {
+      return;
+    }
 
-    const OnGoingRpcInfo& ongoing_rpc = *orpc_iter;
-    if (MustHandleRpc(handler_attempt_no, completion_record, ongoing_rpc)) {
-      if (ongoing_rpc.context != nullptr) {
-        func(ongoing_rpc);
-        delete ongoing_rpc.context;
+    ScopedMemTrackerUpdater<CompletionRecord> cr_updater(mem_tracker_.get(), completion_record);
+    completion_record->last_updated = MonoTime::Now();
+
+    int64_t seq_no = request_id.seq_no();
+    int64_t handler_attempt_no = request_id.attempt_no();
+
+    // If we're copying from a client originated response we need to take care to reply
+    // to that call last, otherwise we'll lose 'response', before we go through all the
+    // CompletionRecords.
+    for (auto orpc_iter = completion_record->ongoing_rpcs.rbegin();
+         orpc_iter != completion_record->ongoing_rpcs.rend();) {
+
+      const OnGoingRpcInfo& ongoing_rpc = *orpc_iter;
+      if (MustHandleRpc(handler_attempt_no, completion_record, ongoing_rpc)) {
+        to_handle.push_back(ongoing_rpc);
+        ++orpc_iter;
+        orpc_iter = std::vector<OnGoingRpcInfo>::reverse_iterator(
+            completion_record->ongoing_rpcs.erase(orpc_iter.base()));
+      } else {
+        ++orpc_iter;
       }
-      ++orpc_iter;
-      orpc_iter = std::vector<OnGoingRpcInfo>::reverse_iterator(
-          completion_record->ongoing_rpcs.erase(orpc_iter.base()));
-    } else {
-      ++orpc_iter;
+    }
+
+    // If we're the last ones trying this and the state is not completed,
+    // delete the completion record.
+    if (completion_record->ongoing_rpcs.size() == 0
+        && completion_record->state != RpcState::COMPLETED) {
+      cr_updater.Cancel();
+      unique_ptr<CompletionRecord> completion_record =
+          EraseKeyReturnValuePtr(&state_and_record.first->completion_records, seq_no);
+      mem_tracker_->Release(completion_record->memory_footprint());
     }
   }
 
-  // If we're the last ones trying this and the state is not completed,
-  // delete the completion record.
-  if (completion_record->ongoing_rpcs.size() == 0
-      && completion_record->state != RpcState::COMPLETED) {
-    cr_updater.Cancel();
-    unique_ptr<CompletionRecord> completion_record =
-        EraseKeyReturnValuePtr(&state_and_record.first->completion_records, seq_no);
-    mem_tracker_->Release(completion_record->memory_footprint());
+  // Wait until outside the lock to do the heavy-weight work.
+  for (auto& ongoing_rpc : to_handle) {
+    if (ongoing_rpc.context != nullptr) {
+      func(ongoing_rpc);
+      delete ongoing_rpc.context;
+    }
   }
 }