You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by da...@apache.org on 2016/06/03 20:20:46 UTC
incubator-kudu git commit: KUDU-1308 [c++-client]: support tables with non-covering range partitions
Repository: incubator-kudu
Updated Branches:
refs/heads/master 738e99384 -> c24478b83
KUDU-1308 [c++-client]: support tables with non-covering range partitions
This commit introduces range bounds to the C++ client's table creation options.
Specifying range bounds allows applications to create tables with non-covering
range partitions, as described in the non-covering range partitions design
document. Additionally, the client is updated to work with tables with
non-covering range partitions.
Change-Id: I1cb12704c5e9792ee6e5831568bc52b1a713f8d5
Reviewed-on: http://gerrit.cloudera.org:8080/3255
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/c24478b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/c24478b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/c24478b8
Branch: refs/heads/master
Commit: c24478b8384f20d0477f7901c0962c55999345fb
Parents: 738e993
Author: Dan Burkert <da...@cloudera.com>
Authored: Mon May 23 14:42:17 2016 -0700
Committer: Dan Burkert <da...@cloudera.com>
Committed: Fri Jun 3 20:16:26 2016 +0000
----------------------------------------------------------------------
src/kudu/client/batcher.cc | 2 +-
src/kudu/client/client-internal.cc | 38 +-
src/kudu/client/client-internal.h | 6 +-
src/kudu/client/client-test.cc | 175 ++++++-
src/kudu/client/client.cc | 29 +-
src/kudu/client/client.h | 17 +
src/kudu/client/meta_cache.cc | 94 +++-
src/kudu/client/meta_cache.h | 15 +-
src/kudu/client/scan_token-internal.cc | 10 +-
src/kudu/client/scanner-internal.cc | 28 +-
src/kudu/client/scanner-internal.h | 9 +-
src/kudu/client/table_creator-internal.h | 5 +
src/kudu/common/partition_pruner-test.cc | 2 +-
src/kudu/common/partition_pruner.cc | 2 +-
src/kudu/common/partition_pruner.h | 4 +-
.../flex_partitioning-itest.cc | 495 +++++++++----------
16 files changed, 612 insertions(+), 319 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c24478b8/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index 81add5a..c167215 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -267,7 +267,7 @@ WriteRpc::WriteRpc(const scoped_refptr<Batcher>& batcher,
CHECK(partition_schema.PartitionContainsRow(partition, row, &partition_contains_row).ok());
CHECK(partition_contains_row)
<< "Row " << partition_schema.RowDebugString(row)
- << "not in partition " << partition_schema.PartitionDebugString(partition, *schema);
+ << " not in partition " << partition_schema.PartitionDebugString(partition, *schema);
#endif
enc.Add(ToInternalWriteType(op->write_op->type()), op->write_op->row());
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c24478b8/src/kudu/client/client-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.cc b/src/kudu/client/client-internal.cc
index f7f2420..66e141c 100644
--- a/src/kudu/client/client-internal.cc
+++ b/src/kudu/client/client-internal.cc
@@ -66,8 +66,9 @@ using master::ListTablesRequestPB;
using master::ListTablesResponsePB;
using master::ListTabletServersRequestPB;
using master::ListTabletServersResponsePB;
-using master::MasterServiceProxy;
using master::MasterErrorPB;
+using master::MasterFeatures;
+using master::MasterServiceProxy;
using rpc::Rpc;
using rpc::RpcController;
using strings::Substitute;
@@ -136,7 +137,8 @@ Status KuduClient::Data::SyncLeaderMasterRpc(
const boost::function<Status(MasterServiceProxy*,
const ReqClass&,
RespClass*,
- RpcController*)>& func) {
+ RpcController*)>& func,
+ vector<uint32_t> required_feature_flags) {
DCHECK(deadline.Initialized());
while (true) {
@@ -161,6 +163,11 @@ Status KuduClient::Data::SyncLeaderMasterRpc(
if (num_attempts != nullptr) {
++*num_attempts;
}
+
+ for (uint32_t required_feature_flag : required_feature_flags) {
+ rpc.RequireServerFeature(required_feature_flag);
+ }
+
Status s = func(master_proxy_.get(), req, resp, &rpc);
if (s.IsNetworkError()) {
LOG(WARNING) << "Unable to send the request (" << req.ShortDebugString()
@@ -219,7 +226,8 @@ Status KuduClient::Data::SyncLeaderMasterRpc(
const boost::function<Status(MasterServiceProxy*,
const ListTablesRequestPB&,
ListTablesResponsePB*,
- RpcController*)>& func);
+ RpcController*)>& func,
+ vector<uint32_t> required_feature_flags);
template
Status KuduClient::Data::SyncLeaderMasterRpc(
const MonoTime& deadline,
@@ -231,7 +239,8 @@ Status KuduClient::Data::SyncLeaderMasterRpc(
const boost::function<Status(MasterServiceProxy*,
const ListTabletServersRequestPB&,
ListTabletServersResponsePB*,
- RpcController*)>& func);
+ RpcController*)>& func,
+ vector<uint32_t> required_feature_flags);
KuduClient::Data::Data()
: latest_observed_timestamp_(KuduClient::kNoTimestamp) {
@@ -336,12 +345,18 @@ Status KuduClient::Data::GetTabletServer(KuduClient* client,
Status KuduClient::Data::CreateTable(KuduClient* client,
const CreateTableRequestPB& req,
const KuduSchema& schema,
- const MonoTime& deadline) {
+ const MonoTime& deadline,
+ bool has_range_partition_bounds) {
CreateTableResponsePB resp;
int attempts = 0;
+ vector<uint32_t> features;
+ if (has_range_partition_bounds) {
+ features.push_back(MasterFeatures::RANGE_PARTITION_BOUNDS);
+ }
Status s = SyncLeaderMasterRpc<CreateTableRequestPB, CreateTableResponsePB>(
- deadline, client, req, &resp, &attempts, "CreateTable", &MasterServiceProxy::CreateTable);
+ deadline, client, req, &resp, &attempts, "CreateTable", &MasterServiceProxy::CreateTable,
+ features);
RETURN_NOT_OK(s);
if (resp.has_error()) {
if (resp.error().code() == MasterErrorPB::TABLE_ALREADY_PRESENT && attempts > 1) {
@@ -403,7 +418,8 @@ Status KuduClient::Data::IsCreateTableInProgress(KuduClient* client,
&resp,
nullptr,
"IsCreateTableDone",
- &MasterServiceProxy::IsCreateTableDone);
+ &MasterServiceProxy::IsCreateTableDone,
+ {});
// RETURN_NOT_OK macro can't take templated function call as param,
// and SyncLeaderMasterRpc must be explicitly instantiated, else the
// compiler complains.
@@ -436,7 +452,7 @@ Status KuduClient::Data::DeleteTable(KuduClient* client,
req.mutable_table()->set_table_name(table_name);
Status s = SyncLeaderMasterRpc<DeleteTableRequestPB, DeleteTableResponsePB>(
deadline, client, req, &resp,
- &attempts, "DeleteTable", &MasterServiceProxy::DeleteTable);
+ &attempts, "DeleteTable", &MasterServiceProxy::DeleteTable, {});
RETURN_NOT_OK(s);
if (resp.has_error()) {
if (resp.error().code() == MasterErrorPB::TABLE_NOT_FOUND && attempts > 1) {
@@ -462,7 +478,8 @@ Status KuduClient::Data::AlterTable(KuduClient* client,
&resp,
nullptr,
"AlterTable",
- &MasterServiceProxy::AlterTable);
+ &MasterServiceProxy::AlterTable,
+ {});
RETURN_NOT_OK(s);
// TODO: Consider the situation where the request is sent to the
// server, gets executed on the server and written to the server,
@@ -491,7 +508,8 @@ Status KuduClient::Data::IsAlterTableInProgress(KuduClient* client,
&resp,
nullptr,
"IsAlterTableDone",
- &MasterServiceProxy::IsAlterTableDone);
+ &MasterServiceProxy::IsAlterTableDone,
+ {});
RETURN_NOT_OK(s);
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c24478b8/src/kudu/client/client-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h
index 3db9eb1..861b30f 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -71,7 +71,8 @@ class KuduClient::Data {
Status CreateTable(KuduClient* client,
const master::CreateTableRequestPB& req,
const KuduSchema& schema,
- const MonoTime& deadline);
+ const MonoTime& deadline,
+ bool has_range_partition_bounds);
Status IsCreateTableInProgress(KuduClient* client,
const std::string& table_name,
@@ -184,7 +185,8 @@ class KuduClient::Data {
const char* func_name,
const boost::function<Status(master::MasterServiceProxy*,
const ReqClass&, RespClass*,
- rpc::RpcController*)>& func);
+ rpc::RpcController*)>& func,
+ std::vector<uint32_t> required_feature_flags);
// The unique id of this client.
std::string client_id_;
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c24478b8/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 9f4e3d8..94f709b 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -22,6 +22,9 @@
#include <algorithm>
#include <map>
#include <memory>
+#include <set>
+#include <string>
+#include <utility>
#include <vector>
#include "kudu/client/callbacks.h"
@@ -80,6 +83,7 @@ METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetMasterRegi
METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableLocations);
METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTabletLocations);
+using std::pair;
using std::set;
using std::string;
using std::unique_ptr;
@@ -131,8 +135,8 @@ class ClientTest : public KuduTest {
.add_master_server_addr(cluster_->mini_master()->bound_rpc_addr().ToString())
.Build(&client_));
- ASSERT_NO_FATAL_FAILURE(CreateTable(kTableName, 1, GenerateSplitRows(), &client_table_));
- ASSERT_NO_FATAL_FAILURE(CreateTable(kTable2Name, 1, {}, &client_table2_));
+ ASSERT_NO_FATAL_FAILURE(CreateTable(kTableName, 1, GenerateSplitRows(), {}, &client_table_));
+ ASSERT_NO_FATAL_FAILURE(CreateTable(kTable2Name, 1, {}, {}, &client_table2_));
}
// Generate a set of split rows for tablets used in this test.
@@ -410,11 +414,13 @@ class ClientTest : public KuduTest {
return count;
}
- // Creates a table with 'num_replicas', split into tablets based on 'split_rows'
- // (or single tablet if 'split_rows' is empty).
+ // Creates a table with 'num_replicas', split into tablets based on
+ // 'split_rows' and 'range_bounds' (or single tablet if both are empty).
void CreateTable(const string& table_name,
int num_replicas,
vector<unique_ptr<KuduPartialRow>> split_rows,
+ vector<pair<unique_ptr<KuduPartialRow>,
+ unique_ptr<KuduPartialRow>>> range_bounds,
shared_ptr<KuduTable>* table) {
bool added_replicas = false;
@@ -432,7 +438,9 @@ class ClientTest : public KuduTest {
for (auto& split_row : split_rows) {
table_creator->add_range_split(split_row.release());
}
-
+ for (auto& bound : range_bounds) {
+ table_creator->add_range_bound(bound.first.release(), bound.second.release());
+ }
ASSERT_OK(table_creator->table_name(table_name)
.schema(&schema_)
.num_replicas(num_replicas)
@@ -654,7 +662,7 @@ TEST_F(ClientTest, TestScanMultiTablet) {
}
gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
shared_ptr<KuduTable> table;
- ASSERT_NO_FATAL_FAILURE(CreateTable("TestScanMultiTablet", 1, std::move(rows), &table));
+ ASSERT_NO_FATAL_FAILURE(CreateTable("TestScanMultiTablet", 1, std::move(rows), {}, &table));
// Insert rows with keys 12, 13, 15, 17, 22, 23, 25, 27...47 into each
// tablet, except the first which is empty.
@@ -917,7 +925,7 @@ TEST_F(ClientTest, TestInvalidPredicates) {
TEST_F(ClientTest, TestScanCloseProxy) {
const string kEmptyTable = "TestScanCloseProxy";
shared_ptr<KuduTable> table;
- ASSERT_NO_FATAL_FAILURE(CreateTable(kEmptyTable, 3, GenerateSplitRows(), &table));
+ ASSERT_NO_FATAL_FAILURE(CreateTable(kEmptyTable, 3, GenerateSplitRows(), {}, &table));
{
// Open and close an empty scanner.
@@ -1009,7 +1017,7 @@ TEST_F(ClientTest, TestScanFaultTolerance) {
// to read from a replica that is lagging for some reason. This won't be necessary once
// we implement full support for snapshot consistency (KUDU-430).
const int kNumReplicas = 2;
- ASSERT_NO_FATAL_FAILURE(CreateTable(kScanTable, kNumReplicas, {}, &table));
+ ASSERT_NO_FATAL_FAILURE(CreateTable(kScanTable, kNumReplicas, {}, {}, &table));
ASSERT_NO_FATAL_FAILURE(InsertTestRows(table.get(), FLAGS_test_scan_num_rows));
// Do an initial scan to determine the expected rows for later verification.
@@ -1062,11 +1070,155 @@ TEST_F(ClientTest, TestScanFaultTolerance) {
}
}
+// Exercises a table whose range partitions do not cover the whole key space:
+// range bounds [0, 100) (split at 50) and [200, 300). Rows inside the bounds
+// are written and scanned normally; writes outside them must fail with
+// NotFound, and scans restricted to non-covered ranges must return no rows.
+TEST_F(ClientTest, TestNonCoveringRangePartitions) {
+ // Create test table and insert test rows.
+ const string kTableName = "TestNonCoveringRangePartitions";
+ shared_ptr<KuduTable> table;
+
+ vector<pair<unique_ptr<KuduPartialRow>, unique_ptr<KuduPartialRow>>> bounds;
+ unique_ptr<KuduPartialRow> a_lower_bound(schema_.NewRow());
+ ASSERT_OK(a_lower_bound->SetInt32("key", 0));
+ unique_ptr<KuduPartialRow> a_upper_bound(schema_.NewRow());
+ ASSERT_OK(a_upper_bound->SetInt32("key", 100));
+ bounds.emplace_back(std::move(a_lower_bound), std::move(a_upper_bound));
+
+ unique_ptr<KuduPartialRow> b_lower_bound(schema_.NewRow());
+ ASSERT_OK(b_lower_bound->SetInt32("key", 200));
+ unique_ptr<KuduPartialRow> b_upper_bound(schema_.NewRow());
+ ASSERT_OK(b_upper_bound->SetInt32("key", 300));
+ bounds.emplace_back(std::move(b_lower_bound), std::move(b_upper_bound));
+
+ vector<unique_ptr<KuduPartialRow>> splits;
+ unique_ptr<KuduPartialRow> split(schema_.NewRow());
+ ASSERT_OK(split->SetInt32("key", 50));
+ splits.push_back(std::move(split));
+
+ // CreateTable() uses fatal assertions internally; stop here if it fails
+ // instead of continuing with a null 'table'.
+ ASSERT_NO_FATAL_FAILURE(CreateTable(kTableName, 1, std::move(splits), std::move(bounds), &table));
+
+ // Aggressively clear the meta cache between insert batches so that the meta
+ // cache will execute GetTableLocation RPCs at different partition keys.
+ ASSERT_NO_FATAL_FAILURE(InsertTestRows(table.get(), 50, 0));
+ client_->data_->meta_cache_->ClearCacheForTesting();
+ ASSERT_NO_FATAL_FAILURE(InsertTestRows(table.get(), 50, 50));
+ client_->data_->meta_cache_->ClearCacheForTesting();
+ ASSERT_NO_FATAL_FAILURE(InsertTestRows(table.get(), 100, 200));
+ client_->data_->meta_cache_->ClearCacheForTesting();
+
+ // Insert out-of-range rows. Every key below falls outside both range bounds,
+ // so each insert must be rejected with a NotFound error.
+ shared_ptr<KuduSession> session = client_->NewSession();
+ ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+ session->SetTimeoutMillis(60000);
+ vector<gscoped_ptr<KuduInsert>> out_of_range_inserts;
+ out_of_range_inserts.emplace_back(BuildTestRow(table.get(), -50));
+ out_of_range_inserts.emplace_back(BuildTestRow(table.get(), -1));
+ out_of_range_inserts.emplace_back(BuildTestRow(table.get(), 100));
+ out_of_range_inserts.emplace_back(BuildTestRow(table.get(), 150));
+ out_of_range_inserts.emplace_back(BuildTestRow(table.get(), 199));
+ out_of_range_inserts.emplace_back(BuildTestRow(table.get(), 300));
+ out_of_range_inserts.emplace_back(BuildTestRow(table.get(), 350));
+
+ for (auto& insert : out_of_range_inserts) {
+ client_->data_->meta_cache_->ClearCacheForTesting();
+ Status result = session->Apply(insert.release());
+ EXPECT_TRUE(result.IsIOError());
+ vector<KuduError*> errors;
+ bool overflowed = false;
+ session->GetPendingErrors(&errors, &overflowed);
+ EXPECT_FALSE(overflowed);
+ EXPECT_EQ(1, errors.size());
+ // EXPECT_EQ does not abort on failure, so guard the element access.
+ if (!errors.empty()) {
+ EXPECT_TRUE(errors[0]->status().IsNotFound());
+ }
+ STLDeleteElements(&errors);
+ }
+
+ // Scans
+
+ { // full table scan
+ vector<string> rows;
+ KuduScanner scanner(table.get());
+ ASSERT_OK(scanner.SetFaultTolerant());
+ ScanToStrings(&scanner, &rows);
+
+ ASSERT_EQ(200, rows.size());
+ ASSERT_EQ("(int32 key=0, int32 int_val=0, string string_val=hello 0,"
+ " int32 non_null_with_default=0)", rows.front());
+ ASSERT_EQ("(int32 key=299, int32 int_val=598, string string_val=hello 299,"
+ " int32 non_null_with_default=897)", rows.back());
+ }
+
+ { // Lower bound PK
+ vector<string> rows;
+ KuduScanner scanner(table.get());
+ ASSERT_OK(scanner.SetFaultTolerant());
+ ASSERT_OK(scanner.AddConjunctPredicate(table->NewComparisonPredicate(
+ "key", KuduPredicate::GREATER_EQUAL, KuduValue::FromInt(100))));
+ ScanToStrings(&scanner, &rows);
+
+ ASSERT_EQ(100, rows.size());
+ ASSERT_EQ("(int32 key=200, int32 int_val=400, string string_val=hello 200,"
+ " int32 non_null_with_default=600)", rows.front());
+ ASSERT_EQ("(int32 key=299, int32 int_val=598, string string_val=hello 299,"
+ " int32 non_null_with_default=897)", rows.back());
+ }
+
+ { // Upper bound PK
+ vector<string> rows;
+ KuduScanner scanner(table.get());
+ ASSERT_OK(scanner.SetFaultTolerant());
+ ASSERT_OK(scanner.AddConjunctPredicate(table->NewComparisonPredicate(
+ "key", KuduPredicate::LESS_EQUAL, KuduValue::FromInt(199))));
+ ScanToStrings(&scanner, &rows);
+
+ ASSERT_EQ(100, rows.size());
+ ASSERT_EQ("(int32 key=0, int32 int_val=0, string string_val=hello 0,"
+ " int32 non_null_with_default=0)", rows.front());
+ ASSERT_EQ("(int32 key=99, int32 int_val=198, string string_val=hello 99,"
+ " int32 non_null_with_default=297)", rows.back());
+ }
+
+ { // key <= -1
+ vector<string> rows;
+ KuduScanner scanner(table.get());
+ ASSERT_OK(scanner.SetFaultTolerant());
+ ASSERT_OK(scanner.AddConjunctPredicate(table->NewComparisonPredicate(
+ "key", KuduPredicate::LESS_EQUAL, KuduValue::FromInt(-1))));
+ ScanToStrings(&scanner, &rows);
+
+ ASSERT_EQ(0, rows.size());
+ }
+
+ { // key >= 120 && key <= 180
+ vector<string> rows;
+ KuduScanner scanner(table.get());
+ ASSERT_OK(scanner.SetFaultTolerant());
+ ASSERT_OK(scanner.AddConjunctPredicate(table->NewComparisonPredicate(
+ "key", KuduPredicate::GREATER_EQUAL, KuduValue::FromInt(120))));
+ ASSERT_OK(scanner.AddConjunctPredicate(table->NewComparisonPredicate(
+ "key", KuduPredicate::LESS_EQUAL, KuduValue::FromInt(180))));
+ ScanToStrings(&scanner, &rows);
+
+ ASSERT_EQ(0, rows.size());
+ }
+
+ { // key >= 300
+ vector<string> rows;
+ KuduScanner scanner(table.get());
+ ASSERT_OK(scanner.SetFaultTolerant());
+ ASSERT_OK(scanner.AddConjunctPredicate(table->NewComparisonPredicate(
+ "key", KuduPredicate::GREATER_EQUAL, KuduValue::FromInt(300))));
+ ScanToStrings(&scanner, &rows);
+
+ ASSERT_EQ(0, rows.size());
+ }
+}
+
TEST_F(ClientTest, TestGetTabletServerBlacklist) {
shared_ptr<KuduTable> table;
ASSERT_NO_FATAL_FAILURE(CreateTable("blacklist",
3,
GenerateSplitRows(),
+ {},
&table));
InsertTestRows(table.get(), 1, 0);
@@ -1146,6 +1298,7 @@ TEST_F(ClientTest, TestScanWithEncodedRangePredicate) {
ASSERT_NO_FATAL_FAILURE(CreateTable("split-table",
1, /* replicas */
GenerateSplitRows(),
+ {},
&table));
ASSERT_NO_FATAL_FAILURE(InsertTestRows(table.get(), 100));
@@ -2167,7 +2320,7 @@ TEST_F(ClientTest, TestDeleteTable) {
// Create a new table with the same name. This is to ensure that the client
// doesn't cache anything inappropriately by table name (see KUDU-1055).
- NO_FATALS(CreateTable(kTableName, 1, GenerateSplitRows(), &client_table_));
+ NO_FATALS(CreateTable(kTableName, 1, GenerateSplitRows(), {}, &client_table_));
// Should be able to insert successfully into the new table.
NO_FATALS(InsertTestRows(client_.get(), client_table_.get(), 10));
@@ -2240,6 +2393,7 @@ TEST_F(ClientTest, TestReplicatedMultiTabletTable) {
ASSERT_NO_FATAL_FAILURE(CreateTable(kReplicatedTable,
kNumReplicas,
GenerateSplitRows(),
+ {},
&table));
// Should have no rows to begin with.
@@ -2265,6 +2419,7 @@ TEST_F(ClientTest, TestReplicatedMultiTabletTableFailover) {
ASSERT_NO_FATAL_FAILURE(CreateTable(kReplicatedTable,
kNumReplicas,
GenerateSplitRows(),
+ {},
&table));
// Insert some data.
@@ -2324,7 +2479,7 @@ TEST_F(ClientTest, TestReplicatedTabletWritesWithLeaderElection) {
const int kNumReplicas = 3;
shared_ptr<KuduTable> table;
- ASSERT_NO_FATAL_FAILURE(CreateTable(kReplicatedTable, kNumReplicas, {}, &table));
+ ASSERT_NO_FATAL_FAILURE(CreateTable(kReplicatedTable, kNumReplicas, {}, {}, &table));
// Insert some data.
ASSERT_NO_FATAL_FAILURE(InsertTestRows(table.get(), kNumRowsToWrite));
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c24478b8/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index e605b9b..c884351 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -19,10 +19,12 @@
#include <algorithm>
#include <boost/bind.hpp>
+#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <unordered_map>
+#include <utility>
#include <vector>
#include "kudu/client/batcher.h"
@@ -79,8 +81,10 @@ using kudu::rpc::Messenger;
using kudu::rpc::MessengerBuilder;
using kudu::rpc::RpcController;
using kudu::tserver::ScanResponsePB;
+using std::pair;
using std::set;
using std::string;
+using std::unique_ptr;
using std::vector;
MAKE_ENUM_LIMITS(kudu::client::KuduSession::FlushMode,
@@ -307,7 +311,8 @@ Status KuduClient::ListTabletServers(vector<KuduTabletServer*>* tablet_servers)
&resp,
nullptr,
"ListTabletServers",
- &MasterServiceProxy::ListTabletServers);
+ &MasterServiceProxy::ListTabletServers,
+ {});
RETURN_NOT_OK(s);
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
@@ -340,7 +345,8 @@ Status KuduClient::ListTables(vector<string>* tables,
&resp,
nullptr,
"ListTables",
- &MasterServiceProxy::ListTables);
+ &MasterServiceProxy::ListTables,
+ {});
RETURN_NOT_OK(s);
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
@@ -479,6 +485,13 @@ KuduTableCreator& KuduTableCreator::split_rows(const vector<const KuduPartialRow
return *this;
}
+// Records one range bound, taking ownership of both rows up front. Null rows
+// are accepted here; Create() rejects them with InvalidArgument.
+KuduTableCreator& KuduTableCreator::add_range_bound(KuduPartialRow* lower_bound,
+ KuduPartialRow* upper_bound) {
+ unique_ptr<KuduPartialRow> lower(lower_bound);
+ unique_ptr<KuduPartialRow> upper(upper_bound);
+ data_->range_bounds_.emplace_back(std::move(lower), std::move(upper));
+ return *this;
+}
+
KuduTableCreator& KuduTableCreator::num_replicas(int num_replicas) {
data_->num_replicas_ = num_replicas;
return *this;
@@ -525,6 +538,15 @@ Status KuduTableCreator::Create() {
}
encoder.Add(RowOperationsPB::SPLIT_ROW, *row);
}
+
+ for (const auto& bound : data_->range_bounds_) {
+ if (!bound.first || !bound.second) {
+ return Status::InvalidArgument("range bounds must not be null");
+ }
+ encoder.Add(RowOperationsPB::RANGE_LOWER_BOUND, *bound.first);
+ encoder.Add(RowOperationsPB::RANGE_UPPER_BOUND, *bound.second);
+ }
+
req.mutable_partition_schema()->CopyFrom(data_->partition_schema_);
MonoTime deadline = MonoTime::Now(MonoTime::FINE);
@@ -537,7 +559,8 @@ Status KuduTableCreator::Create() {
RETURN_NOT_OK_PREPEND(data_->client_->data_->CreateTable(data_->client_,
req,
*data_->schema_,
- deadline),
+ deadline,
+ !data_->range_bounds_.empty()),
strings::Substitute("Error creating table $0 on the master",
data_->table_name_));
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c24478b8/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 93edad7..c68cc96 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -292,6 +292,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
FRIEND_TEST(ClientTest, TestGetTabletServerBlacklist);
FRIEND_TEST(ClientTest, TestMasterDown);
FRIEND_TEST(ClientTest, TestMasterLookupPermits);
+ FRIEND_TEST(ClientTest, TestNonCoveringRangePartitions);
FRIEND_TEST(ClientTest, TestReplicatedMultiTabletTableFailover);
FRIEND_TEST(ClientTest, TestReplicatedTabletWritesWithLeaderElection);
FRIEND_TEST(ClientTest, TestScanFaultTolerance);
@@ -368,6 +369,22 @@ class KUDU_EXPORT KuduTableCreator {
// DEPRECATED: use add_range_split
KuduTableCreator& split_rows(const std::vector<const KuduPartialRow*>& split_rows);
+ // Add a partition range bound to the table with an inclusive lower bound and
+ // exclusive upper bound.
+ //
+ // The table creator takes ownership of the rows. If either row is empty, then
+ // that end of the range will be unbounded. If a range column is missing a
+ // value, the logical minimum value for that column type will be used as the
+ // default. Both row pointers must be non-null; a null row causes Create() to
+ // fail with InvalidArgument.
+ //
+ // Multiple range bounds may be added, but they must not overlap. All split
+ // rows must fall in one of the range bounds. The lower bound must be less
+ // than the upper bound.
+ //
+ // If not provided, the table's range will be unbounded.
+ KuduTableCreator& add_range_bound(KuduPartialRow* lower_bound,
+ KuduPartialRow* upper_bound);
+
+
// Sets the number of replicas for each tablet in the table.
// This should be an odd number. Optional.
//
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c24478b8/src/kudu/client/meta_cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index fa4142b..f5e9832 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -473,7 +473,8 @@ class LookupRpc : public Rpc {
string partition_key,
scoped_refptr<RemoteTablet>* remote_tablet,
const MonoTime& deadline,
- const shared_ptr<Messenger>& messenger);
+ const shared_ptr<Messenger>& messenger,
+ bool is_exact_lookup);
virtual ~LookupRpc();
virtual void SendRpc() OVERRIDE;
virtual string ToString() const OVERRIDE;
@@ -481,6 +482,8 @@ class LookupRpc : public Rpc {
const GetTableLocationsResponsePB& resp() const { return resp_; }
const string& table_name() const { return table_->name(); }
const string& table_id() const { return table_->id(); }
+ const string& partition_key() const { return partition_key_; }
+ bool is_exact_lookup() const { return is_exact_lookup_; }
private:
virtual void SendRpcCb(const Status& status) OVERRIDE;
@@ -523,6 +526,11 @@ class LookupRpc : public Rpc {
// Whether this lookup has acquired a master lookup permit.
bool has_permit_;
+
+ // If true, this lookup is for an exact tablet match with the requested
+ // partition key. If false, the next tablet after the partition key should be
+ // returned if the partition key falls in a non-covered partition range.
+ bool is_exact_lookup_;
};
LookupRpc::LookupRpc(const scoped_refptr<MetaCache>& meta_cache,
@@ -530,14 +538,16 @@ LookupRpc::LookupRpc(const scoped_refptr<MetaCache>& meta_cache,
string partition_key,
scoped_refptr<RemoteTablet>* remote_tablet,
const MonoTime& deadline,
- const shared_ptr<Messenger>& messenger)
+ const shared_ptr<Messenger>& messenger,
+ bool is_exact_lookup)
: Rpc(deadline, messenger),
meta_cache_(meta_cache),
user_cb_(std::move(user_cb)),
table_(table),
partition_key_(std::move(partition_key)),
remote_tablet_(remote_tablet),
- has_permit_(false) {
+ has_permit_(false),
+ is_exact_lookup_(is_exact_lookup) {
DCHECK(deadline.Initialized());
}
@@ -685,15 +695,9 @@ void LookupRpc::SendRpcCb(const Status& status) {
}
}
- // Finally, ensure that there were tablet replicas found. If not, consider
- // that an error.
- if (new_status.ok() && resp_.tablet_locations_size() == 0) {
- new_status = Status::NotFound("No such tablet found");
- }
-
if (new_status.ok()) {
- const scoped_refptr<RemoteTablet>& result =
- meta_cache_->ProcessLookupResponse(*this);
+ scoped_refptr<RemoteTablet> result;
+ new_status = meta_cache_->ProcessLookupResponse(*this, &result);
if (remote_tablet_) {
*remote_tablet_ = result;
}
@@ -704,7 +708,8 @@ void LookupRpc::SendRpcCb(const Status& status) {
user_cb_.Run(new_status);
}
-const scoped_refptr<RemoteTablet>& MetaCache::ProcessLookupResponse(const LookupRpc& rpc) {
+Status MetaCache::ProcessLookupResponse(const LookupRpc& rpc,
+ scoped_refptr<RemoteTablet>* remote_tablet) {
VLOG(2) << "Processing master response for " << rpc.ToString()
<< ". Response: " << rpc.resp().ShortDebugString();
@@ -725,14 +730,13 @@ const scoped_refptr<RemoteTablet>& MetaCache::ProcessLookupResponse(const Lookup
DCHECK_EQ(loc.partition().partition_key_start(), remote->partition().partition_key_start());
DCHECK_EQ(loc.partition().partition_key_end(), remote->partition().partition_key_end());
- VLOG(3) << "Refreshing tablet " << tablet_id << ": "
- << loc.ShortDebugString();
+ VLOG(3) << "Refreshing tablet " << tablet_id << ": " << loc.ShortDebugString();
remote->Refresh(ts_cache_, loc.replicas());
continue;
}
- VLOG(3) << "Caching tablet " << tablet_id << " for (" << rpc.table_name()
- << "): " << loc.ShortDebugString();
+ VLOG(3) << "Caching tablet " << tablet_id << " for (" << rpc.table_name() << "): "
+ << loc.ShortDebugString();
Partition partition;
Partition::FromPB(loc.partition(), &partition);
@@ -741,10 +745,38 @@ const scoped_refptr<RemoteTablet>& MetaCache::ProcessLookupResponse(const Lookup
InsertOrDie(&tablets_by_id_, tablet_id, remote);
InsertOrDie(&tablets_by_key, partition.partition_key_start(), remote);
+
+ // TODO(KUDU-1421): Once removing partition ranges is supported, we should
+ // inspect the tablet locations for any non-covered ranges. Cached tablets
+ // falling in non-covered ranges should be removed.
}
- // Always return the first tablet.
- return FindOrDie(tablets_by_id_, rpc.resp().tablet_locations(0).tablet_id());
+ bool not_found = false;
+ // itr points to the first tablet that is greater than the requested partition key.
+ auto itr = tablets_by_key.upper_bound(rpc.partition_key());
+ if (tablets_by_key.empty()) {
+ not_found = true;
+ } else if (itr == tablets_by_key.begin()) {
+ // The requested partition key is before all tablets.
+ if (rpc.is_exact_lookup()) {
+ not_found = true;
+ }
+ *remote_tablet = itr->second;
+ } else if (std::prev(itr)->second->partition().partition_key_end() > rpc.partition_key() ||
+ std::prev(itr)->second->partition().partition_key_end().empty()) {
+ // Exact match.
+ *remote_tablet = std::prev(itr)->second;
+ } else if (itr == tablets_by_key.end() || rpc.is_exact_lookup()) {
+ // The requested partition key is beyond all tablets,
+ // or falls between two tablets and the lookup is exact.
+ not_found = true;
+ } else {
+ // The primary key falls between two tablets; return the second.
+ *remote_tablet = itr->second;
+ }
+
+ return not_found ? Status::NotFound("No tablet covering the requested range partition")
+ : Status::OK();
}
bool MetaCache::LookupTabletByKeyFastPath(const KuduTable* table,
@@ -778,6 +810,13 @@ bool MetaCache::LookupTabletByKeyFastPath(const KuduTable* table,
return false;
}
+// Drops every cached tablet server and tablet location, so subsequent lookups
+// must go back to the master. For tests only.
+//
+// The cache maps are mutated here, so 'lock_' must be held exclusively: a
+// shared (reader) lock would let other readers run concurrently with the
+// deletion and clearing below.
+// NOTE(review): the RemoteTabletServer objects in 'ts_cache_' are deleted
+// outright; callers presumably must ensure nothing still references them
+// (e.g. in-flight RPCs or RemoteTablets held elsewhere) — confirm.
+void MetaCache::ClearCacheForTesting() {
+ std::lock_guard<rw_spinlock> l(lock_);
+ STLDeleteValues(&ts_cache_);
+ tablets_by_id_.clear();
+ tablets_by_table_and_key_.clear();
+}
+
void MetaCache::LookupTabletByKey(const KuduTable* table,
const string& partition_key,
const MonoTime& deadline,
@@ -789,7 +828,24 @@ void MetaCache::LookupTabletByKey(const KuduTable* table,
partition_key,
remote_tablet,
deadline,
- client_->data_->messenger_);
+ client_->data_->messenger_,
+ true);
+ rpc->SendRpc();
+}
+
+// Non-exact variant of the key lookup: if 'partition_key' falls in a
+// non-covered range partition, the lookup resolves to the next tablet rather
+// than failing. Builds a LookupRpc with is_exact_lookup = false and sends it;
+// 'callback' receives the resulting status.
+// NOTE(review): the LookupRpc is heap-allocated and not freed here; it
+// presumably deletes itself on completion — confirm.
+void MetaCache::LookupTabletByKeyOrNext(const KuduTable* table,
+ const string& partition_key,
+ const MonoTime& deadline,
+ scoped_refptr<RemoteTablet>* remote_tablet,
+ const StatusCallback& callback) {
+ LookupRpc* rpc = new LookupRpc(this,
+ callback,
+ table,
+ partition_key,
+ remote_tablet,
+ deadline,
+ client_->data_->messenger_,
+ /* is_exact_lookup= */ false);
rpc->SendRpc();
}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c24478b8/src/kudu/client/meta_cache.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/meta_cache.h b/src/kudu/client/meta_cache.h
index c54f066..53403d1 100644
--- a/src/kudu/client/meta_cache.h
+++ b/src/kudu/client/meta_cache.h
@@ -57,6 +57,7 @@ class TSInfoPB;
namespace client {
class ClientTest_TestMasterLookupPermits_Test;
+class ClientTest_TestNonCoveringRangePartitions_Test;
class KuduClient;
class KuduTable;
@@ -272,6 +273,14 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {
scoped_refptr<RemoteTablet>* remote_tablet,
const StatusCallback& callback);
+ // Look up which tablet hosts the given partition key, or the next tablet if
+ // the key falls in a non-covered range partition.
+ void LookupTabletByKeyOrNext(const KuduTable* table,
+ const std::string& partition_key,
+ const MonoTime& deadline,
+ scoped_refptr<RemoteTablet>* remote_tablet,
+ const StatusCallback& callback);
+
// Mark any replicas of any tablets hosted by 'ts' as failed. They will
// not be returned in future cache lookups.
void MarkTSFailed(RemoteTabletServer* ts, const Status& status);
@@ -287,10 +296,11 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {
friend class LookupRpc;
FRIEND_TEST(client::ClientTest, TestMasterLookupPermits);
+ FRIEND_TEST(client::ClientTest, TestNonCoveringRangePartitions);
// Called on the slow LookupTablet path when the master responds. Populates
// the tablet caches and returns a reference to the first one.
- const scoped_refptr<RemoteTablet>& ProcessLookupResponse(const LookupRpc& rpc);
+ Status ProcessLookupResponse(const LookupRpc& rpc, scoped_refptr<RemoteTablet>* remote_tablet);
// Lookup the given tablet by key, only consulting local information.
// Returns true and sets *remote_tablet if successful.
@@ -298,6 +308,9 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {
const std::string& partition_key,
scoped_refptr<RemoteTablet>* remote_tablet);
+ // Clears the meta cache for testing purposes.
+ void ClearCacheForTesting();
+
// Update our information about the given tablet server.
//
// This is called when we get some response from the master which contains
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c24478b8/src/kudu/client/scan_token-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index 4d559a0..8697fb1 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -218,11 +218,11 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
while (pruner.HasMorePartitionKeyRanges()) {
scoped_refptr<internal::RemoteTablet> tablet;
Synchronizer sync;
- client->data_->meta_cache_->LookupTabletByKey(table,
- pruner.NextPartitionKey(),
- deadline,
- &tablet,
- sync.AsStatusCallback());
+ client->data_->meta_cache_->LookupTabletByKeyOrNext(table,
+ pruner.NextPartitionKey(),
+ deadline,
+ &tablet,
+ sync.AsStatusCallback());
RETURN_NOT_OK(sync.Wait());
CHECK(tablet);
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c24478b8/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index c52e1c5..858799a 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -317,12 +317,28 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
for (int attempt = 1;; attempt++) {
Synchronizer sync;
- table_->client()->data_->meta_cache_->LookupTabletByKey(table_.get(),
- partition_key,
- deadline,
- &remote_,
- sync.AsStatusCallback());
- RETURN_NOT_OK(sync.Wait());
+ table_->client()->data_->meta_cache_->LookupTabletByKeyOrNext(table_.get(),
+ partition_key,
+ deadline,
+ &remote_,
+ sync.AsStatusCallback());
+ Status s = sync.Wait();
+ if (s.IsNotFound()) {
+ // No more tablets in the table.
+ partition_pruner_.RemovePartitionKeyRange("");
+ return Status::OK();
+ } else {
+ RETURN_NOT_OK(s);
+ }
+
+ // Check if the meta cache returned a tablet covering a partition key range past
+ // what we asked for. This can happen if the requested partition key falls
+ // in a non-covered range. In this case we can potentially prune the tablet.
+ if (partition_key < remote_->partition().partition_key_start() &&
+ partition_pruner_.ShouldPrune(remote_->partition())) {
+ partition_pruner_.RemovePartitionKeyRange(remote_->partition().partition_key_end());
+ return Status::OK();
+ }
scan->set_tablet_id(remote_->tablet_id());
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c24478b8/src/kudu/client/scanner-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index e3a8ef5..550711a 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -104,7 +104,9 @@ class KuduScanner::Data {
const MonoTime& deadline,
std::set<std::string>* blacklist);
- // Open the next tablet in the scan.
+ // Opens the next tablet in the scan, or returns Status::NotFound if there are
+ // no more tablets to scan.
+ //
// The deadline is the time budget for this operation.
// The blacklist is used to temporarily filter out nodes that are experiencing transient errors.
// This blacklist may be modified by the callee.
@@ -121,7 +123,10 @@ class KuduScanner::Data {
Status KeepAlive();
- // Returns whether there exist more tablets we should scan.
+ // Returns whether there may exist more tablets to scan.
+ //
+ // This method does not take into account any non-covered range partitions
+ // that may exist in the table, so it should only be used as a hint.
//
// Note: there may not be any actual matching rows in subsequent tablets,
// but we won't know until we scan them.
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c24478b8/src/kudu/client/table_creator-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/table_creator-internal.h b/src/kudu/client/table_creator-internal.h
index 3f8d7e0..fc519ec 100644
--- a/src/kudu/client/table_creator-internal.h
+++ b/src/kudu/client/table_creator-internal.h
@@ -17,7 +17,9 @@
#ifndef KUDU_CLIENT_TABLE_CREATOR_INTERNAL_H
#define KUDU_CLIENT_TABLE_CREATOR_INTERNAL_H
+#include <memory>
#include <string>
+#include <utility>
#include <vector>
#include "kudu/client/client.h"
@@ -40,6 +42,9 @@ class KuduTableCreator::Data {
std::vector<std::unique_ptr<KuduPartialRow>> range_splits_;
+ std::vector<std::pair<std::unique_ptr<KuduPartialRow>,
+ std::unique_ptr<KuduPartialRow>>> range_bounds_;
+
PartitionSchemaPB partition_schema_;
int num_replicas_;
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c24478b8/src/kudu/common/partition_pruner-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition_pruner-test.cc b/src/kudu/common/partition_pruner-test.cc
index 6ed5aeb..72aecf9 100644
--- a/src/kudu/common/partition_pruner-test.cc
+++ b/src/kudu/common/partition_pruner-test.cc
@@ -60,7 +60,7 @@ void CheckPrunedPartitions(const Schema& schema,
int pruned_partitions = count_if(partitions.begin(), partitions.end(),
[&] (const Partition& partition) {
- return pruner.ShouldPruneForTests(partition);
+ return pruner.ShouldPrune(partition);
});
ASSERT_EQ(remaining_tablets, partitions.size() - pruned_partitions);
}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c24478b8/src/kudu/common/partition_pruner.cc
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition_pruner.cc b/src/kudu/common/partition_pruner.cc
index 88278e2..0836637 100644
--- a/src/kudu/common/partition_pruner.cc
+++ b/src/kudu/common/partition_pruner.cc
@@ -393,7 +393,7 @@ void PartitionPruner::RemovePartitionKeyRange(const string& upper_bound) {
}
}
-bool PartitionPruner::ShouldPruneForTests(const Partition& partition) const {
+bool PartitionPruner::ShouldPrune(const Partition& partition) const {
// range is an iterator that points to the first partition key range which
// overlaps or is greater than the partition.
auto range = lower_bound(partition_key_ranges_.rbegin(), partition_key_ranges_.rend(), partition,
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c24478b8/src/kudu/common/partition_pruner.h
----------------------------------------------------------------------
diff --git a/src/kudu/common/partition_pruner.h b/src/kudu/common/partition_pruner.h
index dc57098..5b2fb26 100644
--- a/src/kudu/common/partition_pruner.h
+++ b/src/kudu/common/partition_pruner.h
@@ -58,9 +58,7 @@ class PartitionPruner {
void RemovePartitionKeyRange(const std::string& upper_bound);
// Returns true if the provided partition should be pruned.
- //
- // Used for testing.
- bool ShouldPruneForTests(const Partition& partition) const;
+ bool ShouldPrune(const Partition& partition) const;
std::string ToString(const Schema& schema, const PartitionSchema& partition_schema) const;
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c24478b8/src/kudu/integration-tests/flex_partitioning-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/flex_partitioning-itest.cc b/src/kudu/integration-tests/flex_partitioning-itest.cc
index 5c2fea9..32282ee 100644
--- a/src/kudu/integration-tests/flex_partitioning-itest.cc
+++ b/src/kudu/integration-tests/flex_partitioning-itest.cc
@@ -37,35 +37,116 @@
#include "kudu/util/test_util.h"
#include "kudu/gutil/strings/escaping.h"
-namespace kudu {
-namespace itest {
-
-using client::KuduClient;
-using client::KuduClientBuilder;
-using client::KuduColumnSchema;
-using client::KuduInsert;
-using client::KuduPredicate;
-using client::KuduScanner;
-using client::KuduSchema;
-using client::KuduSchemaBuilder;
-using client::KuduSession;
-using client::KuduTable;
-using client::KuduTableCreator;
-using client::KuduValue;
-using client::sp::shared_ptr;
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
+using kudu::client::KuduColumnSchema;
+using kudu::client::KuduInsert;
+using kudu::client::KuduPredicate;
+using kudu::client::KuduScanner;
+using kudu::client::KuduSchema;
+using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduSession;
+using kudu::client::KuduTable;
+using kudu::client::KuduTableCreator;
+using kudu::client::KuduValue;
+using kudu::client::sp::shared_ptr;
+using kudu::master::GetTableLocationsRequestPB;
+using kudu::master::GetTableLocationsResponsePB;
+using kudu::master::MasterErrorPB;
+using kudu::rpc::RpcController;
+using std::unique_ptr;
using std::unordered_map;
using std::vector;
using strings::Substitute;
+namespace kudu {
+namespace itest {
+
static const char* const kTableName = "test-table";
-static const int kNumRows = 1000;
+
+struct HashPartitionOptions {  // One "HASH (columns) INTO num_buckets BUCKETS" level.
+ vector<string> columns;
+ int32_t num_buckets;  // Multiplies the table's tablet count (see NumPartitions).
+};
+
+struct RangePartitionOptions {  // "RANGE (columns)" with optional split rows and bounds.
+ vector<string> columns;
+ vector<vector<int32_t>> splits;  // Each split row holds values for a prefix of 'columns'.
+ vector<pair<vector<int32_t>, vector<int32_t>>> bounds;  // Half-open [lower, upper) pairs.
+};
+
+int NumPartitions(const vector<HashPartitionOptions>& hash_partitions,  // Expected tablet count for a table.
+ const RangePartitionOptions& range_partition) {
+ int partitions = std::max<size_t>(1, range_partition.bounds.size()) + range_partition.splits.size();  // No bounds = one unbounded range.
+ for (const auto& hash_partition : hash_partitions) {
+ partitions *= hash_partition.num_buckets;  // Each hash level multiplies the count.
+ }
+ return partitions;
+}
+
+string RowToString(const vector<int32_t>& row) {  // Formats a row as "(v0, v1, ...)".
+ string s = "(";
+ for (size_t i = 0; i < row.size(); i++) {
+ if (i != 0) s.append(", ");
+ s.append(std::to_string(row[i]));
+ }
+ s.append(")");
+ return s;
+}
+
+string PartitionOptionsToString(const vector<HashPartitionOptions>& hash_partitions,  // Human-readable label used in SCOPED_TRACE.
+ const RangePartitionOptions& range_partition) {
+ string s;
+ for (const auto& hash_partition : hash_partitions) {  // Each level renders as "HASH (cols) INTO n BUCKETS, ".
+ s.append("HASH (");
+ for (int i = 0; i < hash_partition.columns.size(); i++) {
+ if (i != 0) s.append(", ");
+ s.append(hash_partition.columns[i]);
+ }
+ s.append(") INTO ");
+ s.append(std::to_string(hash_partition.num_buckets));
+ s.append(" BUCKETS, ");
+ }
+ // Range partition columns, e.g. "RANGE (c0, c1)".
+ s.append("RANGE (");
+ for (int i = 0; i < range_partition.columns.size(); i++) {
+ if (i != 0) s.append(", ");
+ s.append(range_partition.columns[i]);
+ }
+ s.append(")");
+ // Optional split rows, e.g. " SPLIT ROWS (500), (1000)".
+ if (!range_partition.splits.empty()) {
+ s.append(" SPLIT ROWS ");
+
+ for (int i = 0; i < range_partition.splits.size(); i++) {
+ if (i != 0) s.append(", ");
+ s.append(RowToString(range_partition.splits[i]));
+ }
+ }
+ // Optional bounds, each rendered half-open as "[(lower), (upper))".
+ if (!range_partition.bounds.empty()) {
+ s.append(" BOUNDS (");
+
+ for (int i = 0; i < range_partition.bounds.size(); i++) {
+ if (i != 0) s.append(", ");
+ s.append("[");
+ s.append(RowToString(range_partition.bounds[i].first));
+ s.append(", ");
+ s.append(RowToString(range_partition.bounds[i].second));
+ s.append(")");
+ }
+ s.append(")");
+ }
+ return s;
+}
class FlexPartitioningITest : public KuduTest {
public:
FlexPartitioningITest()
: random_(GetRandomSeed32()) {
}
- virtual void SetUp() OVERRIDE {
+
+ void SetUp() override {
KuduTest::SetUp();
ExternalMiniClusterOptions opts;
@@ -79,35 +160,29 @@ class FlexPartitioningITest : public KuduTest {
KuduClientBuilder builder;
ASSERT_OK(cluster_->CreateClient(builder, &client_));
-
- ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy().get(),
- cluster_->messenger(),
- &ts_map_));
}
- virtual void TearDown() OVERRIDE {
+ void TearDown() override {  // inserted_rows_ now holds unique_ptrs, so no manual STLDelete* is needed.
cluster_->Shutdown();
KuduTest::TearDown();
- STLDeleteValues(&ts_map_);
- STLDeleteElements(&inserted_rows_);
}
protected:
- void CreateTable(int num_columns,
- const vector<string>& bucket_a, int num_buckets_a,
- const vector<string>& bucket_b, int num_buckets_b,
- const vector<string>& range_cols,
- int num_splits) {
- // Set up the actual PK columns based on num_columns. The PK is made up
- // of all the columns.
+
+ void TestPartitionOptions(const vector<HashPartitionOptions>& hash_options,  // Create, insert + verify, then drop.
+ const RangePartitionOptions& range_options) {
+ NO_FATALS(CreateTable(hash_options, range_options));
+ NO_FATALS(InsertAndVerifyScans(range_options));
+ DeleteTable();
+ }
+
+ void CreateTable(const vector<HashPartitionOptions> hash_partitions,
+ const RangePartitionOptions range_partition) {
KuduSchemaBuilder b;
- vector<string> pk;
- for (int i = 0; i < num_columns; i++) {
- string name = Substitute("c$0", i);
- b.AddColumn(name)->Type(KuduColumnSchema::INT32)->NotNull();
- pk.push_back(name);
- }
- b.SetPrimaryKey(pk);
+ b.AddColumn("c0")->Type(KuduColumnSchema::INT32)->NotNull();
+ b.AddColumn("c1")->Type(KuduColumnSchema::INT32)->NotNull();
+ b.AddColumn("c2")->Type(KuduColumnSchema::INT32)->NotNull();
+ b.SetPrimaryKey({ "c0", "c1", "c2" });
KuduSchema schema;
ASSERT_OK(b.Build(&schema));
@@ -116,47 +191,74 @@ class FlexPartitioningITest : public KuduTest {
.schema(&schema)
.num_replicas(1);
- // Set up partitioning.
- if (!bucket_a.empty()) {
- table_creator->add_hash_partitions(bucket_a, num_buckets_a);
- }
- if (!bucket_b.empty()) {
- table_creator->add_hash_partitions(bucket_b, num_buckets_b);
+ for (const auto& hash_partition : hash_partitions) {
+ table_creator->add_hash_partitions(hash_partition.columns, hash_partition.num_buckets);
}
- table_creator->set_range_partition_columns(range_cols);
- // Compute split points.
vector<const KuduPartialRow*> split_rows;
- int increment = kNumRows / num_splits;
- for (int i = 1; i < num_splits; i++) {
+ for (const vector<int32_t> split : range_partition.splits) {
KuduPartialRow* row = schema.NewRow();
- for (int j = 0; j < range_cols.size(); j++) {
- const string& range_col = range_cols[j];
- if (j == 0) {
- // Set the first component of the range to a set increment.
- ASSERT_OK(row->SetInt32(range_col, increment * i));
- } else {
- ASSERT_OK(row->SetInt32(range_col, random_.Next32()));
- }
+ for (int i = 0; i < split.size(); i++) {
+ ASSERT_OK(row->SetInt32(range_partition.columns[i], split[i]));
}
split_rows.push_back(row);
}
+
+ table_creator->set_range_partition_columns(range_partition.columns);
table_creator->split_rows(split_rows);
- ASSERT_OK(table_creator->Create());
+ for (const auto& bound : range_partition.bounds) {
+ KuduPartialRow* lower = schema.NewRow();
+ KuduPartialRow* upper = schema.NewRow();
+ for (int i = 0; i < bound.first.size(); i++) {
+ ASSERT_OK(lower->SetInt32(range_partition.columns[i], bound.first[i]));
+ }
+ for (int i = 0; i < bound.second.size(); i++) {
+ ASSERT_OK(upper->SetInt32(range_partition.columns[i], bound.second[i]));
+ }
+ table_creator->add_range_bound(lower, upper);
+ }
+
+ ASSERT_OK(table_creator->Create());
ASSERT_OK(client_->OpenTable(kTableName, &table_));
+ ASSERT_EQ(NumPartitions(hash_partitions, range_partition), CountTablets());
+ }
+
+ void DeleteTable() {  // Drops the current table and forgets the rows inserted into it.
+ inserted_rows_.clear();
+ ASSERT_OK(client_->DeleteTable(table_->name()));  // A failed drop would otherwise break the next CreateTable.
+ table_.reset();
}
int CountTablets() {
- vector<tserver::ListTabletsResponsePB_StatusAndSchemaPB> tablets;
- CHECK_OK(ListTablets(ts_map_.begin()->second, MonoDelta::FromSeconds(10), &tablets));
- return tablets.size();
+ GetTableLocationsRequestPB req;  // Counts tablets via the master's GetTableLocations RPC.
+ GetTableLocationsResponsePB resp;
+ RpcController controller;
+ req.mutable_table()->set_table_name(kTableName);
+ req.set_max_returned_locations(100);  // NOTE(review): a table with more than 100 tablets would be under-counted.
+
+ for (int i = 1; ; i++) {  // Retries while tablets are not yet running, with quadratic backoff.
+ CHECK_LE(i, 10) << "CountTablets timed out";
+
+ controller.Reset();
+ CHECK_OK(cluster_->master_proxy()->GetTableLocations(req, &resp, &controller));
+
+ if (resp.has_error()) {
+ CHECK_EQ(MasterErrorPB::TABLET_NOT_RUNNING, resp.error().code());
+ SleepFor(MonoDelta::FromMilliseconds(i * i * 100));
+ } else {
+ return resp.tablet_locations().size();
+ }
+ }
}
- // Insert 'kNumRows' rows into the given table. The first column 'c0' is ascending,
- // but the rest are random int32s.
- Status InsertRandomRows();
+ // Insert rows into the given table. The first column 'c0' is ascending,
+ // but the rest are random int32s. A single row will be inserted for each
+ // unique c0 value in the range bounds. If there are no range bounds, then
+ // c0 values [0, 1000) will be used. The number of inserted rows is returned
+ // in 'row_count'.
+ Status InsertRows(const RangePartitionOptions& range_partition, int* row_count);
// Perform a scan with a predicate on 'col_name' BETWEEN 'lower' AND 'upper'.
// Verifies that the results match up with applying the same scan against our
@@ -175,35 +277,47 @@ class FlexPartitioningITest : public KuduTest {
// Inserts data into the table, then performs a number of scans to verify that
// the data can be retrieved.
- void InsertAndVerifyScans();
+ void InsertAndVerifyScans(const RangePartitionOptions& range_partition);
Random random_;
gscoped_ptr<ExternalMiniCluster> cluster_;
- unordered_map<string, TServerDetails*> ts_map_;
shared_ptr<KuduClient> client_;
shared_ptr<KuduTable> table_;
- vector<KuduPartialRow*> inserted_rows_;
+ vector<unique_ptr<KuduPartialRow>> inserted_rows_;
};
-Status FlexPartitioningITest::InsertRandomRows() {
+Status FlexPartitioningITest::InsertRows(const RangePartitionOptions& range_partition,  // One row per c0 value covered by the bounds.
+ int* row_count) {
+ static const vector<pair<vector<int32_t>, vector<int32_t>>> kDefaultBounds =
+ {{ { 0 }, { 1000 } }};
CHECK(inserted_rows_.empty());
+ const vector<pair<vector<int32_t>, vector<int32_t>>>& bounds =
+ range_partition.bounds.empty() ? kDefaultBounds : range_partition.bounds;  // No explicit bounds: c0 in [0, 1000).
+
shared_ptr<KuduSession> session(client_->NewSession());
session->SetTimeoutMillis(10000);
RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
- for (uint64_t i = 0; i < kNumRows; i++) {
- gscoped_ptr<KuduInsert> insert(table_->NewInsert());
- tools::GenerateDataForRow(table_->schema(), i, &random_, insert->mutable_row());
- inserted_rows_.push_back(new KuduPartialRow(*insert->mutable_row()));
- RETURN_NOT_OK(session->Apply(insert.release()));
-
- if (i > 0 && i % 1000 == 0) {
- RETURN_NOT_OK(session->Flush());
+
+ int count = 0;
+ for (const auto& bound : bounds) {
+ for (int32_t i = bound.first[0]; i < bound.second[0]; i++) {  // Only the first bound column is read; NOTE(review): assumes it is c0.
+ gscoped_ptr<KuduInsert> insert(table_->NewInsert());
+ tools::GenerateDataForRow(table_->schema(), i, &random_, insert->mutable_row());
+ inserted_rows_.emplace_back(new KuduPartialRow(*insert->mutable_row()));
+ RETURN_NOT_OK(session->Apply(insert.release()));
+ count++;
+
+ if (i > 0 && i % 1000 == 0) {  // Periodic flush, since the session is in MANUAL_FLUSH mode.
+ RETURN_NOT_OK(session->Flush());
+ }
}
}
+
RETURN_NOT_OK(session->Flush());
+ *row_count = count;  // Total rows inserted across all bounds.
return Status::OK();
}
@@ -221,7 +335,7 @@ void FlexPartitioningITest::CheckScanWithColumnPredicate(Slice col_name, int low
// Manually evaluate the predicate against the data we think we inserted.
vector<string> expected_rows;
- for (const KuduPartialRow* row : inserted_rows_) {
+ for (auto& row : inserted_rows_) {
int32_t val;
CHECK_OK(row->GetInt32(col_name, &val));
if (val >= lower && val <= upper) {
@@ -254,7 +368,7 @@ void FlexPartitioningITest::CheckPKRangeScan(int lower, int upper) {
}
void FlexPartitioningITest::CheckPartitionKeyRangeScan() {
- master::GetTableLocationsResponsePB table_locations;
+ GetTableLocationsResponsePB table_locations;
ASSERT_OK(GetTableLocations(cluster_->master_proxy(),
table_->name(),
MonoDelta::FromSeconds(32),
@@ -277,7 +391,7 @@ void FlexPartitioningITest::CheckPartitionKeyRangeScan() {
std::sort(rows.begin(), rows.end());
vector<string> expected_rows;
- for (KuduPartialRow* row : inserted_rows_) {
+ for (auto& row : inserted_rows_) {
expected_rows.push_back("(" + row->ToString() + ")");
}
std::sort(expected_rows.begin(), expected_rows.end());
@@ -287,7 +401,7 @@ void FlexPartitioningITest::CheckPartitionKeyRangeScan() {
}
void FlexPartitioningITest::CheckPartitionKeyRangeScanWithPKRange(int lower, int upper) {
- master::GetTableLocationsResponsePB table_locations;
+ GetTableLocationsResponsePB table_locations;
ASSERT_OK(GetTableLocations(cluster_->master_proxy(),
table_->name(),
MonoDelta::FromSeconds(32),
@@ -321,15 +435,16 @@ void FlexPartitioningITest::CheckPartitionKeyRangeScanWithPKRange(int lower, int
ASSERT_EQ(rows, expected_rows);
}
-void FlexPartitioningITest::InsertAndVerifyScans() {
- ASSERT_OK(InsertRandomRows());
+void FlexPartitioningITest::InsertAndVerifyScans(const RangePartitionOptions& range_partition) {
+ int row_count;
+ ASSERT_OK(InsertRows(range_partition, &row_count));
// First, ensure that we get back the same number we put in.
{
vector<string> rows;
ScanTableToStrings(table_.get(), &rows);
std::sort(rows.begin(), rows.end());
- ASSERT_EQ(kNumRows, rows.size());
+ ASSERT_EQ(row_count, rows.size());
}
// Perform some scans with predicates.
@@ -394,178 +509,48 @@ void FlexPartitioningITest::InsertAndVerifyScans() {
}
}
-// CREATE TABLE t (
-// c0 INT32,
-// c1 INT32,
-// PRIMARY KEY (c0, c1)
-// RANGE PARTITION BY (c0, c1),
-// );
-TEST_F(FlexPartitioningITest, TestSimplePartitioning) {
- NO_FATALS(CreateTable(1, // 2 columns
- vector<string>(), 0, // No hash buckets
- vector<string>(), 0, // No hash buckets
- { "c0" }, // no range partitioning
- 2)); // 1 split;
- ASSERT_EQ(2, CountTablets());
-
- InsertAndVerifyScans();
-}
-
-// CREATE TABLE t (
-// c0 INT32 PRIMARY KEY,
-// BUCKET BY (c0) INTO 3 BUCKETS
-// );
-TEST_F(FlexPartitioningITest, TestSinglePKBucketed) {
- NO_FATALS(CreateTable(1, // 1 column
- { "c0" }, 3, // bucket by "c0" in 3 buckets
- vector<string>(), 0, // no other buckets
- { "c0" }, // default range
- 2)); // one split
- ASSERT_EQ(6, CountTablets());
-
- InsertAndVerifyScans();
-}
-
-// CREATE TABLE t (
-// c0 INT32,
-// c1 INT32,
-// PRIMARY KEY (c0, c1)
-// BUCKET BY (c1) INTO 3 BUCKETS
-// );
-TEST_F(FlexPartitioningITest, TestCompositePK_BucketOnSecondColumn) {
- NO_FATALS(CreateTable(2, // 2 columns
- { "c1" }, 3, // bucket by "c0" in 3 buckets
- vector<string>(), 0, // no other buckets
- { "c0", "c1" }, // default range
- 1)); // no splits;
- ASSERT_EQ(3, CountTablets());
-
- InsertAndVerifyScans();
-}
-
-// CREATE TABLE t (
-// c0 INT32,
-// c1 INT32,
-// PRIMARY KEY (c0, c1)
-// RANGE PARTITION BY (c1, c0)
-// );
-TEST_F(FlexPartitioningITest, TestCompositePK_RangePartitionByReversedPK) {
- NO_FATALS(CreateTable(2, // 2 columns
- vector<string>(), 0, // no buckets
- vector<string>(), 0, // no buckets
- { "c1", "c0" }, // range partition by reversed PK
- 2)); // one split
- ASSERT_EQ(2, CountTablets());
-
- InsertAndVerifyScans();
-}
-
-// CREATE TABLE t (
-// c0 INT32,
-// c1 INT32,
-// PRIMARY KEY (c0, c1)
-// RANGE PARTITION BY (c0)
-// );
-TEST_F(FlexPartitioningITest, TestCompositePK_RangePartitionByPKPrefix) {
- NO_FATALS(CreateTable(2, // 2 columns
- vector<string>(), 0, // no buckets
- vector<string>(), 0, // no buckets
- { "c0" }, // range partition by c0
- 2)); // one split
- ASSERT_EQ(2, CountTablets());
-
- InsertAndVerifyScans();
-}
-
-// CREATE TABLE t (
-// c0 INT32,
-// c1 INT32,
-// PRIMARY KEY (c0, c1)
-// RANGE PARTITION BY (c1)
-// );
-TEST_F(FlexPartitioningITest, TestCompositePK_RangePartitionByPKSuffix) {
- NO_FATALS(CreateTable(2, // 2 columns
- vector<string>(), 0, // no buckets
- vector<string>(), 0, // no buckets
- { "c1" }, // range partition by c1
- 2)); // one split
- ASSERT_EQ(2, CountTablets());
-
- InsertAndVerifyScans();
-}
-
-// CREATE TABLE t (
-// c0 INT32,
-// c1 INT32,
-// PRIMARY KEY (c0, c1)
-// RANGE PARTITION BY (c0),
-// BUCKET BY (c1) INTO 4 BUCKETS
-// );
-TEST_F(FlexPartitioningITest, TestCompositePK_RangeAndBucket) {
- NO_FATALS(CreateTable(2, // 2 columns
- { "c1" }, 4, // BUCKET BY c1 INTO 4 BUCKETS
- vector<string>(), 0, // no buckets
- { "c0" }, // range partition by c0
- 2)); // 1 split;
- ASSERT_EQ(8, CountTablets());
-
- InsertAndVerifyScans();
-}
-
-// CREATE TABLE t (
-// c0 INT32,
-// c1 INT32,
-// PRIMARY KEY (c0, c1)
-// BUCKET BY (c1) INTO 4 BUCKETS,
-// BUCKET BY (c0) INTO 3 BUCKETS
-// );
-TEST_F(FlexPartitioningITest, TestCompositePK_MultipleBucketings) {
- NO_FATALS(CreateTable(2, // 2 columns
- { "c1" }, 4, // BUCKET BY c1 INTO 4 BUCKETS
- { "c0" }, 3, // BUCKET BY c0 INTO 3 BUCKETS
- { "c0", "c1" }, // default range partitioning
- 2)); // 1 split;
- ASSERT_EQ(4 * 3 * 2, CountTablets());
-
- InsertAndVerifyScans();
-}
-
-// CREATE TABLE t (
-// c0 INT32,
-// c1 INT32,
-// PRIMARY KEY (c0, c1)
-// RANGE PARTITION BY (),
-// BUCKET BY (c0) INTO 4 BUCKETS,
-// );
-TEST_F(FlexPartitioningITest, TestCompositePK_SingleBucketNoRange) {
- NO_FATALS(CreateTable(2, // 2 columns
- { "c0" }, 4, // BUCKET BY c0 INTO 4 BUCKETS
- vector<string>(), 0, // no buckets
- vector<string>(), // no range partitioning
- 1)); // 0 splits;
- ASSERT_EQ(4, CountTablets());
-
- InsertAndVerifyScans();
-}
-
-// CREATE TABLE t (
-// c0 INT32,
-// c1 INT32,
-// PRIMARY KEY (c0, c1)
-// RANGE PARTITION BY (),
-// BUCKET BY (c0) INTO 4 BUCKETS,
-// BUCKET BY (c1) INTO 5 BUCKETS,
-// );
-TEST_F(FlexPartitioningITest, TestCompositePK_MultipleBucketingsNoRange) {
- NO_FATALS(CreateTable(2, // 2 columns
- { "c0" }, 4, // BUCKET BY c0 INTO 4 BUCKETS
- { "c1" }, 5, // BUCKET BY c1 INTO 5 BUCKETS
- vector<string>(), // no range partitioning
- 1)); // 0 splits;
- ASSERT_EQ(20, CountTablets());
-
- InsertAndVerifyScans();
+TEST_F(FlexPartitioningITest, TestFlexPartitioning) {  // Runs every hash option against every range option.
+ vector<vector<HashPartitionOptions>> hash_options {
+ // No hash partitioning
+ {},
+ // HASH (c1) INTO 4 BUCKETS
+ { HashPartitionOptions { { "c1" }, 4 } },
+ // HASH (c0, c1) INTO 3 BUCKETS
+ { HashPartitionOptions { { "c0", "c1" }, 3 } },
+ // HASH (c1, c0) INTO 3 BUCKETS, HASH (c2) INTO 3 BUCKETS
+ { HashPartitionOptions { { "c1", "c0" }, 3 },
+ HashPartitionOptions { { "c2" }, 3 } },
+ // HASH (c2) INTO 2 BUCKETS, HASH (c1) INTO 2 BUCKETS, HASH (c0) INTO 2 BUCKETS
+ { HashPartitionOptions { { "c2" }, 2 },
+ HashPartitionOptions { { "c1" }, 2 },
+ HashPartitionOptions { { "c0" }, 2 } },
+ };
+
+ vector<RangePartitionOptions> range_options {
+ // No range partitioning
+ RangePartitionOptions { {}, {}, {} },
+ // RANGE (c0)
+ RangePartitionOptions { { "c0" }, { }, { } },
+ // RANGE (c0) SPLIT ROWS (500)
+ RangePartitionOptions { { "c0" }, { { 500 } }, { } },
+ // RANGE (c2, c1) SPLIT ROWS (500, 0), (500, 500), (1000, 0)
+ RangePartitionOptions { { "c2", "c1" }, { { 500, 0 }, { 500, 500 }, { 1000, 0 } }, { } },
+ // RANGE (c0) BOUNDS ((0), (500)), ((500), (1000))
+ RangePartitionOptions { { "c0" }, { }, { { { 0 }, { 500 } }, { { 500 }, { 1000 } } } },
+ // RANGE (c0) SPLIT ROWS (500) BOUNDS ((0), (1000))
+ RangePartitionOptions { { "c0" }, { { 500 } }, { { { 0 }, { 1000 } } } },
+ // RANGE (c0, c1) SPLIT ROWS (500), (2001), (2500), (2999)
+ // BOUNDS ((0), (1000)), ((2000), (3000))
+ RangePartitionOptions { { "c0", "c1" }, { { 500 }, { 2001 }, { 2500 }, { 2999 } },
+ { { { 0 }, { 1000 } }, { { 2000 }, { 3000 } } } },
+ };
+
+ for (const auto& hash_option : hash_options) {
+ for (const auto& range_option : range_options) {
+ SCOPED_TRACE(PartitionOptionsToString(hash_option, range_option));
+ NO_FATALS(TestPartitionOptions(hash_option, range_option));
+ }
+ }
+}
-
} // namespace itest
} // namespace kudu