Posted to commits@kudu.apache.org by gr...@apache.org on 2021/04/09 14:50:07 UTC

[kudu] branch master updated: KUDU-3248: Match C++ replica selection behavior of Java client

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new ccbbfb3  KUDU-3248: Match C++ replica selection behavior of Java client
ccbbfb3 is described below

commit ccbbfb3006314f2c37f3a40bfec355db9fc90e02
Author: Grant Henke <gr...@apache.org>
AuthorDate: Fri Feb 26 09:32:17 2021 -0600

    KUDU-3248: Match C++ replica selection behavior of Java client
    
    The C++ client currently uses a new random value every time
    a replica is selected. In contrast, the Java client uses a
    static value, which ensures that the selection for a single
    process or application remains deterministic. This patch
    adjusts the C++ implementation to match the Java client's
    behavior.
    
    This is expected to strike a good balance between efficient
    use of cache memory and distribution of load across the
    replicas: a single process always gets the same choice,
    resulting in cache hits for follow-up scans, while separate
    processes may each get a different random selection.
    
    The reviews on the Java patch have some good context and
    discussion: https://gerrit.cloudera.org/#/c/12158/
    
    Change-Id: Iaa55e88b4a222fabfaa7fa521c24482cc6816b04
    Reviewed-on: http://gerrit.cloudera.org:8080/17129
    Tested-by: Kudu Jenkins
    Reviewed-by: Attila Bukor <ab...@apache.org>
---
 .../java/org/apache/kudu/client/RemoteTablet.java  | 14 ++++--
 src/kudu/client/client-internal.cc                 | 24 +++++++++--
 src/kudu/client/client-test.cc                     | 50 ++++++++++++++++++++++
 src/kudu/client/client.h                           |  1 +
 src/kudu/common/common.proto                       |  7 ++-
 src/kudu/integration-tests/disk_failure-itest.cc   | 17 +++++---
 .../tablet_server_quiescing-itest.cc               | 11 +++--
 7 files changed, 105 insertions(+), 19 deletions(-)
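
A minimal sketch of the behavioral change described in the commit
message above. All names here (kReplicas, SelectOld, SelectNew) are
illustrative, not Kudu APIs; this is a toy, not the patch code itself:

    #include <cstdio>
    #include <cstdlib>
    #include <limits>
    #include <random>
    #include <string>
    #include <vector>

    // Hypothetical stand-in for the client's per-tablet replica list.
    static const std::vector<std::string> kReplicas = {"ts-A", "ts-B", "ts-C"};

    // Before the patch: a fresh random draw per selection, so repeated
    // lookups from the same process could land on different replicas.
    static const std::string& SelectOld() {
      return kReplicas[rand() % kReplicas.size()];
    }

    // After the patch: one value drawn at process start-up, so a given
    // process always resolves to the same replica (good for cache
    // affinity), while separate processes still spread load randomly.
    static const std::string& SelectNew() {
      static const int kRandomSelectionInt = [] {
        std::random_device rdev;
        std::mt19937 gen(rdev());
        return std::uniform_int_distribution<int>(
            0, std::numeric_limits<int>::max())(gen);
      }();
      return kReplicas[kRandomSelectionInt % kReplicas.size()];
    }

    int main() {
      // Within one process, SelectNew() always agrees with itself;
      // SelectOld() may not.
      std::printf("old: %s %s\n", SelectOld().c_str(), SelectOld().c_str());
      std::printf("new: %s %s\n", SelectNew().c_str(), SelectNew().c_str());
      return 0;
    }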

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
index 6d63b2c..7530937 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
@@ -57,7 +57,13 @@ import org.apache.kudu.consensus.Metadata;
 public class RemoteTablet implements Comparable<RemoteTablet> {
 
   private static final Logger LOG = LoggerFactory.getLogger(RemoteTablet.class);
-  private static final int randomInt = new Random().nextInt(Integer.MAX_VALUE);
+
+  // This random integer is used when making any random choice for replica
+  // selection. It is static to provide a deterministic selection for any given
+  // process and therefore also better cache affinity while ensuring that we can
+  // still benefit from spreading the load across replicas for other processes
+  // and applications.
+  private static final int RANDOM_SELECTION_INT = new Random().nextInt(Integer.MAX_VALUE);
 
   private final String tableId;
   private final String tabletId;
@@ -206,7 +212,7 @@ public class RemoteTablet implements Comparable<RemoteTablet> {
       ServerInfo result = null;
       List<ServerInfo> localServers = new ArrayList<>();
       List<ServerInfo> serversInSameLocation = new ArrayList<>();
-      int randomIndex = randomInt % tabletServers.size();
+      int randomIndex = RANDOM_SELECTION_INT % tabletServers.size();
       int index = 0;
       for (ServerInfo e : tabletServers.values()) {
         boolean serverInSameLocation = !location.isEmpty() && e.inSameLocation(location);
@@ -228,11 +234,11 @@ public class RemoteTablet implements Comparable<RemoteTablet> {
         index++;
       }
       if (!localServers.isEmpty()) {
-        randomIndex = randomInt % localServers.size();
+        randomIndex = RANDOM_SELECTION_INT % localServers.size();
         return localServers.get(randomIndex);
       }
       if (!serversInSameLocation.isEmpty()) {
-        randomIndex = randomInt % serversInSameLocation.size();
+        randomIndex = RANDOM_SELECTION_INT % serversInSameLocation.size();
         return serversInSameLocation.get(randomIndex);
       }
       return result;
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 04069ac..30d76a9 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -19,13 +19,13 @@
 
 #include <algorithm>
 #include <cstdint>
-#include <cstdlib>
 #include <functional>
 #include <limits>
 #include <map>
 #include <memory>
 #include <mutex>
 #include <ostream>
+#include <random>
 #include <string>
 #include <utility>
 #include <vector>
@@ -183,6 +183,22 @@ KuduClient::Data::~Data() {
   dns_resolver_.reset();
 }
 
+namespace {
+
+// This random integer is used when making any random choice for replica
+// selection. It is static to provide a deterministic selection for any given
+// process and therefore also better cache affinity while ensuring that we can
+// still benefit from spreading the load across replicas for other processes
+// and applications.
+int InitRandomSelectionInt() {
+  std::random_device rdev;
+  std::mt19937 gen(rdev());
+  return gen();
+}
+static const int kRandomSelectionInt = InitRandomSelectionInt();
+
+} // anonymous namespace
+
 RemoteTabletServer* KuduClient::Data::SelectTServer(
     const scoped_refptr<RemoteTablet>& rt,
     const ReplicaSelection selection,
@@ -249,11 +265,11 @@ RemoteTabletServer* KuduClient::Data::SelectTServer(
         }
       }
       if (!local.empty()) {
-        ret = local[rand() % local.size()];
+        ret = local[kRandomSelectionInt % local.size()];
       } else if (!same_location.empty()) {
-        ret = same_location[rand() % same_location.size()];
+        ret = same_location[kRandomSelectionInt % same_location.size()];
       } else if (!filtered.empty()) {
-        ret = filtered[rand() % filtered.size()];
+        ret = filtered[kRandomSelectionInt % filtered.size()];
       }
       break;
     }
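
One subtlety in the C++ version above: std::mt19937::operator() returns
an unsigned 32-bit value, so returning it from a function declared to
return int is implementation-defined (before C++20) whenever the draw
exceeds INT_MAX. The later `kRandomSelectionInt % local.size()` still
yields an in-range index because the modulo is computed in size_t, and
the Java client sidesteps the issue entirely via
nextInt(Integer.MAX_VALUE). A sketch of an equivalent non-negative
guarantee in C++ (an illustration, not part of this commit):

    #include <random>

    // Sketch: draw the process-wide selection value so it is always
    // non-negative, mirroring the Java client's
    // new Random().nextInt(Integer.MAX_VALUE). Masking off the top bit
    // keeps the result within int range, avoiding the
    // implementation-defined narrowing of values above INT_MAX.
    int InitNonNegativeSelectionInt() {
      std::random_device rdev;
      std::mt19937 gen(rdev());
      return static_cast<int>(gen() & 0x7fffffff);
    }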
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 9f8392a..76bf022 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -2562,6 +2562,56 @@ TEST_F(ClientTest, TestGetTabletServerBlacklist) {
   }
 }
 
+TEST_F(ClientTest, TestGetTabletServerDeterministic) {
+  shared_ptr<KuduTable> table;
+  NO_FATALS(CreateTable("selection",
+                        3,
+                        GenerateSplitRows(),
+                        {},
+                        &table));
+  InsertTestRows(table.get(), 1, 0);
+
+  // Look up the tablet and its replicas into the metadata cache.
+  // We have to loop since some replicas may have been created slowly.
+  scoped_refptr<internal::RemoteTablet> rt;
+  while (true) {
+    rt = MetaCacheLookup(table.get(), "");
+    ASSERT_TRUE(rt.get() != nullptr);
+    vector<internal::RemoteTabletServer*> tservers;
+    rt->GetRemoteTabletServers(&tservers);
+    if (tservers.size() == 3) {
+      break;
+    }
+    // Marking stale forces a lookup.
+    rt->MarkStale();
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  // Get the closest replica from the same client twice and ensure they match.
+  set<string> blacklist;
+  vector<internal::RemoteTabletServer*> candidates;
+  internal::RemoteTabletServer* rts1;
+  ASSERT_OK(client_->data_->GetTabletServer(client_.get(), rt,
+                                            KuduClient::CLOSEST_REPLICA,
+                                            blacklist, &candidates, &rts1));
+  internal::RemoteTabletServer *rts2;
+  ASSERT_OK(client_->data_->GetTabletServer(client_.get(), rt,
+                                            KuduClient::CLOSEST_REPLICA,
+                                            blacklist, &candidates, &rts2));
+  ASSERT_EQ(rts1->permanent_uuid(), rts2->permanent_uuid());
+
+  // Get the closest replica from a different client and ensure it matches.
+  shared_ptr<KuduClient> c2;
+  ASSERT_OK(KuduClientBuilder()
+      .add_master_server_addr(cluster_->mini_master()->bound_rpc_addr().ToString())
+      .Build(&c2));
+  internal::RemoteTabletServer *rts3;
+  ASSERT_OK(c2->data_->GetTabletServer(c2.get(), rt,
+                                       KuduClient::CLOSEST_REPLICA,
+                                       blacklist, &candidates, &rts3));
+  ASSERT_EQ(rts2->permanent_uuid(), rts3->permanent_uuid());
+}
+
 TEST_F(ClientTest, TestScanWithEncodedRangePredicate) {
   shared_ptr<KuduTable> table;
   NO_FATALS(CreateTable("split-table",
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index a85d218..10b55c6 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -935,6 +935,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   FRIEND_TEST(ClientTest, TestCacheAuthzTokens);
   FRIEND_TEST(ClientTest, TestGetSecurityInfoFromMaster);
   FRIEND_TEST(ClientTest, TestGetTabletServerBlacklist);
+  FRIEND_TEST(ClientTest, TestGetTabletServerDeterministic);
   FRIEND_TEST(ClientTest, TestMasterDown);
   FRIEND_TEST(ClientTest, TestMasterLookupPermits);
   FRIEND_TEST(ClientTest, TestMetaCacheExpiry);
diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto
index d3d51f7..0043669 100644
--- a/src/kudu/common/common.proto
+++ b/src/kudu/common/common.proto
@@ -302,8 +302,11 @@ enum ReplicaSelection {
   UNKNOWN_REPLICA_SELECTION = 0;
   // Select the LEADER replica.
   LEADER_ONLY = 1;
-  // Select the closest replica to the client, or a random one if all replicas
-  // are equidistant.
+  // Select the closest replica to the client. Replicas are classified from
+  // closest to furthest as follows:
+  //   - Local replicas
+  //   - Replicas whose tablet server has the same location as the client
+  //   - All other replicas
   CLOSEST_REPLICA = 2;
 }
 
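The priority order in the new CLOSEST_REPLICA comment reads as a simple
cascade. A schematic sketch with assumed, illustrative types (this is
not the client's actual code; random_selection_int is assumed to be the
process-wide value discussed above):

    #include <string>
    #include <vector>

    struct Replica {
      bool is_local;         // replica on the same host as the client
      std::string location;  // location string assigned by the master
    };

    // Schematic CLOSEST_REPLICA cascade: prefer local replicas, then
    // replicas in the client's location, then any other replica. Ties
    // within a tier are broken with the process-wide random value, so
    // a given process keeps picking the same replica.
    const Replica* PickClosest(const std::vector<Replica>& all,
                               const std::string& client_location,
                               int random_selection_int) {
      std::vector<const Replica*> local, same_location, rest;
      for (const auto& r : all) {
        if (r.is_local) {
          local.push_back(&r);
        } else if (!client_location.empty() && r.location == client_location) {
          same_location.push_back(&r);
        } else {
          rest.push_back(&r);
        }
      }
      for (const auto* tier : {&local, &same_location, &rest}) {
        if (!tier->empty()) {
          return (*tier)[random_selection_int % tier->size()];
        }
      }
      return nullptr;
    }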
diff --git a/src/kudu/integration-tests/disk_failure-itest.cc b/src/kudu/integration-tests/disk_failure-itest.cc
index ae15976..7303ebf 100644
--- a/src/kudu/integration-tests/disk_failure-itest.cc
+++ b/src/kudu/integration-tests/disk_failure-itest.cc
@@ -56,6 +56,7 @@ METRIC_DECLARE_gauge_uint64(data_dirs_failed);
 METRIC_DECLARE_gauge_uint32(tablets_num_failed);
 
 using kudu::client::sp::shared_ptr;
+using kudu::client::KuduClient;
 using kudu::client::KuduDelete;
 using kudu::client::KuduInsert;
 using kudu::client::KuduSession;
@@ -224,7 +225,7 @@ class TabletServerDiskErrorITest : public DiskErrorITestBase {
   // Also configure the cluster to not delete or copy tablets, even on error.
   // This allows us to check all tablets are failed appropriately.
   void SetUp() override {
-    const int kNumRows = 5000;
+    const int kNumRows = 10000;
     ExternalMiniClusterOptions opts;
     // Use 3 tservers at first; we'll add an empty one later.
     opts.num_tablet_servers = 3;
@@ -243,7 +244,8 @@ class TabletServerDiskErrorITest : public DiskErrorITestBase {
     NO_FATALS(StartClusterWithOpts(std::move(opts)));
 
     // Write some rows to the three servers.
-    TestWorkload writes(cluster_.get());
+    // Uses HASH partitioning to be sure we hit all tablets.
+    TestWorkload writes(cluster_.get(), TestWorkload::PartitioningType::HASH);
     writes.set_num_tablets(kNumTablets);
     writes.Setup();
     writes.Start();
@@ -281,13 +283,14 @@ class TabletServerDiskErrorITest : public DiskErrorITestBase {
 
   // Waits for the number of failed tablets on the tablet server to reach
   // `num_failed`.
-  void WaitForFailedTablets(ExternalTabletServer* ts, int num_failed) const {
+  void WaitForFailedTablets(ExternalTabletServer* ts, int num_failed,
+                            bool require_all_fail = true) const {
     ASSERT_EVENTUALLY([&] {
       int64_t failed_on_ts;
       ASSERT_OK(itest::GetInt64Metric(ts->bound_http_hostport(),
           &METRIC_ENTITY_server, nullptr, &METRIC_tablets_num_failed, "value", &failed_on_ts));
       LOG(INFO) << "Currently has " << failed_on_ts << " failed tablets";
-      ASSERT_EQ(num_failed, failed_on_ts);
+      ASSERT_TRUE(failed_on_ts == num_failed || (!require_all_fail && failed_on_ts > 0));
     });
   }
 };
@@ -327,8 +330,10 @@ TEST_P(TabletServerDiskErrorITest, TestFailDuringScanWorkload) {
   ExternalTabletServer* error_ts = cluster_->tablet_server(0);
   ASSERT_OK(SetFlags(error_ts, InjectionFlags(GetParam(), error_ts)));
 
-  // Wait for all the tablets to reach a failed state.
-  NO_FATALS(WaitForFailedTablets(error_ts, kNumTablets));
+  // Wait for some of the tablets to reach a failed state.
+  // We can't wait for all of the tablets in this case because
+  // some may not be scanned and will therefore not be marked as failed.
+  NO_FATALS(WaitForFailedTablets(error_ts, kNumTablets, /*require_all_fail*/false));
   ASSERT_OK(AllowRecovery());
   NO_FATALS(read.StopAndJoin());
 
diff --git a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
index d66bfcd..db5220c 100644
--- a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
+++ b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
@@ -246,13 +246,18 @@ TEST_F(TServerQuiescingITest, TestDoesntAllowNewScans) {
   rw_workload->Setup();
   rw_workload->Start();
 
-  // Wait for some scans to begin.
-  auto* ts = cluster_->mini_tablet_server(0);
+  // Wait for some scans to begin on a tablet server.
+  int ts_id = 0;
+  auto* ts = cluster_->mini_tablet_server(ts_id);
   ASSERT_EVENTUALLY([&] {
+    // Pick the next tablet server on each try to handle the
+    // case where certain tablet servers will never have an active scanner.
+    ts_id = (ts_id + 1) % cluster_->num_tablet_servers();
+    ts = cluster_->mini_tablet_server(ts_id);
     ASSERT_LT(0, ts->server()->scanner_manager()->CountActiveScanners());
   });
 
-  // Mark a tablet server as quiescing. It should eventually stop serving scans.
+  // Mark the tablet server as quiescing. It should eventually stop serving scans.
   *ts->server()->mutable_quiescing() = true;
   ASSERT_EVENTUALLY([&] {
     ASSERT_EQ(0, ts->server()->scanner_manager()->CountActiveScanners());