You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2021/04/09 14:50:07 UTC
[kudu] branch master updated: KUDU-3248: Match C++ replica
selection behavior of Java client
This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new ccbbfb3 KUDU-3248: Match C++ replica selection behavior of Java client
ccbbfb3 is described below
commit ccbbfb3006314f2c37f3a40bfec355db9fc90e02
Author: Grant Henke <gr...@apache.org>
AuthorDate: Fri Feb 26 09:32:17 2021 -0600
KUDU-3248: Match C++ replica selection behavior of Java client
The C++ client currently draws a new random value every time
a replica is selected. In contrast, the Java client uses a static
value, chosen once per process, which ensures that the selection
within a single process or application remains deterministic.
This patch adjusts the C++ implementation to match the Java
client behavior.
This is expected to strike a good balance between efficient use
of cache memory and distribution of load across the replicas:
a single process will always make the same choice, resulting in
cache hits for follow-up scans, while separate processes will
potentially make a different random selection.
The reviews on the Java patch here have some good context
and discussion: https://gerrit.cloudera.org/#/c/12158/
Change-Id: Iaa55e88b4a222fabfaa7fa521c24482cc6816b04
Reviewed-on: http://gerrit.cloudera.org:8080/17129
Tested-by: Kudu Jenkins
Reviewed-by: Attila Bukor <ab...@apache.org>
---
.../java/org/apache/kudu/client/RemoteTablet.java | 14 ++++--
src/kudu/client/client-internal.cc | 24 +++++++++--
src/kudu/client/client-test.cc | 50 ++++++++++++++++++++++
src/kudu/client/client.h | 1 +
src/kudu/common/common.proto | 7 ++-
src/kudu/integration-tests/disk_failure-itest.cc | 17 +++++---
.../tablet_server_quiescing-itest.cc | 11 +++--
7 files changed, 105 insertions(+), 19 deletions(-)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
index 6d63b2c..7530937 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
@@ -57,7 +57,13 @@ import org.apache.kudu.consensus.Metadata;
public class RemoteTablet implements Comparable<RemoteTablet> {
private static final Logger LOG = LoggerFactory.getLogger(RemoteTablet.class);
- private static final int randomInt = new Random().nextInt(Integer.MAX_VALUE);
+
+ // This random integer is used when making any random choice for replica
+ // selection. It is static to provide a deterministic selection for any given
+ // process and therefore also better cache affinity while ensuring that we can
+ // still benefit from spreading the load across replicas for other processes
+ // and applications.
+ private static final int RANDOM_SELECTION_INT = new Random().nextInt(Integer.MAX_VALUE);
private final String tableId;
private final String tabletId;
@@ -206,7 +212,7 @@ public class RemoteTablet implements Comparable<RemoteTablet> {
ServerInfo result = null;
List<ServerInfo> localServers = new ArrayList<>();
List<ServerInfo> serversInSameLocation = new ArrayList<>();
- int randomIndex = randomInt % tabletServers.size();
+ int randomIndex = RANDOM_SELECTION_INT % tabletServers.size();
int index = 0;
for (ServerInfo e : tabletServers.values()) {
boolean serverInSameLocation = !location.isEmpty() && e.inSameLocation(location);
@@ -228,11 +234,11 @@ public class RemoteTablet implements Comparable<RemoteTablet> {
index++;
}
if (!localServers.isEmpty()) {
- randomIndex = randomInt % localServers.size();
+ randomIndex = RANDOM_SELECTION_INT % localServers.size();
return localServers.get(randomIndex);
}
if (!serversInSameLocation.isEmpty()) {
- randomIndex = randomInt % serversInSameLocation.size();
+ randomIndex = RANDOM_SELECTION_INT % serversInSameLocation.size();
return serversInSameLocation.get(randomIndex);
}
return result;
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index 04069ac..30d76a9 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -19,13 +19,13 @@
#include <algorithm>
#include <cstdint>
-#include <cstdlib>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
+#include <random>
#include <string>
#include <utility>
#include <vector>
@@ -183,6 +183,22 @@ KuduClient::Data::~Data() {
dns_resolver_.reset();
}
+namespace {
+
+// This random integer is used when making any random choice for replica
+// selection. It is static to provide a deterministic selection for any given
+// process and therefore also better cache affinity while ensuring that we can
+// still benefit from spreading the load across replicas for other processes
+// and applications.
+int InitRandomSelectionInt() {
+ std::random_device rdev;
+ std::mt19937 gen(rdev());
+ return gen();
+}
+static const int kRandomSelectionInt = InitRandomSelectionInt();
+
+} // anonymous namespace
+
RemoteTabletServer* KuduClient::Data::SelectTServer(
const scoped_refptr<RemoteTablet>& rt,
const ReplicaSelection selection,
@@ -249,11 +265,11 @@ RemoteTabletServer* KuduClient::Data::SelectTServer(
}
}
if (!local.empty()) {
- ret = local[rand() % local.size()];
+ ret = local[kRandomSelectionInt % local.size()];
} else if (!same_location.empty()) {
- ret = same_location[rand() % same_location.size()];
+ ret = same_location[kRandomSelectionInt % same_location.size()];
} else if (!filtered.empty()) {
- ret = filtered[rand() % filtered.size()];
+ ret = filtered[kRandomSelectionInt % filtered.size()];
}
break;
}
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 9f8392a..76bf022 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -2562,6 +2562,56 @@ TEST_F(ClientTest, TestGetTabletServerBlacklist) {
}
}
+TEST_F(ClientTest, TestGetTabletServerDeterministic) {
+ shared_ptr<KuduTable> table;
+ NO_FATALS(CreateTable("selection",
+ 3,
+ GenerateSplitRows(),
+ {},
+ &table));
+ InsertTestRows(table.get(), 1, 0);
+
+ // Look up the tablet and its replicas into the metadata cache.
+ // We have to loop since some replicas may have been created slowly.
+ scoped_refptr<internal::RemoteTablet> rt;
+ while (true) {
+ rt = MetaCacheLookup(table.get(), "");
+ ASSERT_TRUE(rt.get() != nullptr);
+ vector<internal::RemoteTabletServer*> tservers;
+ rt->GetRemoteTabletServers(&tservers);
+ if (tservers.size() == 3) {
+ break;
+ }
+ // Marking stale forces a lookup.
+ rt->MarkStale();
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
+
+ // Get the closest replica from the same client twice and ensure they match.
+ set<string> blacklist;
+ vector<internal::RemoteTabletServer*> candidates;
+ internal::RemoteTabletServer* rts1;
+ ASSERT_OK(client_->data_->GetTabletServer(client_.get(), rt,
+ KuduClient::CLOSEST_REPLICA,
+ blacklist, &candidates, &rts1));
+ internal::RemoteTabletServer *rts2;
+ ASSERT_OK(client_->data_->GetTabletServer(client_.get(), rt,
+ KuduClient::CLOSEST_REPLICA,
+ blacklist, &candidates, &rts2));
+ ASSERT_EQ(rts1->permanent_uuid(), rts2->permanent_uuid());
+
+ // Get the closest replica from a different client and ensure it matches.
+ shared_ptr<KuduClient> c2;
+ ASSERT_OK(KuduClientBuilder()
+ .add_master_server_addr(cluster_->mini_master()->bound_rpc_addr().ToString())
+ .Build(&c2));
+ internal::RemoteTabletServer *rts3;
+ ASSERT_OK(c2->data_->GetTabletServer(c2.get(), rt,
+ KuduClient::CLOSEST_REPLICA,
+ blacklist, &candidates, &rts3));
+ ASSERT_EQ(rts2->permanent_uuid(), rts3->permanent_uuid());
+}
+
TEST_F(ClientTest, TestScanWithEncodedRangePredicate) {
shared_ptr<KuduTable> table;
NO_FATALS(CreateTable("split-table",
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index a85d218..10b55c6 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -935,6 +935,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
FRIEND_TEST(ClientTest, TestCacheAuthzTokens);
FRIEND_TEST(ClientTest, TestGetSecurityInfoFromMaster);
FRIEND_TEST(ClientTest, TestGetTabletServerBlacklist);
+ FRIEND_TEST(ClientTest, TestGetTabletServerDeterministic);
FRIEND_TEST(ClientTest, TestMasterDown);
FRIEND_TEST(ClientTest, TestMasterLookupPermits);
FRIEND_TEST(ClientTest, TestMetaCacheExpiry);
diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto
index d3d51f7..0043669 100644
--- a/src/kudu/common/common.proto
+++ b/src/kudu/common/common.proto
@@ -302,8 +302,11 @@ enum ReplicaSelection {
UNKNOWN_REPLICA_SELECTION = 0;
// Select the LEADER replica.
LEADER_ONLY = 1;
- // Select the closest replica to the client, or a random one if all replicas
- // are equidistant.
+ // Select the closest replica to the client. Replicas are classified from
+ // closest to furthest as follows:
+ // - Local replicas
+ // - Replicas whose tablet server has the same location as the client
+ // - All other replicas
CLOSEST_REPLICA = 2;
}
diff --git a/src/kudu/integration-tests/disk_failure-itest.cc b/src/kudu/integration-tests/disk_failure-itest.cc
index ae15976..7303ebf 100644
--- a/src/kudu/integration-tests/disk_failure-itest.cc
+++ b/src/kudu/integration-tests/disk_failure-itest.cc
@@ -56,6 +56,7 @@ METRIC_DECLARE_gauge_uint64(data_dirs_failed);
METRIC_DECLARE_gauge_uint32(tablets_num_failed);
using kudu::client::sp::shared_ptr;
+using kudu::client::KuduClient;
using kudu::client::KuduDelete;
using kudu::client::KuduInsert;
using kudu::client::KuduSession;
@@ -224,7 +225,7 @@ class TabletServerDiskErrorITest : public DiskErrorITestBase {
// Also configure the cluster to not delete or copy tablets, even on error.
// This allows us to check all tablets are failed appropriately.
void SetUp() override {
- const int kNumRows = 5000;
+ const int kNumRows = 10000;
ExternalMiniClusterOptions opts;
// Use 3 tservers at first; we'll add an empty one later.
opts.num_tablet_servers = 3;
@@ -243,7 +244,8 @@ class TabletServerDiskErrorITest : public DiskErrorITestBase {
NO_FATALS(StartClusterWithOpts(std::move(opts)));
// Write some rows to the three servers.
- TestWorkload writes(cluster_.get());
+ // Uses HASH partitioning to be sure we hit all tablets.
+ TestWorkload writes(cluster_.get(), TestWorkload::PartitioningType::HASH);
writes.set_num_tablets(kNumTablets);
writes.Setup();
writes.Start();
@@ -281,13 +283,14 @@ class TabletServerDiskErrorITest : public DiskErrorITestBase {
// Waits for the number of failed tablets on the tablet server to reach
// `num_failed`.
- void WaitForFailedTablets(ExternalTabletServer* ts, int num_failed) const {
+ void WaitForFailedTablets(ExternalTabletServer* ts, int num_failed,
+ bool require_all_fail = true) const {
ASSERT_EVENTUALLY([&] {
int64_t failed_on_ts;
ASSERT_OK(itest::GetInt64Metric(ts->bound_http_hostport(),
&METRIC_ENTITY_server, nullptr, &METRIC_tablets_num_failed, "value", &failed_on_ts));
LOG(INFO) << "Currently has " << failed_on_ts << " failed tablets";
- ASSERT_EQ(num_failed, failed_on_ts);
+ ASSERT_TRUE(failed_on_ts == num_failed || (!require_all_fail && failed_on_ts > 0));
});
}
};
@@ -327,8 +330,10 @@ TEST_P(TabletServerDiskErrorITest, TestFailDuringScanWorkload) {
ExternalTabletServer* error_ts = cluster_->tablet_server(0);
ASSERT_OK(SetFlags(error_ts, InjectionFlags(GetParam(), error_ts)));
- // Wait for all the tablets to reach a failed state.
- NO_FATALS(WaitForFailedTablets(error_ts, kNumTablets));
+ // Wait for some of the tablets to reach a failed state.
+ // We can't wait for all of the tablets in this case because
+ // some may not be scanned and will therefore not be marked as failed.
+ NO_FATALS(WaitForFailedTablets(error_ts, kNumTablets, /*require_all_fail*/false));
ASSERT_OK(AllowRecovery());
NO_FATALS(read.StopAndJoin());
diff --git a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
index d66bfcd..db5220c 100644
--- a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
+++ b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
@@ -246,13 +246,18 @@ TEST_F(TServerQuiescingITest, TestDoesntAllowNewScans) {
rw_workload->Setup();
rw_workload->Start();
- // Wait for some scans to begin.
- auto* ts = cluster_->mini_tablet_server(0);
+ // Wait for some scans to begin on a tablet server.
+ int ts_id = 0;
+ auto* ts = cluster_->mini_tablet_server(ts_id);
ASSERT_EVENTUALLY([&] {
+ // Pick the next tablet server on each try to handle the
+ // case where certain tablet servers will never have an active scanner.
+ ts_id = (ts_id + 1) % cluster_->num_tablet_servers();
+ ts = cluster_->mini_tablet_server(ts_id);
ASSERT_LT(0, ts->server()->scanner_manager()->CountActiveScanners());
});
- // Mark a tablet server as quiescing. It should eventually stop serving scans.
+ // Mark the tablet server as quiescing. It should eventually stop serving scans.
*ts->server()->mutable_quiescing() = true;
ASSERT_EVENTUALLY([&] {
ASSERT_EQ(0, ts->server()->scanner_manager()->CountActiveScanners());