You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/03/04 00:03:20 UTC
[kudu] 01/02: KUDU-3197 [tserver] optimize Schema memory usage using std::shared_ptr
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit f4b6d8917b79b9de53957174ade1a7ffc76e0090
Author: shenxingwuying <sh...@gmail.com>
AuthorDate: Tue Jan 4 20:07:29 2022 +0800
KUDU-3197 [tserver] optimize Schema memory usage using std::shared_ptr
Change TabletMetadata's schema member from Schema* to std::shared_ptr<Schema>
to reduce the memory used when altering the schema.
TabletMetadata keeps the older schemas in old_schemas whenever the schema
is altered, because they may still be in use by scanners or
compaction jobs. As described in KUDU-3197, altering the schema frequently
causes the tserver's memory usage to grow very large, much like a memory leak,
especially when the number of columns is very large.
The JIRA was filed by wangningito; this patch continues his work, but
uses std::shared_ptr<Schema> instead of scoped_refptr<Schema>, because
scoped_refptr<Schema> would require too many changes, as seen in:
https://gerrit.cloudera.org/c/18098/
Change-Id: Ic284dde108c49130419d876c6698b40c195e9b35
Reviewed-on: http://gerrit.cloudera.org:8080/18255
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/client/client-test.cc | 10 ++---
src/kudu/common/schema.h | 1 +
src/kudu/integration-tests/linked_list-test-util.h | 2 +-
src/kudu/master/sys_catalog.cc | 5 ++-
src/kudu/tablet/all_types-scan-correctness-test.cc | 2 +-
src/kudu/tablet/cfile_set.cc | 4 +-
src/kudu/tablet/cfile_set.h | 2 +-
src/kudu/tablet/diff_scan-test.cc | 6 +--
src/kudu/tablet/diskrowset.cc | 8 ++--
src/kudu/tablet/mt-tablet-test.cc | 2 +-
src/kudu/tablet/ops/alter_schema_op.cc | 10 ++---
src/kudu/tablet/ops/alter_schema_op.h | 8 ++--
src/kudu/tablet/ops/op.h | 4 +-
src/kudu/tablet/ops/write_op.h | 7 ++-
src/kudu/tablet/rowset_metadata.h | 2 +-
src/kudu/tablet/tablet-schema-test.cc | 10 ++---
src/kudu/tablet/tablet-test-util.h | 3 +-
src/kudu/tablet/tablet.cc | 47 +++++++++++---------
src/kudu/tablet/tablet.h | 6 +--
src/kudu/tablet/tablet_bootstrap.cc | 8 ++--
src/kudu/tablet/tablet_metadata.cc | 51 +++++++++++-----------
src/kudu/tablet/tablet_metadata.h | 29 +++++-------
src/kudu/tablet/tablet_replica-test.cc | 10 ++---
src/kudu/tools/kudu-tool-test.cc | 8 ++--
src/kudu/tools/tool_action_fs.cc | 8 ++--
src/kudu/tools/tool_action_local_replica.cc | 15 ++++---
src/kudu/tserver/scanners.cc | 5 ++-
src/kudu/tserver/tablet_server-test.cc | 2 +-
src/kudu/tserver/tablet_service.cc | 23 ++++++----
src/kudu/tserver/tserver_path_handlers.cc | 12 ++---
30 files changed, 161 insertions(+), 149 deletions(-)
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 94ec37d..75b2bce 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -4709,7 +4709,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) {
->Default(KuduValue::CopyString("hello!"));
ASSERT_OK(table_alterer->Alter());
ASSERT_EQ(4, tablet_replica->tablet()->metadata()->schema_version());
- Schema schema = tablet_replica->tablet()->metadata()->schema();
+ Schema schema = *tablet_replica->tablet()->metadata()->schema();
ColumnSchema col_schema = schema.column(schema.find_column("string_val"));
ASSERT_FALSE(col_schema.has_read_default());
ASSERT_TRUE(col_schema.has_write_default());
@@ -4723,7 +4723,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) {
->Default(KuduValue::FromInt(54321));
ASSERT_OK(table_alterer->Alter());
ASSERT_EQ(5, tablet_replica->tablet()->metadata()->schema_version());
- Schema schema = tablet_replica->tablet()->metadata()->schema();
+ Schema schema = *tablet_replica->tablet()->metadata()->schema();
ColumnSchema col_schema = schema.column(schema.find_column("non_null_with_default"));
ASSERT_TRUE(col_schema.has_read_default()); // Started with a default
ASSERT_TRUE(col_schema.has_write_default());
@@ -4737,7 +4737,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) {
->RemoveDefault();
ASSERT_OK(table_alterer->Alter());
ASSERT_EQ(6, tablet_replica->tablet()->metadata()->schema_version());
- Schema schema = tablet_replica->tablet()->metadata()->schema();
+ Schema schema = *tablet_replica->tablet()->metadata()->schema();
ColumnSchema col_schema = schema.column(schema.find_column("string_val"));
ASSERT_FALSE(col_schema.has_read_default());
ASSERT_FALSE(col_schema.has_write_default());
@@ -4750,7 +4750,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) {
->RemoveDefault();
ASSERT_OK(table_alterer->Alter());
ASSERT_EQ(7, tablet_replica->tablet()->metadata()->schema_version());
- Schema schema = tablet_replica->tablet()->metadata()->schema();
+ Schema schema = *tablet_replica->tablet()->metadata()->schema();
ColumnSchema col_schema = schema.column(schema.find_column("non_null_with_default"));
ASSERT_TRUE(col_schema.has_read_default());
ASSERT_FALSE(col_schema.has_write_default());
@@ -4779,7 +4779,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) {
->BlockSize(16 * 1024 * 1024);
ASSERT_OK(table_alterer->Alter());
ASSERT_EQ(8, tablet_replica->tablet()->metadata()->schema_version());
- Schema schema = tablet_replica->tablet()->metadata()->schema();
+ Schema schema = *tablet_replica->tablet()->metadata()->schema();
ColumnSchema col_schema = schema.column(schema.find_column("string_val"));
ASSERT_EQ(KuduColumnStorageAttributes::PLAIN_ENCODING, col_schema.attributes().encoding);
ASSERT_EQ(KuduColumnStorageAttributes::LZ4, col_schema.attributes().compression);
diff --git a/src/kudu/common/schema.h b/src/kudu/common/schema.h
index 725a0e8..6662319 100644
--- a/src/kudu/common/schema.h
+++ b/src/kudu/common/schema.h
@@ -48,6 +48,7 @@
namespace kudu {
class Schema;
+typedef std::shared_ptr<Schema> SchemaPtr;
} // namespace kudu
// Check that two schemas are equal, yielding a useful error message
diff --git a/src/kudu/integration-tests/linked_list-test-util.h b/src/kudu/integration-tests/linked_list-test-util.h
index 8e395df..89eae43 100644
--- a/src/kudu/integration-tests/linked_list-test-util.h
+++ b/src/kudu/integration-tests/linked_list-test-util.h
@@ -564,7 +564,7 @@ inline Status LinkedListTester::VerifyLinkedListLocal(
GenerateSplitInts());
verifier.StartScanTimer();
- const Schema* tablet_schema = tablet->schema();
+ const Schema* tablet_schema = tablet->schema().get();
// Cannot use schemas with col indexes in a scan (assertions fire).
Schema projection(tablet_schema->columns(), tablet_schema->num_key_columns());
std::unique_ptr<RowwiseIterator> iter;
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 0d417a5..5b61cd7 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -252,9 +252,10 @@ Status SysCatalogTable::Load(FsManager *fs_manager) {
RETURN_NOT_OK(tablet::TabletMetadata::Load(fs_manager, kSysCatalogTabletId, &metadata));
// Verify that the schema is the current one
- if (metadata->schema() != BuildTableSchema()) {
+ const SchemaPtr schema_ptr = metadata->schema();
+ if (*schema_ptr != BuildTableSchema()) {
// TODO: In this case we probably should execute the migration step.
- return(Status::Corruption("Unexpected schema", metadata->schema().ToString()));
+ return(Status::Corruption("Unexpected schema", schema_ptr->ToString()));
}
LOG(INFO) << "Verifying existing consensus state";
diff --git a/src/kudu/tablet/all_types-scan-correctness-test.cc b/src/kudu/tablet/all_types-scan-correctness-test.cc
index 7dfa1d6..5340e04 100644
--- a/src/kudu/tablet/all_types-scan-correctness-test.cc
+++ b/src/kudu/tablet/all_types-scan-correctness-test.cc
@@ -363,7 +363,7 @@ public:
} else {
default_ptr = rowops_.GenerateElement(read_default);
}
- SchemaBuilder builder(tablet()->metadata()->schema());
+ SchemaBuilder builder(*tablet()->metadata()->schema());
builder.RemoveColumn("val_c");
ASSERT_OK(builder.AddColumn("val_c", rowops_.type_, true, default_ptr, nullptr));
AlterSchema(builder.Build());
diff --git a/src/kudu/tablet/cfile_set.cc b/src/kudu/tablet/cfile_set.cc
index f9899e4..a9efaf0 100644
--- a/src/kudu/tablet/cfile_set.cc
+++ b/src/kudu/tablet/cfile_set.cc
@@ -220,7 +220,7 @@ CFileReader* CFileSet::key_index_reader() const {
// If there is no special index cfile, then we have a non-compound key
// and we can just use the key column.
// This is always the first column listed in the tablet schema.
- int key_col_id = tablet_schema().column_id(0);
+ int key_col_id = tablet_schema()->column_id(0);
return FindOrDie(readers_by_col_id_, key_col_id).get();
}
@@ -423,7 +423,7 @@ Status CFileSet::Iterator::PushdownRangeScanPredicate(ScanSpec *spec) {
Schema key_schema_for_vlog;
if (VLOG_IS_ON(1)) {
- key_schema_for_vlog = base_data_->tablet_schema().CreateKeyProjection();
+ key_schema_for_vlog = base_data_->tablet_schema()->CreateKeyProjection();
}
const auto* lb_key = spec->lower_bound_key();
diff --git a/src/kudu/tablet/cfile_set.h b/src/kudu/tablet/cfile_set.h
index 1b06df0..34eb4b3 100644
--- a/src/kudu/tablet/cfile_set.h
+++ b/src/kudu/tablet/cfile_set.h
@@ -156,7 +156,7 @@ class CFileSet :
// (the ad-hoc reader for composite keys, otherwise the key column reader)
cfile::CFileReader* key_index_reader() const;
- const Schema &tablet_schema() const { return rowset_metadata_->tablet_schema(); }
+ const SchemaPtr tablet_schema() const { return rowset_metadata_->tablet_schema(); }
std::shared_ptr<RowSetMetadata> rowset_metadata_;
std::shared_ptr<MemTracker> bloomfile_tracker_;
diff --git a/src/kudu/tablet/diff_scan-test.cc b/src/kudu/tablet/diff_scan-test.cc
index cecc367..6dded93 100644
--- a/src/kudu/tablet/diff_scan-test.cc
+++ b/src/kudu/tablet/diff_scan-test.cc
@@ -104,7 +104,7 @@ TEST_P(DiffScanTest, TestDiffScan) {
opts.include_deleted_rows = include_deleted_rows;
static const bool kIsDeletedDefault = false;
- SchemaBuilder builder(tablet->metadata()->schema());
+ SchemaBuilder builder(*tablet->metadata()->schema());
if (order_mode == ORDERED) {
// Define our diff scan to start from snap1.
// NOTE: it isn't critical to set this given the default is -Inf, but it
@@ -188,7 +188,7 @@ TEST_F(OrderedDiffScanWithDeletesTest, TestKudu3108) {
opts.order = ORDERED;
opts.include_deleted_rows = true;
static const bool kIsDeletedDefault = false;
- SchemaBuilder builder(tablet->metadata()->schema());
+ SchemaBuilder builder(*tablet->metadata()->schema());
ASSERT_OK(builder.AddColumn("deleted", IS_DELETED,
/*is_nullable=*/ false,
/*read_default=*/ &kIsDeletedDefault,
@@ -247,7 +247,7 @@ TEST_F(OrderedDiffScanWithDeletesTest, TestDiffScanAfterDeltaFlushRacesWithBatch
opts.snap_to_exclude = snap1;
opts.snap_to_include = snap2;
opts.order = ORDERED;;
- SchemaBuilder builder(tablet->metadata()->schema());
+ SchemaBuilder builder(*tablet->metadata()->schema());
Schema projection = builder.BuildWithoutIds();
opts.projection = &projection;
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index da2ae7a..13c6d06 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -630,10 +630,10 @@ Status DiskRowSet::NewMajorDeltaCompaction(const vector<ColumnId>& col_ids,
DCHECK(open_);
shared_lock<rw_spinlock> l(component_lock_);
- const Schema* schema = &rowset_metadata_->tablet_schema();
+ const SchemaPtr schema_ptr = rowset_metadata_->tablet_schema();
RowIteratorOptions opts;
- opts.projection = schema;
+ opts.projection = schema_ptr.get();
opts.io_context = io_context;
vector<shared_ptr<DeltaStore>> included_stores;
unique_ptr<DeltaIterator> delta_iter;
@@ -641,7 +641,7 @@ Status DiskRowSet::NewMajorDeltaCompaction(const vector<ColumnId>& col_ids,
opts, REDO, &included_stores, &delta_iter));
out->reset(new MajorDeltaCompaction(rowset_metadata_->fs_manager(),
- *schema,
+ *schema_ptr,
base_data_.get(),
std::move(delta_iter),
std::move(included_stores),
@@ -910,7 +910,7 @@ Status DiskRowSet::DebugDump(vector<string> *lines) {
// Using CompactionInput to dump our data is an easy way of seeing all the
// rows and deltas.
unique_ptr<CompactionInput> input;
- RETURN_NOT_OK(NewCompactionInput(&rowset_metadata_->tablet_schema(),
+ RETURN_NOT_OK(NewCompactionInput(rowset_metadata_->tablet_schema().get(),
MvccSnapshot::CreateSnapshotIncludingAllOps(),
nullptr, &input));
return DebugDumpCompactionInput(input.get(), lines);
diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc
index 937f4a0..05f29b3 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -107,7 +107,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
CHECK_OK(tablet()->NewRowIterator(client_schema_, &iter));
uint64_t count;
CHECK_OK(tablet()->CountRows(&count));
- const Schema* schema = tablet()->schema();
+ const Schema* schema = tablet()->schema().get();
ColumnSchema valcol = schema->column(schema->find_column("val"));
valcol_projection_ = Schema({ valcol }, 0);
CHECK_OK(tablet()->NewRowIterator(valcol_projection_, &iter));
diff --git a/src/kudu/tablet/ops/alter_schema_op.cc b/src/kudu/tablet/ops/alter_schema_op.cc
index 1c73ea2..5e386d3 100644
--- a/src/kudu/tablet/ops/alter_schema_op.cc
+++ b/src/kudu/tablet/ops/alter_schema_op.cc
@@ -96,17 +96,17 @@ Status AlterSchemaOp::Prepare() {
TRACE("PREPARE ALTER-SCHEMA: Starting");
// Decode schema
- unique_ptr<Schema> schema(new Schema);
- Status s = SchemaFromPB(state_->request()->schema(), schema.get());
+ SchemaPtr schema_ptr = std::make_shared<Schema>();
+ Status s = SchemaFromPB(state_->request()->schema(), schema_ptr.get());
if (!s.ok()) {
state_->completion_callback()->set_error(s, TabletServerErrorPB::INVALID_SCHEMA);
return s;
}
Tablet* tablet = state_->tablet_replica()->tablet();
- RETURN_NOT_OK(tablet->CreatePreparedAlterSchema(state(), schema.get()));
+ RETURN_NOT_OK(tablet->CreatePreparedAlterSchema(state(), schema_ptr));
- state_->AddToAutoReleasePool(std::move(schema));
+ state_->AddToAutoReleasePool(std::move(schema_ptr));
TRACE("PREPARE ALTER-SCHEMA: finished");
return s;
@@ -138,7 +138,7 @@ Status AlterSchemaOp::Apply(CommitMsg** commit_msg) {
}
state_->tablet_replica()->log()
- ->SetSchemaForNextLogSegment(*DCHECK_NOTNULL(state_->schema()),
+ ->SetSchemaForNextLogSegment(*DCHECK_NOTNULL(state_->schema().get()),
state_->schema_version());
// Altered tablets should be included in the next tserver heartbeat so that
diff --git a/src/kudu/tablet/ops/alter_schema_op.h b/src/kudu/tablet/ops/alter_schema_op.h
index 9bccd2a..022544e 100644
--- a/src/kudu/tablet/ops/alter_schema_op.h
+++ b/src/kudu/tablet/ops/alter_schema_op.h
@@ -24,6 +24,7 @@
#include <boost/optional/optional.hpp>
#include "kudu/common/common.pb.h"
+#include "kudu/common/schema.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/gutil/macros.h"
#include "kudu/tablet/ops/op.h"
@@ -32,7 +33,6 @@
namespace kudu {
-class Schema;
class rw_semaphore;
namespace tablet {
@@ -59,8 +59,8 @@ class AlterSchemaOpState : public OpState {
const tserver::AlterSchemaRequestPB* request() const override { return request_; }
tserver::AlterSchemaResponsePB* response() const override { return response_; }
- void set_schema(const Schema* schema) { schema_ = schema; }
- const Schema* schema() const { return schema_; }
+ void set_schema(const SchemaPtr& schema) { schema_ = schema; }
+ const SchemaPtr schema() const { return schema_; }
std::string new_table_name() const {
return request_->new_table_name();
@@ -109,7 +109,7 @@ class AlterSchemaOpState : public OpState {
DISALLOW_COPY_AND_ASSIGN(AlterSchemaOpState);
// The new (target) Schema.
- const Schema* schema_;
+ SchemaPtr schema_;
// The original RPC request and response.
const tserver::AlterSchemaRequestPB *request_;
diff --git a/src/kudu/tablet/ops/op.h b/src/kudu/tablet/ops/op.h
index c94837c..024b53d 100644
--- a/src/kudu/tablet/ops/op.h
+++ b/src/kudu/tablet/ops/op.h
@@ -209,7 +209,7 @@ class OpState {
}
// Sets a heap object to be managed by this op's AutoReleasePool.
- void AddToAutoReleasePool(std::unique_ptr<Schema> t) {
+ void AddToAutoReleasePool(SchemaPtr t) {
schemas_pool_.emplace_back(std::move(t));
}
@@ -283,7 +283,7 @@ class OpState {
// Optional callback to be called once the op completes.
std::unique_ptr<OpCompletionCallback> completion_clbk_;
- std::deque<std::unique_ptr<Schema>> schemas_pool_;
+ std::deque<SchemaPtr> schemas_pool_;
// This operation's timestamp.
// This is only set once during the operation lifecycle, using external synchronization.
diff --git a/src/kudu/tablet/ops/write_op.h b/src/kudu/tablet/ops/write_op.h
index 8c14176..4751d39 100644
--- a/src/kudu/tablet/ops/write_op.h
+++ b/src/kudu/tablet/ops/write_op.h
@@ -193,9 +193,10 @@ class WriteOpState : public OpState {
void ReleaseMvccTxn(Op::OpResult result);
- void set_schema_at_decode_time(const Schema* schema) {
+ void set_schema_at_decode_time(const SchemaPtr& schema) {
std::lock_guard<simple_spinlock> l(op_state_lock_);
- schema_at_decode_time_ = schema;
+ schema_ptr_at_decode_time_ = schema;
+ schema_at_decode_time_ = schema.get();
}
const Schema* schema_at_decode_time() const {
@@ -319,6 +320,8 @@ class WriteOpState : public OpState {
// at APPLY time to ensure we don't have races against schema change.
// Protected by op_state_lock_.
const Schema* schema_at_decode_time_;
+ // protect schema_at_decode_time_
+ SchemaPtr schema_ptr_at_decode_time_;
// Lock that protects access to various fields of WriteOpState.
mutable simple_spinlock op_state_lock_;
diff --git a/src/kudu/tablet/rowset_metadata.h b/src/kudu/tablet/rowset_metadata.h
index c4be77d..70a6b1f 100644
--- a/src/kudu/tablet/rowset_metadata.h
+++ b/src/kudu/tablet/rowset_metadata.h
@@ -99,7 +99,7 @@ class RowSetMetadata {
int64_t id() const { return id_; }
- const Schema& tablet_schema() const {
+ const SchemaPtr tablet_schema() const {
return tablet_metadata_->schema();
}
diff --git a/src/kudu/tablet/tablet-schema-test.cc b/src/kudu/tablet/tablet-schema-test.cc
index de26638..1886f81 100644
--- a/src/kudu/tablet/tablet-schema-test.cc
+++ b/src/kudu/tablet/tablet-schema-test.cc
@@ -146,7 +146,7 @@ TEST_F(TestTabletSchema, TestWrite) {
const int32_t c2_write_default = 5;
const int32_t c2_read_default = 7;
- SchemaBuilder builder(tablet()->metadata()->schema());
+ SchemaBuilder builder(*tablet()->metadata()->schema());
ASSERT_OK(builder.AddColumn("c2", INT32, false, &c2_read_default, &c2_write_default));
AlterSchema(builder.Build());
Schema s2 = builder.BuildWithoutIds();
@@ -189,7 +189,7 @@ TEST_F(TestTabletSchema, TestReInsert) {
const int32_t c2_write_default = 5;
const int32_t c2_read_default = 7;
- SchemaBuilder builder(tablet()->metadata()->schema());
+ SchemaBuilder builder(*tablet()->metadata()->schema());
ASSERT_OK(builder.AddColumn("c2", INT32, false, &c2_read_default, &c2_write_default));
AlterSchema(builder.Build());
Schema s2 = builder.BuildWithoutIds();
@@ -219,7 +219,7 @@ TEST_F(TestTabletSchema, TestRenameProjection) {
InsertRow(client_schema_, 1);
// Switch schema to s2
- SchemaBuilder builder(tablet()->metadata()->schema());
+ SchemaBuilder builder(*tablet()->metadata()->schema());
ASSERT_OK(builder.RenameColumn("c1", "c1_renamed"));
AlterSchema(builder.Build());
Schema s2 = builder.BuildWithoutIds();
@@ -260,7 +260,7 @@ TEST_F(TestTabletSchema, TestDeleteAndReAddColumn) {
VerifyTabletRows(client_schema_, keys);
// Switch schema to s2
- SchemaBuilder builder(tablet()->metadata()->schema());
+ SchemaBuilder builder(*tablet()->metadata()->schema());
ASSERT_OK(builder.RemoveColumn("c1"));
// NOTE this new 'c1' will have a different id from the previous one
// so the data added to the previous 'c1' will not be visible.
@@ -279,7 +279,7 @@ TEST_F(TestTabletSchema, TestModifyEmptyMemRowSet) {
std::vector<std::pair<string, string> > keys;
// Switch schema to s2
- SchemaBuilder builder(tablet()->metadata()->schema());
+ SchemaBuilder builder(*tablet()->metadata()->schema());
ASSERT_OK(builder.AddNullableColumn("c2", INT32));
AlterSchema(builder.Build());
Schema s2 = builder.BuildWithoutIds();
diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h
index 1ca5a1a..bf010cb 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -150,8 +150,9 @@ class KuduTabletTest : public KuduTest {
*(req.mutable_new_extra_config()) = *extra_config;
}
+ SchemaPtr schema_ptr = std::make_shared<Schema>(schema);
AlterSchemaOpState op_state(nullptr, &req, nullptr);
- ASSERT_OK(tablet()->CreatePreparedAlterSchema(&op_state, &schema));
+ ASSERT_OK(tablet()->CreatePreparedAlterSchema(&op_state, schema_ptr));
ASSERT_OK(tablet()->AlterSchema(&op_state));
op_state.Finish();
}
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 28bb0cf..50eda43 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -262,7 +262,7 @@ Tablet::Tablet(scoped_refptr<TabletMetadata> metadata,
shared_ptr<MemTracker> parent_mem_tracker,
MetricRegistry* metric_registry,
scoped_refptr<LogAnchorRegistry> log_anchor_registry)
- : key_schema_(metadata->schema().CreateKeyProjection()),
+ : key_schema_(metadata->schema()->CreateKeyProjection()),
metadata_(std::move(metadata)),
log_anchor_registry_(std::move(log_anchor_registry)),
mem_trackers_(tablet_id(), std::move(parent_mem_tracker)),
@@ -371,7 +371,8 @@ Status Tablet::Open(const unordered_set<int64_t>& in_flight_txn_ids,
// Now that the current state is loaded, create the new MemRowSet with the next id.
shared_ptr<MemRowSet> new_mrs;
- RETURN_NOT_OK(MemRowSet::Create(next_mrs_id_++, *schema(),
+ const SchemaPtr schema_ptr = schema();
+ RETURN_NOT_OK(MemRowSet::Create(next_mrs_id_++, *schema_ptr,
log_anchor_registry_.get(),
mem_trackers_.tablet_tracker,
&new_mrs));
@@ -386,7 +387,7 @@ Status Tablet::Open(const unordered_set<int64_t>& in_flight_txn_ids,
// NOTE: we are able to FindOrDie() on these IDs because
// 'txn_ids_with_mrs' is a subset of the transaction IDs known by the
// metadata.
- RETURN_NOT_OK(MemRowSet::Create(0, *schema(), txn_id, FindOrDie(txn_meta_by_id, txn_id),
+ RETURN_NOT_OK(MemRowSet::Create(0, *schema_ptr, txn_id, FindOrDie(txn_meta_by_id, txn_id),
log_anchor_registry_.get(),
mem_trackers_.tablet_tracker,
&txn_mrs));
@@ -462,7 +463,7 @@ void Tablet::Shutdown() {
Status Tablet::GetMappedReadProjection(const Schema& projection,
Schema *mapped_projection) const {
- const Schema* cur_schema = schema();
+ const SchemaPtr cur_schema = schema();
return cur_schema->GetMappedReadProjection(projection, mapped_projection);
}
@@ -541,17 +542,18 @@ Status Tablet::DecodeWriteOperations(const Schema* client_schema,
TRACE("Decoding operations");
vector<DecodedRowOperation> ops;
+ SchemaPtr schema_ptr = schema();
// Decode the ops
RowOperationsPBDecoder dec(&op_state->request()->row_operations(),
client_schema,
- schema(),
+ schema_ptr.get(),
op_state->arena());
RETURN_NOT_OK(dec.DecodeOperations<DecoderMode::WRITE_OPS>(&ops));
TRACE_COUNTER_INCREMENT("num_ops", ops.size());
// Important to set the schema before the ops -- we need the
// schema in order to stringify the ops.
- op_state->set_schema_at_decode_time(schema());
+ op_state->set_schema_at_decode_time(schema_ptr);
op_state->SetRowOps(std::move(ops));
return Status::OK();
@@ -605,8 +607,8 @@ Status Tablet::CheckRowInTablet(const ConstContiguousRow& row) const {
const auto& ps = metadata_->partition_schema();
if (PREDICT_FALSE(!ps.PartitionContainsRow(metadata_->partition(), row))) {
return Status::NotFound(
- Substitute("Row not in tablet partition. Partition: '$0' row: '$1'",
- ps.PartitionDebugString(metadata_->partition(), *schema()),
+ Substitute("Row not in tablet partition. Partition: '$0', row: '$1'.",
+ ps.PartitionDebugString(metadata_->partition(), *schema().get()),
ps.PartitionKeyDebugString(row)));
}
return Status::OK();
@@ -744,7 +746,7 @@ Status Tablet::InsertOrUpsertUnlocked(const IOContext* io_context,
}
Timestamp ts = op_state->timestamp();
- ConstContiguousRow row(schema(), op->decoded_op.row_data);
+ ConstContiguousRow row(schema().get(), op->decoded_op.row_data);
// TODO(todd): the Insert() call below will re-encode the key, which is a
// waste. Should pass through the KeyProbe structure perhaps.
@@ -819,7 +821,7 @@ Status Tablet::ApplyUpsertAsUpdate(const IOContext* io_context,
RowOp* upsert,
RowSet* rowset,
ProbeStats* stats) {
- const auto* schema = this->schema();
+ const auto* schema = this->schema().get();
ConstContiguousRow row(schema, upsert->decoded_op.row_data);
faststring buf;
RowChangeListEncoder enc(&buf);
@@ -991,7 +993,8 @@ void Tablet::StartApplying(ParticipantOpState* op_state) {
void Tablet::CreateTxnRowSets(int64_t txn_id, scoped_refptr<TxnMetadata> txn_meta) {
shared_ptr<MemRowSet> new_mrs;
- CHECK_OK(MemRowSet::Create(0, *schema(), txn_id, std::move(txn_meta),
+ const SchemaPtr schema_ptr = schema();
+ CHECK_OK(MemRowSet::Create(0, *schema_ptr, txn_id, std::move(txn_meta),
log_anchor_registry_.get(),
mem_trackers_.tablet_tracker,
&new_mrs));
@@ -1266,7 +1269,7 @@ Status Tablet::ApplyRowOperation(const IOContext* io_context,
}
DCHECK(op_state != nullptr) << "must have a WriteOpState";
DCHECK(op_state->op_id().IsInitialized()) << "OpState OpId needed for anchoring";
- DCHECK_EQ(op_state->schema_at_decode_time(), schema());
+ DCHECK_EQ(op_state->schema_at_decode_time(), schema().get());
// If we were unable to check rowset presence in batch (e.g. because we are processing
// a batch which contains some duplicate keys) we need to do so now.
@@ -1524,7 +1527,8 @@ Status Tablet::ReplaceMemRowSetsUnlocked(RowSetsInCompaction* compaction,
}
shared_ptr<MemRowSet> new_mrs;
- RETURN_NOT_OK(MemRowSet::Create(next_mrs_id_++, *schema(),
+ const SchemaPtr schema_ptr = schema();
+ RETURN_NOT_OK(MemRowSet::Create(next_mrs_id_++, *schema_ptr,
log_anchor_registry_.get(),
mem_trackers_.tablet_tracker,
&new_mrs));
@@ -1539,7 +1543,7 @@ Status Tablet::ReplaceMemRowSetsUnlocked(RowSetsInCompaction* compaction,
}
Status Tablet::CreatePreparedAlterSchema(AlterSchemaOpState* op_state,
- const Schema* schema) {
+ const SchemaPtr& schema) {
if (!schema->has_column_ids()) {
// this probably means that the request is not from the Master
@@ -1556,7 +1560,7 @@ Status Tablet::CreatePreparedAlterSchema(AlterSchemaOpState* op_state,
}
Status Tablet::AlterSchema(AlterSchemaOpState* op_state) {
- DCHECK(key_schema_.KeyTypeEquals(*DCHECK_NOTNULL(op_state->schema())))
+ DCHECK(key_schema_.KeyTypeEquals(*DCHECK_NOTNULL(op_state->schema().get())))
<< "Schema keys cannot be altered(except name)";
// Prevent any concurrent flushes. Otherwise, we run into issues where
@@ -1580,7 +1584,7 @@ Status Tablet::AlterSchema(AlterSchemaOpState* op_state) {
<< " to " << op_state->schema()->ToString()
<< " version " << op_state->schema_version();
DCHECK(schema_lock_.is_locked());
- metadata_->SetSchema(*op_state->schema(), op_state->schema_version());
+ metadata_->SetSchema(op_state->schema(), op_state->schema_version());
if (op_state->has_new_table_name()) {
metadata_->SetTableName(op_state->new_table_name());
if (metric_entity_) {
@@ -1609,7 +1613,8 @@ Status Tablet::RewindSchemaForBootstrap(const Schema& new_schema,
// to flush.
VLOG_WITH_PREFIX(1) << "Rewinding schema during bootstrap to " << new_schema.ToString();
- metadata_->SetSchema(new_schema, schema_version);
+ SchemaPtr schema = std::make_shared<Schema>(new_schema);
+ metadata_->SetSchema(schema, schema_version);
{
std::lock_guard<rw_spinlock> lock(component_lock_);
@@ -1617,7 +1622,7 @@ Status Tablet::RewindSchemaForBootstrap(const Schema& new_schema,
shared_ptr<RowSetTree> old_rowsets = components_->rowsets;
CHECK(old_mrs->empty());
shared_ptr<MemRowSet> new_mrs;
- RETURN_NOT_OK(MemRowSet::Create(old_mrs->mrs_id(), new_schema,
+ RETURN_NOT_OK(MemRowSet::Create(old_mrs->mrs_id(), *schema,
log_anchor_registry_.get(),
mem_trackers_.tablet_tracker,
&new_mrs));
@@ -1872,7 +1877,8 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
}
shared_ptr<CompactionInput> merge;
- RETURN_NOT_OK(input.CreateCompactionInput(flush_snap, schema(), &io_context, &merge));
+ const SchemaPtr schema_ptr = schema();
+ RETURN_NOT_OK(input.CreateCompactionInput(flush_snap, schema_ptr.get(), &io_context, &merge));
RollingDiskRowSetWriter drsw(metadata_.get(), merge->schema(), DefaultBloomSizing(),
compaction_policy_->target_rowset_size());
@@ -2014,8 +2020,9 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
VLOG_WITH_PREFIX(1) << Substitute("$0: Phase 2: carrying over any updates "
"which arrived during Phase 1. Snapshot: $1",
op_name, non_duplicated_ops_snap.ToString());
+ const SchemaPtr schema_ptr2 = schema();
RETURN_NOT_OK_PREPEND(
- input.CreateCompactionInput(non_duplicated_ops_snap, schema(), &io_context, &merge),
+ input.CreateCompactionInput(non_duplicated_ops_snap, schema_ptr2.get(), &io_context, &merge),
Substitute("Failed to create $0 inputs", op_name).c_str());
// Update the output rowsets with the deltas that came in in phase 1, before we swapped
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 4d72cb9..6298c6c 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -261,7 +261,7 @@ class Tablet {
// An error will be returned if the specified schema is invalid (e.g.
// key mismatch, or missing IDs)
Status CreatePreparedAlterSchema(AlterSchemaOpState *op_state,
- const Schema* schema);
+ const SchemaPtr& schema);
// Apply the Schema of the specified op.
// This operation will trigger a flush on the current MemRowSet.
@@ -411,8 +411,8 @@ class Tablet {
// has a very small number of rows.
Status DebugDump(std::vector<std::string> *lines = NULL);
- const Schema* schema() const {
- return &metadata_->schema();
+ const SchemaPtr schema() const {
+ return metadata_->schema();
}
// Returns a reference to the key projection of the tablet schema.
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 7ccedfe..d0922a8 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -1535,11 +1535,11 @@ Status TabletBootstrap::PlayAlterSchemaRequest(const IOContext* /*io_context*/,
AlterSchemaRequestPB* alter_schema = replicate_msg->mutable_alter_schema_request();
// Decode schema
- Schema schema;
- RETURN_NOT_OK(SchemaFromPB(alter_schema->schema(), &schema));
+ SchemaPtr schema_ptr = std::make_shared<Schema>();
+ RETURN_NOT_OK(SchemaFromPB(alter_schema->schema(), schema_ptr.get()));
AlterSchemaOpState op_state(nullptr, alter_schema, nullptr);
- RETURN_NOT_OK(tablet_->CreatePreparedAlterSchema(&op_state, &schema));
+ RETURN_NOT_OK(tablet_->CreatePreparedAlterSchema(&op_state, schema_ptr));
// Apply the alter schema to the tablet
RETURN_NOT_OK_PREPEND(tablet_->AlterSchema(&op_state), "Failed to AlterSchema:");
@@ -1547,7 +1547,7 @@ Status TabletBootstrap::PlayAlterSchemaRequest(const IOContext* /*io_context*/,
if (!op_state.error()) {
// If the alter completed successfully, update the log segment header. Note
// that our new log isn't hooked up to the tablet yet.
- log_->SetSchemaForNextLogSegment(std::move(schema), op_state.schema_version());
+ log_->SetSchemaForNextLogSegment(*schema_ptr, op_state.schema_version());
}
return AppendCommitMsg(commit_msg);
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index 0a36f1c..1c949cb 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -167,9 +167,10 @@ Status TabletMetadata::LoadOrCreate(FsManager* fs_manager,
scoped_refptr<TabletMetadata>* metadata) {
Status s = Load(fs_manager, tablet_id, metadata);
if (s.ok()) {
- if ((*metadata)->schema() != schema) {
+ const SchemaPtr schema_ptr = (*metadata)->schema();
+ if (*schema_ptr != schema) {
return Status::Corruption(Substitute("Schema on disk ($0) does not "
- "match expected schema ($1)", (*metadata)->schema().ToString(),
+ "match expected schema ($1)", schema_ptr->ToString(),
schema.ToString()));
}
return Status::OK();
@@ -312,8 +313,7 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id,
log_prefix_(Substitute("T $0 P $1: ", tablet_id_, fs_manager_->uuid())),
next_rowset_idx_(0),
last_durable_mrs_id_(kNoDurableMemStore),
- schema_(new Schema(schema)),
- schema_ptr_(schema_.get()),
+ schema_(std::make_shared<Schema>(schema)),
schema_version_(0),
table_name_(std::move(table_name)),
partition_schema_(std::move(partition_schema)),
@@ -339,7 +339,6 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id)
tablet_id_(std::move(tablet_id)),
fs_manager_(fs_manager),
next_rowset_idx_(0),
- schema_ptr_(nullptr),
num_flush_pins_(0),
needs_flush_(false),
flush_count_for_tests_(0),
@@ -390,11 +389,14 @@ Status TabletMetadata::LoadFromSuperBlock(const TabletSuperBlockPB& superblock)
table_name_ = superblock.table_name();
uint32_t schema_version = superblock.schema_version();
- unique_ptr<Schema> schema(new Schema());
+ SchemaPtr schema = std::make_shared<Schema>();
RETURN_NOT_OK_PREPEND(SchemaFromPB(superblock.schema(), schema.get()),
"Failed to parse Schema from superblock " +
SecureShortDebugString(superblock));
- SetSchemaUnlocked(std::move(schema), schema_version);
+ {
+ SchemaPtr old_schema;
+ SwapSchemaUnlocked(schema, schema_version, &old_schema);
+ }
if (!superblock.has_partition()) {
// KUDU-818: Possible backward compatibility issue with tables created
@@ -418,7 +420,7 @@ Status TabletMetadata::LoadFromSuperBlock(const TabletSuperBlockPB& superblock)
CHECK_EQ(table_id_, superblock.table_id());
PartitionSchema partition_schema;
RETURN_NOT_OK(PartitionSchema::FromPB(superblock.partition_schema(),
- *schema_ptr_, &partition_schema));
+ *schema_, &partition_schema));
CHECK(partition_schema_ == partition_schema);
Partition partition;
@@ -743,7 +745,7 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block,
partition_.ToPB(pb.mutable_partition());
pb.set_last_durable_mrs_id(last_durable_mrs_id_);
pb.set_schema_version(schema_version_);
- RETURN_NOT_OK(partition_schema_.ToPB(*schema_ptr_, pb.mutable_partition_schema()));
+ RETURN_NOT_OK(partition_schema_.ToPB(*schema_, pb.mutable_partition_schema()));
pb.set_table_name(table_name_);
for (const shared_ptr<RowSetMetadata>& meta : rowsets) {
@@ -756,8 +758,8 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block,
InsertOrDie(pb.mutable_txn_metadata(), txn_id_and_metadata.first, meta_pb);
}
- DCHECK(schema_ptr_->has_column_ids());
- RETURN_NOT_OK_PREPEND(SchemaToPB(*schema_ptr_, pb.mutable_schema()),
+ DCHECK(schema_->has_column_ids());
+ RETURN_NOT_OK_PREPEND(SchemaToPB(*schema_, pb.mutable_schema()),
"Couldn't serialize schema into superblock");
pb.set_tablet_data_state(tablet_data_state_);
@@ -937,22 +939,21 @@ RowSetMetadata *TabletMetadata::GetRowSetForTests(int64_t id) {
return nullptr;
}
-void TabletMetadata::SetSchema(const Schema& schema, uint32_t version) {
- unique_ptr<Schema> new_schema(new Schema(schema));
- std::lock_guard<LockType> l(data_lock_);
- SetSchemaUnlocked(std::move(new_schema), version);
+void TabletMetadata::SetSchema(const SchemaPtr& schema, uint32_t version) {
+ // In case this is the last reference to the schema, destruct the pointer
+ // outside the lock.
+ SchemaPtr old_schema;
+ {
+ std::lock_guard<LockType> l(data_lock_);
+ SwapSchemaUnlocked(schema, version, &old_schema);
+ }
}
-void TabletMetadata::SetSchemaUnlocked(
- unique_ptr<Schema> new_schema, uint32_t version) {
- DCHECK(new_schema->has_column_ids());
-
- // "Release" barrier ensures that, when we publish the new Schema object,
- // all of its initialization is also visible.
- base::subtle::Release_Store(reinterpret_cast<AtomicWord*>(&schema_ptr_),
- reinterpret_cast<AtomicWord>(new_schema.get()));
- std::swap(schema_, new_schema);
- old_schemas_.emplace_back(std::move(new_schema));
+void TabletMetadata::SwapSchemaUnlocked(SchemaPtr schema, uint32_t version,
+ SchemaPtr* old_schema) {
+ DCHECK(schema->has_column_ids());
+ *old_schema = std::move(schema_);
+ schema_ = std::move(schema);
schema_version_ = version;
}
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h
index f545dfa..cda5c85 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -19,6 +19,7 @@
#include <atomic>
#include <cstdint>
#include <memory>
+#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
@@ -29,6 +30,7 @@
#include "kudu/common/common.pb.h"
#include "kudu/common/partition.h"
+#include "kudu/common/schema.h"
#include "kudu/fs/block_id.h"
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/macros.h"
@@ -43,7 +45,6 @@ namespace kudu {
class BlockIdPB;
class FsManager;
-class Schema;
class Timestamp;
namespace consensus {
@@ -58,6 +59,7 @@ namespace tablet {
class RowSetMetadata;
class TxnMetadata;
+
enum TxnState : int8_t;
typedef std::vector<std::shared_ptr<RowSetMetadata>> RowSetMetadataVector;
@@ -150,19 +152,18 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
uint32_t schema_version() const;
- void SetSchema(const Schema& schema, uint32_t version);
+ void SetSchema(const SchemaPtr& schema, uint32_t version);
void SetTableName(const std::string& table_name);
void SetExtraConfig(TableExtraConfigPB extra_config);
- // Return a reference to the current schema.
+ // Return a std::shared_ptr (SchemaPtr) to the current schema.
// This pointer will be valid until the TabletMetadata is destructed,
// even if the schema is changed.
- const Schema& schema() const {
- const Schema* s = reinterpret_cast<const Schema*>(
- base::subtle::Acquire_Load(reinterpret_cast<const AtomicWord*>(&schema_ptr_)));
- return *s;
+ const SchemaPtr schema() const {
+ std::lock_guard<LockType> l(data_lock_);
+ return schema_;
}
// Returns the partition schema of the tablet's table.
@@ -375,7 +376,7 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
// Constructor for loading an existing tablet.
TabletMetadata(FsManager* fs_manager, std::string tablet_id);
- void SetSchemaUnlocked(std::unique_ptr<Schema> schema, uint32_t version);
+ void SwapSchemaUnlocked(SchemaPtr schema, uint32_t version, SchemaPtr* old_schema);
Status LoadFromDisk();
@@ -443,21 +444,11 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
std::unordered_map<int64_t, scoped_refptr<TxnMetadata>> txn_metadata_by_txn_id_;;
// The current schema version.
- std::unique_ptr<Schema> schema_;
- // The raw pointer to the schema for an atomic swap.
- Schema* schema_ptr_;
-
+ SchemaPtr schema_;
uint32_t schema_version_;
std::string table_name_;
PartitionSchema partition_schema_;
- // Previous values of 'schema_'.
- // These are currently kept alive forever, under the assumption that
- // a given tablet won't have thousands of "alter table" calls.
- // They are kept alive so that callers of schema() don't need to
- // worry about reference counting or locking.
- std::vector<std::unique_ptr<Schema>> old_schemas_;
-
// Protected by 'data_lock_'.
BlockIdSet orphaned_blocks_;
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index db077bd..e6980a9 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -32,9 +32,9 @@
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
-#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_anchor_registry.h"
@@ -540,11 +540,11 @@ TEST_F(TabletReplicaTest, TestRollLogSegmentSchemaOnAlter) {
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
SchemaPB orig_schema_pb;
- ASSERT_OK(SchemaToPB(SchemaBuilder(tablet()->metadata()->schema()).Build(), &orig_schema_pb));
+ ASSERT_OK(SchemaToPB(SchemaBuilder(*tablet()->metadata()->schema()).Build(), &orig_schema_pb));
const int orig_schema_version = tablet()->metadata()->schema_version();
// Add a new column.
- SchemaBuilder builder(tablet()->metadata()->schema());
+ SchemaBuilder builder(*tablet()->metadata()->schema());
ASSERT_OK(builder.AddColumn("new_col", INT32));
Schema new_client_schema = builder.BuildWithoutIds();
SchemaPB new_schema;
@@ -581,11 +581,11 @@ TEST_F(TabletReplicaTest, Kudu2690Test) {
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
SchemaPB orig_schema_pb;
- ASSERT_OK(SchemaToPB(SchemaBuilder(tablet()->metadata()->schema()).Build(), &orig_schema_pb));
+ ASSERT_OK(SchemaToPB(SchemaBuilder(*tablet()->metadata()->schema()).Build(), &orig_schema_pb));
const int orig_schema_version = tablet()->metadata()->schema_version();
// First things first, add a new column.
- SchemaBuilder builder(tablet()->metadata()->schema());
+ SchemaBuilder builder(*tablet()->metadata()->schema());
ASSERT_OK(builder.AddColumn("new_col", INT32));
Schema new_client_schema = builder.BuildWithoutIds();
SchemaPB new_schema;
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 2383b2f..1b73a3b 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2431,7 +2431,7 @@ TEST_F(ToolTest, TestLocalReplicaDumpMeta) {
// Verify the contents of the metadata output
SCOPED_TRACE(stdout);
string debug_str = meta->partition_schema()
- .PartitionDebugString(meta->partition(), meta->schema());
+ .PartitionDebugString(meta->partition(), *meta->schema());
StripWhiteSpace(&debug_str);
ASSERT_STR_CONTAINS(stdout, debug_str);
debug_str = Substitute("Table name: $0 Table id: $1",
@@ -2439,7 +2439,7 @@ TEST_F(ToolTest, TestLocalReplicaDumpMeta) {
ASSERT_STR_CONTAINS(stdout, debug_str);
debug_str = Substitute("Schema (version=$0):", meta->schema_version());
ASSERT_STR_CONTAINS(stdout, debug_str);
- debug_str = meta->schema().ToString();
+ debug_str = meta->schema()->ToString();
StripWhiteSpace(&debug_str);
ASSERT_STR_CONTAINS(stdout, debug_str);
@@ -2583,7 +2583,7 @@ TEST_F(ToolTest, TestLocalReplicaOps) {
SCOPED_TRACE(stdout);
debug_str = meta->partition_schema()
- .PartitionDebugString(meta->partition(), meta->schema());
+ .PartitionDebugString(meta->partition(), *meta->schema());
StripWhiteSpace(&debug_str);
ASSERT_STR_CONTAINS(stdout, debug_str);
debug_str = Substitute("Table name: $0 Table id: $1",
@@ -2592,7 +2592,7 @@ TEST_F(ToolTest, TestLocalReplicaOps) {
debug_str = Substitute("Schema (version=$0):", meta->schema_version());
StripWhiteSpace(&debug_str);
ASSERT_STR_CONTAINS(stdout, debug_str);
- debug_str = meta->schema().ToString();
+ debug_str = meta->schema()->ToString();
StripWhiteSpace(&debug_str);
ASSERT_STR_CONTAINS(stdout, debug_str);
diff --git a/src/kudu/tools/tool_action_fs.cc b/src/kudu/tools/tool_action_fs.cc
index 12077da..ea530f0 100644
--- a/src/kudu/tools/tool_action_fs.cc
+++ b/src/kudu/tools/tool_action_fs.cc
@@ -549,7 +549,7 @@ string TabletInfo(Field field, const TabletMetadata& tablet) {
case Field::kTabletId: return tablet.tablet_id();
case Field::kPartition: return tablet.partition_schema()
.PartitionDebugString(tablet.partition(),
- tablet.schema());
+ *tablet.schema().get());
default: LOG(FATAL) << "unhandled field (this is a bug): " << ToString(field);
}
}
@@ -575,7 +575,7 @@ string BlockInfo(Field field,
case Field::kBlockKind: return block_kind;
case Field::kColumn: if (column_id) {
- return tablet.schema().column_by_id(*column_id).name();
+ return tablet.schema()->column_by_id(*column_id).name();
} else { return ""; }
case Field::kColumnId: if (column_id) {
@@ -597,8 +597,8 @@ string FormatCFileKeyMetadata(const TabletMetadata& tablet,
Arena arena(1024);
EncodedKey* key;
- CHECK_OK(EncodedKey::DecodeEncodedString(tablet.schema(), &arena, value, &key));
- return key->Stringify(tablet.schema());
+ CHECK_OK(EncodedKey::DecodeEncodedString(*tablet.schema().get(), &arena, value, &key));
+ return key->Stringify(*tablet.schema().get());
}
// Formats the delta stats property from CFile metadata.
diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc
index 0540abb..03d39ad 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -554,6 +554,7 @@ Status SummarizeDataSize(const RunnerContext& context) {
RETURN_NOT_OK_PREPEND(TabletMetadata::Load(fs.get(), tablet_id, &meta),
Substitute("could not load tablet metadata for $0", tablet_id));
const string& table_id = meta->table_id();
+ const SchemaPtr schema_ptr = meta->schema();
for (const shared_ptr<RowSetMetadata>& rs_meta : meta->rowsets()) {
TabletSizeStats rowset_stats;
RETURN_NOT_OK(SummarizeSize(fs.get(), rs_meta->redo_delta_blocks(),
@@ -570,11 +571,11 @@ Status SummarizeDataSize(const RunnerContext& context) {
for (const auto& e : column_blocks_by_id) {
const auto& col_id = e.first;
const auto& block = e.second;
- const auto& col_idx = meta->schema().find_column_by_id(col_id);
+ const auto& col_idx = schema_ptr->find_column_by_id(col_id);
string col_key = Substitute(
"c$0 ($1)", col_id,
(col_idx != Schema::kColumnNotFound) ?
- meta->schema().column(col_idx).name() : "?");
+ schema_ptr->column(col_idx).name() : "?");
RETURN_NOT_OK(SummarizeSize(
fs.get(), { block }, col_key, &rowset_stats.column_bytes[col_key]));
}
@@ -661,12 +662,12 @@ Status DumpBlockIdsForLocalReplica(const RunnerContext& context) {
cout << "Listing all data blocks in tablet "
<< tablet_id << ":" << endl;
- Schema schema = meta->schema();
+ SchemaPtr schema = meta->schema();
size_t idx = 0;
for (const shared_ptr<RowSetMetadata>& rs_meta : meta->rowsets()) {
cout << "Rowset " << idx++ << endl;
- RETURN_NOT_OK(ListBlocksInRowSet(schema, *rs_meta));
+ RETURN_NOT_OK(ListBlocksInRowSet(*schema.get(), *rs_meta));
}
return Status::OK();
@@ -677,11 +678,11 @@ Status DumpTabletMeta(FsManager* fs_manager,
scoped_refptr<TabletMetadata> meta;
RETURN_NOT_OK(TabletMetadata::Load(fs_manager, tablet_id, &meta));
- const Schema& schema = meta->schema();
+ const Schema& schema = *meta->schema().get();
cout << Indent(indent) << "Partition: "
<< meta->partition_schema().PartitionDebugString(meta->partition(),
- meta->schema())
+ schema)
<< endl;
cout << Indent(indent) << "Table name: " << meta->table_name()
<< " Table id: " << meta->table_id() << endl;
@@ -736,7 +737,7 @@ Status DumpRowSetInternal(const IOContext& ctx,
if (FLAGS_dump_all_columns) {
RETURN_NOT_OK(rs->DebugDump(&lines));
} else {
- Schema key_proj = rs_meta->tablet_schema().CreateKeyProjection();
+ Schema key_proj = rs_meta->tablet_schema()->CreateKeyProjection();
RowIteratorOptions opts;
opts.projection = &key_proj;
opts.io_context = &ctx;
diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc
index add5d9d..f07340f 100644
--- a/src/kudu/tserver/scanners.cc
+++ b/src/kudu/tserver/scanners.cc
@@ -404,15 +404,16 @@ ScanDescriptor Scanner::Descriptor() const {
const auto& tablet_metadata = tablet_replica_->tablet_metadata();
descriptor.table_name = tablet_metadata->table_name();
+ SchemaPtr schema_ptr = tablet_metadata->schema();
if (spec().lower_bound_key()) {
descriptor.predicates.emplace_back(
Substitute("PRIMARY KEY >= $0", KUDU_REDACT(
- spec().lower_bound_key()->Stringify(tablet_metadata->schema()))));
+ spec().lower_bound_key()->Stringify(*schema_ptr))));
}
if (spec().exclusive_upper_bound_key()) {
descriptor.predicates.emplace_back(
Substitute("PRIMARY KEY < $0", KUDU_REDACT(
- spec().exclusive_upper_bound_key()->Stringify(tablet_metadata->schema()))));
+ spec().exclusive_upper_bound_key()->Stringify(*schema_ptr))));
}
for (const auto& predicate : spec().predicates()) {
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 951ee14..ce7fb61 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -3478,7 +3478,7 @@ class InvalidScanRequest_WithIdsParamTest :
public ::testing::WithParamInterface<ReadMode> {
};
TEST_P(InvalidScanRequest_WithIdsParamTest, Test) {
- const Schema* projection = tablet_replica_->tablet()->schema();
+ const SchemaPtr projection = tablet_replica_->tablet()->schema();
ASSERT_TRUE(projection->has_column_ids());
VerifyScanRequestFailure(*projection,
TabletServerErrorPB::INVALID_SCHEMA,
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index d32e661..908e638 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1170,7 +1170,8 @@ void TabletServiceAdminImpl::AlterSchema(const AlterSchemaRequestPB* req,
return;
}
- const auto& tablet_schema = replica->tablet_metadata()->schema();
+ const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema();
+ const Schema& tablet_schema = *tablet_schema_ptr;
if (req_schema == tablet_schema) {
context->RespondSuccess();
return;
@@ -2178,8 +2179,8 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
// If the token doesn't have full scan privileges for the table, check
// for required privileges based on the scan request.
if (!privilege.scan_privilege()) {
- const auto& schema = replica->tablet_metadata()->schema();
- if (!CheckScanPrivilegesOrRespond(scan_pb, schema, authorized_column_ids,
+ const SchemaPtr schema_ptr = replica->tablet_metadata()->schema();
+ if (!CheckScanPrivilegesOrRespond(scan_pb, *schema_ptr, authorized_column_ids,
"Scan", context)) {
return;
}
@@ -2254,7 +2255,8 @@ void TabletServiceImpl::ListTablets(const ListTabletsRequestPB* req,
}
if (req->need_schema_info()) {
- const auto& tablet_schema = replica->tablet_metadata()->schema();
+ const SchemaPtr schema_ptr = replica->tablet_metadata()->schema();
+ const Schema& tablet_schema = *schema_ptr;
CHECK_OK(SchemaToPB(tablet_schema, status->mutable_schema()));
CHECK_OK(replica->tablet_metadata()->partition_schema().ToPB(
tablet_schema, status->mutable_partition_schema()));
@@ -2300,7 +2302,8 @@ void TabletServiceImpl::SplitKeyRange(const SplitKeyRangeRequestPB* req,
return;
}
if (!privilege.scan_privilege()) {
- const auto& schema = replica->tablet_metadata()->schema();
+ const SchemaPtr schema_ptr = replica->tablet_metadata()->schema();
+ const Schema& schema = *schema_ptr;
unordered_set<ColumnId> required_column_privileges;
if (req->has_start_primary_key() || req->has_stop_primary_key()) {
const auto& key_cols = schema.get_key_column_ids();
@@ -2345,7 +2348,8 @@ void TabletServiceImpl::SplitKeyRange(const SplitKeyRangeRequestPB* req,
// Decode encoded key
Arena arena(256);
- const auto& tablet_schema = replica->tablet_metadata()->schema();
+ const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema();
+ const Schema& tablet_schema = *tablet_schema_ptr;
EncodedKey* start = nullptr;
EncodedKey* stop = nullptr;
if (req->has_start_primary_key()) {
@@ -2482,8 +2486,8 @@ void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
// If the token doesn't have full scan privileges for the table, check
// for required privileges based on the checksum request.
if (!privilege.scan_privilege()) {
- const auto& schema = replica->tablet_metadata()->schema();
- if (!CheckScanPrivilegesOrRespond(new_req, schema, authorized_column_ids,
+ const SchemaPtr schema_ptr = replica->tablet_metadata()->schema();
+ if (!CheckScanPrivilegesOrRespond(new_req, *schema_ptr, authorized_column_ids,
"Checksum", context)) {
return;
}
@@ -2788,7 +2792,8 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
}
}
- const auto& tablet_schema = replica->tablet_metadata()->schema();
+ const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema();
+ const Schema& tablet_schema = *tablet_schema_ptr;
ScanSpec spec;
s = SetupScanSpec(scan_pb, tablet_schema, scanner, &spec);
diff --git a/src/kudu/tserver/tserver_path_handlers.cc b/src/kudu/tserver/tserver_path_handlers.cc
index 2137dfa..aa92940 100644
--- a/src/kudu/tserver/tserver_path_handlers.cc
+++ b/src/kudu/tserver/tserver_path_handlers.cc
@@ -35,6 +35,7 @@
#include "kudu/common/common.pb.h"
#include "kudu/common/iterator_stats.h"
#include "kudu/common/partition.h"
+#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/log_anchor_registry.h"
@@ -93,8 +94,6 @@ DECLARE_int32(scan_history_count);
namespace kudu {
-class Schema;
-
namespace tserver {
namespace {
@@ -356,6 +355,7 @@ void TabletServerPathHandlers::HandleTabletsPage(const Webserver::WebRequest& /*
for (const scoped_refptr<TabletReplica>& replica : replicas) {
EasyJson replica_json = details_json.PushBack(EasyJson::kObject);
const auto& tmeta = replica->tablet_metadata();
+ const SchemaPtr schema_ptr = tmeta->schema();
TabletStatusPB status;
replica->GetTabletStatusPB(&status);
replica_json["table_name"] = status.table_name();
@@ -366,7 +366,7 @@ void TabletServerPathHandlers::HandleTabletsPage(const Webserver::WebRequest& /*
}
replica_json["partition"] =
tmeta->partition_schema().PartitionDebugString(tmeta->partition(),
- tmeta->schema());
+ *schema_ptr);
replica_json["state"] = replica->HumanReadableState();
if (status.has_estimated_on_disk_size()) {
replica_json["n_bytes"] =
@@ -425,9 +425,9 @@ void TabletServerPathHandlers::HandleTabletPage(const Webserver::WebRequest& req
output->Set("table_name", table_name);
const auto& tmeta = replica->tablet_metadata();
- const Schema& schema = tmeta->schema();
+ const SchemaPtr schema_ptr = tmeta->schema();
output->Set("partition",
- tmeta->partition_schema().PartitionDebugString(tmeta->partition(), schema));
+ tmeta->partition_schema().PartitionDebugString(tmeta->partition(), *schema_ptr));
output->Set("on_disk_size", HumanReadableNumBytes::ToString(replica->OnDiskSize()));
uint64_t live_row_count;
Status s = replica->CountLiveRows(&live_row_count);
@@ -437,7 +437,7 @@ void TabletServerPathHandlers::HandleTabletPage(const Webserver::WebRequest& req
output->Set("tablet_live_row_count", "N/A");
}
- SchemaToJson(schema, output);
+ SchemaToJson(*schema_ptr, output);
}
void TabletServerPathHandlers::HandleTabletSVGPage(const Webserver::WebRequest& req,