You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/03/04 00:03:20 UTC

[kudu] 01/02: KUDU-3197 [tserver] optimal Schema's memory used, using std::shared_ptr

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit f4b6d8917b79b9de53957174ade1a7ffc76e0090
Author: shenxingwuying <sh...@gmail.com>
AuthorDate: Tue Jan 4 20:07:29 2022 +0800

    KUDU-3197 [tserver] optimal Schema's memory used, using std::shared_ptr
    
    Change TabletMetadata's Schema* member to std::shared_ptr<Schema>
    to reduce memory usage when the schema is altered.
    
    When the schema is altered, TabletMetadata keeps the previous schemas
    in old_schemas, because they may still be in use by scanners or
    compaction jobs. As described in KUDU-3197, frequent schema alterations
    make the tserver's memory usage grow very large, much like a memory
    leak, especially when the number of columns is very large.
    
    The JIRA was filed by wangningito; this patch continues his work, but
    uses std::shared_ptr<Schema> instead of scoped_refptr<Schema>, because
    scoped_refptr<Schema> requires too many changes, as seen in:
    https://gerrit.cloudera.org/c/18098/
    
    Change-Id: Ic284dde108c49130419d876c6698b40c195e9b35
    Reviewed-on: http://gerrit.cloudera.org:8080/18255
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/client-test.cc                     | 10 ++---
 src/kudu/common/schema.h                           |  1 +
 src/kudu/integration-tests/linked_list-test-util.h |  2 +-
 src/kudu/master/sys_catalog.cc                     |  5 ++-
 src/kudu/tablet/all_types-scan-correctness-test.cc |  2 +-
 src/kudu/tablet/cfile_set.cc                       |  4 +-
 src/kudu/tablet/cfile_set.h                        |  2 +-
 src/kudu/tablet/diff_scan-test.cc                  |  6 +--
 src/kudu/tablet/diskrowset.cc                      |  8 ++--
 src/kudu/tablet/mt-tablet-test.cc                  |  2 +-
 src/kudu/tablet/ops/alter_schema_op.cc             | 10 ++---
 src/kudu/tablet/ops/alter_schema_op.h              |  8 ++--
 src/kudu/tablet/ops/op.h                           |  4 +-
 src/kudu/tablet/ops/write_op.h                     |  7 ++-
 src/kudu/tablet/rowset_metadata.h                  |  2 +-
 src/kudu/tablet/tablet-schema-test.cc              | 10 ++---
 src/kudu/tablet/tablet-test-util.h                 |  3 +-
 src/kudu/tablet/tablet.cc                          | 47 +++++++++++---------
 src/kudu/tablet/tablet.h                           |  6 +--
 src/kudu/tablet/tablet_bootstrap.cc                |  8 ++--
 src/kudu/tablet/tablet_metadata.cc                 | 51 +++++++++++-----------
 src/kudu/tablet/tablet_metadata.h                  | 29 +++++-------
 src/kudu/tablet/tablet_replica-test.cc             | 10 ++---
 src/kudu/tools/kudu-tool-test.cc                   |  8 ++--
 src/kudu/tools/tool_action_fs.cc                   |  8 ++--
 src/kudu/tools/tool_action_local_replica.cc        | 15 ++++---
 src/kudu/tserver/scanners.cc                       |  5 ++-
 src/kudu/tserver/tablet_server-test.cc             |  2 +-
 src/kudu/tserver/tablet_service.cc                 | 23 ++++++----
 src/kudu/tserver/tserver_path_handlers.cc          | 12 ++---
 30 files changed, 161 insertions(+), 149 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 94ec37d..75b2bce 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -4709,7 +4709,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) {
         ->Default(KuduValue::CopyString("hello!"));
     ASSERT_OK(table_alterer->Alter());
     ASSERT_EQ(4, tablet_replica->tablet()->metadata()->schema_version());
-    Schema schema = tablet_replica->tablet()->metadata()->schema();
+    Schema schema = *tablet_replica->tablet()->metadata()->schema();
     ColumnSchema col_schema = schema.column(schema.find_column("string_val"));
     ASSERT_FALSE(col_schema.has_read_default());
     ASSERT_TRUE(col_schema.has_write_default());
@@ -4723,7 +4723,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) {
         ->Default(KuduValue::FromInt(54321));
     ASSERT_OK(table_alterer->Alter());
     ASSERT_EQ(5, tablet_replica->tablet()->metadata()->schema_version());
-    Schema schema = tablet_replica->tablet()->metadata()->schema();
+    Schema schema = *tablet_replica->tablet()->metadata()->schema();
     ColumnSchema col_schema = schema.column(schema.find_column("non_null_with_default"));
     ASSERT_TRUE(col_schema.has_read_default()); // Started with a default
     ASSERT_TRUE(col_schema.has_write_default());
@@ -4737,7 +4737,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) {
         ->RemoveDefault();
     ASSERT_OK(table_alterer->Alter());
     ASSERT_EQ(6, tablet_replica->tablet()->metadata()->schema_version());
-    Schema schema = tablet_replica->tablet()->metadata()->schema();
+    Schema schema = *tablet_replica->tablet()->metadata()->schema();
     ColumnSchema col_schema = schema.column(schema.find_column("string_val"));
     ASSERT_FALSE(col_schema.has_read_default());
     ASSERT_FALSE(col_schema.has_write_default());
@@ -4750,7 +4750,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) {
         ->RemoveDefault();
     ASSERT_OK(table_alterer->Alter());
     ASSERT_EQ(7, tablet_replica->tablet()->metadata()->schema_version());
-    Schema schema = tablet_replica->tablet()->metadata()->schema();
+    Schema schema = *tablet_replica->tablet()->metadata()->schema();
     ColumnSchema col_schema = schema.column(schema.find_column("non_null_with_default"));
     ASSERT_TRUE(col_schema.has_read_default());
     ASSERT_FALSE(col_schema.has_write_default());
@@ -4779,7 +4779,7 @@ TEST_F(ClientTest, TestBasicAlterOperations) {
         ->BlockSize(16 * 1024 * 1024);
     ASSERT_OK(table_alterer->Alter());
     ASSERT_EQ(8, tablet_replica->tablet()->metadata()->schema_version());
-    Schema schema = tablet_replica->tablet()->metadata()->schema();
+    Schema schema = *tablet_replica->tablet()->metadata()->schema();
     ColumnSchema col_schema = schema.column(schema.find_column("string_val"));
     ASSERT_EQ(KuduColumnStorageAttributes::PLAIN_ENCODING, col_schema.attributes().encoding);
     ASSERT_EQ(KuduColumnStorageAttributes::LZ4, col_schema.attributes().compression);
diff --git a/src/kudu/common/schema.h b/src/kudu/common/schema.h
index 725a0e8..6662319 100644
--- a/src/kudu/common/schema.h
+++ b/src/kudu/common/schema.h
@@ -48,6 +48,7 @@
 
 namespace kudu {
 class Schema;
+typedef std::shared_ptr<Schema> SchemaPtr;
 }  // namespace kudu
 
 // Check that two schemas are equal, yielding a useful error message
diff --git a/src/kudu/integration-tests/linked_list-test-util.h b/src/kudu/integration-tests/linked_list-test-util.h
index 8e395df..89eae43 100644
--- a/src/kudu/integration-tests/linked_list-test-util.h
+++ b/src/kudu/integration-tests/linked_list-test-util.h
@@ -564,7 +564,7 @@ inline Status LinkedListTester::VerifyLinkedListLocal(
                               GenerateSplitInts());
   verifier.StartScanTimer();
 
-  const Schema* tablet_schema = tablet->schema();
+  const Schema* tablet_schema = tablet->schema().get();
   // Cannot use schemas with col indexes in a scan (assertions fire).
   Schema projection(tablet_schema->columns(), tablet_schema->num_key_columns());
   std::unique_ptr<RowwiseIterator> iter;
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 0d417a5..5b61cd7 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -252,9 +252,10 @@ Status SysCatalogTable::Load(FsManager *fs_manager) {
   RETURN_NOT_OK(tablet::TabletMetadata::Load(fs_manager, kSysCatalogTabletId, &metadata));
 
   // Verify that the schema is the current one
-  if (metadata->schema() != BuildTableSchema()) {
+  const SchemaPtr schema_ptr = metadata->schema();
+  if (*schema_ptr != BuildTableSchema()) {
     // TODO: In this case we probably should execute the migration step.
-    return(Status::Corruption("Unexpected schema", metadata->schema().ToString()));
+    return(Status::Corruption("Unexpected schema", schema_ptr->ToString()));
   }
 
   LOG(INFO) << "Verifying existing consensus state";
diff --git a/src/kudu/tablet/all_types-scan-correctness-test.cc b/src/kudu/tablet/all_types-scan-correctness-test.cc
index 7dfa1d6..5340e04 100644
--- a/src/kudu/tablet/all_types-scan-correctness-test.cc
+++ b/src/kudu/tablet/all_types-scan-correctness-test.cc
@@ -363,7 +363,7 @@ public:
     } else {
       default_ptr = rowops_.GenerateElement(read_default);
     }
-    SchemaBuilder builder(tablet()->metadata()->schema());
+    SchemaBuilder builder(*tablet()->metadata()->schema());
     builder.RemoveColumn("val_c");
     ASSERT_OK(builder.AddColumn("val_c", rowops_.type_, true, default_ptr, nullptr));
     AlterSchema(builder.Build());
diff --git a/src/kudu/tablet/cfile_set.cc b/src/kudu/tablet/cfile_set.cc
index f9899e4..a9efaf0 100644
--- a/src/kudu/tablet/cfile_set.cc
+++ b/src/kudu/tablet/cfile_set.cc
@@ -220,7 +220,7 @@ CFileReader* CFileSet::key_index_reader() const {
   // If there is no special index cfile, then we have a non-compound key
   // and we can just use the key column.
   // This is always the first column listed in the tablet schema.
-  int key_col_id = tablet_schema().column_id(0);
+  int key_col_id = tablet_schema()->column_id(0);
   return FindOrDie(readers_by_col_id_, key_col_id).get();
 }
 
@@ -423,7 +423,7 @@ Status CFileSet::Iterator::PushdownRangeScanPredicate(ScanSpec *spec) {
 
   Schema key_schema_for_vlog;
   if (VLOG_IS_ON(1)) {
-    key_schema_for_vlog = base_data_->tablet_schema().CreateKeyProjection();
+    key_schema_for_vlog = base_data_->tablet_schema()->CreateKeyProjection();
   }
 
   const auto* lb_key = spec->lower_bound_key();
diff --git a/src/kudu/tablet/cfile_set.h b/src/kudu/tablet/cfile_set.h
index 1b06df0..34eb4b3 100644
--- a/src/kudu/tablet/cfile_set.h
+++ b/src/kudu/tablet/cfile_set.h
@@ -156,7 +156,7 @@ class CFileSet :
   // (the ad-hoc reader for composite keys, otherwise the key column reader)
   cfile::CFileReader* key_index_reader() const;
 
-  const Schema &tablet_schema() const { return rowset_metadata_->tablet_schema(); }
+  const SchemaPtr tablet_schema() const { return rowset_metadata_->tablet_schema(); }
 
   std::shared_ptr<RowSetMetadata> rowset_metadata_;
   std::shared_ptr<MemTracker> bloomfile_tracker_;
diff --git a/src/kudu/tablet/diff_scan-test.cc b/src/kudu/tablet/diff_scan-test.cc
index cecc367..6dded93 100644
--- a/src/kudu/tablet/diff_scan-test.cc
+++ b/src/kudu/tablet/diff_scan-test.cc
@@ -104,7 +104,7 @@ TEST_P(DiffScanTest, TestDiffScan) {
   opts.include_deleted_rows = include_deleted_rows;
 
   static const bool kIsDeletedDefault = false;
-  SchemaBuilder builder(tablet->metadata()->schema());
+  SchemaBuilder builder(*tablet->metadata()->schema());
   if (order_mode == ORDERED) {
     // Define our diff scan to start from snap1.
     // NOTE: it isn't critical to set this given the default is -Inf, but it
@@ -188,7 +188,7 @@ TEST_F(OrderedDiffScanWithDeletesTest, TestKudu3108) {
   opts.order = ORDERED;
   opts.include_deleted_rows = true;
   static const bool kIsDeletedDefault = false;
-  SchemaBuilder builder(tablet->metadata()->schema());
+  SchemaBuilder builder(*tablet->metadata()->schema());
   ASSERT_OK(builder.AddColumn("deleted", IS_DELETED,
                               /*is_nullable=*/ false,
                               /*read_default=*/ &kIsDeletedDefault,
@@ -247,7 +247,7 @@ TEST_F(OrderedDiffScanWithDeletesTest, TestDiffScanAfterDeltaFlushRacesWithBatch
   opts.snap_to_exclude = snap1;
   opts.snap_to_include = snap2;
   opts.order = ORDERED;;
-  SchemaBuilder builder(tablet->metadata()->schema());
+  SchemaBuilder builder(*tablet->metadata()->schema());
   Schema projection = builder.BuildWithoutIds();
   opts.projection = &projection;
 
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index da2ae7a..13c6d06 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -630,10 +630,10 @@ Status DiskRowSet::NewMajorDeltaCompaction(const vector<ColumnId>& col_ids,
   DCHECK(open_);
   shared_lock<rw_spinlock> l(component_lock_);
 
-  const Schema* schema = &rowset_metadata_->tablet_schema();
+  const SchemaPtr schema_ptr = rowset_metadata_->tablet_schema();
 
   RowIteratorOptions opts;
-  opts.projection = schema;
+  opts.projection = schema_ptr.get();
   opts.io_context = io_context;
   vector<shared_ptr<DeltaStore>> included_stores;
   unique_ptr<DeltaIterator> delta_iter;
@@ -641,7 +641,7 @@ Status DiskRowSet::NewMajorDeltaCompaction(const vector<ColumnId>& col_ids,
       opts, REDO, &included_stores, &delta_iter));
 
   out->reset(new MajorDeltaCompaction(rowset_metadata_->fs_manager(),
-                                      *schema,
+                                      *schema_ptr,
                                       base_data_.get(),
                                       std::move(delta_iter),
                                       std::move(included_stores),
@@ -910,7 +910,7 @@ Status DiskRowSet::DebugDump(vector<string> *lines) {
   // Using CompactionInput to dump our data is an easy way of seeing all the
   // rows and deltas.
   unique_ptr<CompactionInput> input;
-  RETURN_NOT_OK(NewCompactionInput(&rowset_metadata_->tablet_schema(),
+  RETURN_NOT_OK(NewCompactionInput(rowset_metadata_->tablet_schema().get(),
                                    MvccSnapshot::CreateSnapshotIncludingAllOps(),
                                    nullptr, &input));
   return DebugDumpCompactionInput(input.get(), lines);
diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc
index 937f4a0..05f29b3 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -107,7 +107,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
     CHECK_OK(tablet()->NewRowIterator(client_schema_, &iter));
     uint64_t count;
     CHECK_OK(tablet()->CountRows(&count));
-    const Schema* schema = tablet()->schema();
+    const Schema* schema = tablet()->schema().get();
     ColumnSchema valcol = schema->column(schema->find_column("val"));
     valcol_projection_ = Schema({ valcol }, 0);
     CHECK_OK(tablet()->NewRowIterator(valcol_projection_, &iter));
diff --git a/src/kudu/tablet/ops/alter_schema_op.cc b/src/kudu/tablet/ops/alter_schema_op.cc
index 1c73ea2..5e386d3 100644
--- a/src/kudu/tablet/ops/alter_schema_op.cc
+++ b/src/kudu/tablet/ops/alter_schema_op.cc
@@ -96,17 +96,17 @@ Status AlterSchemaOp::Prepare() {
   TRACE("PREPARE ALTER-SCHEMA: Starting");
 
   // Decode schema
-  unique_ptr<Schema> schema(new Schema);
-  Status s = SchemaFromPB(state_->request()->schema(), schema.get());
+  SchemaPtr schema_ptr = std::make_shared<Schema>();
+  Status s = SchemaFromPB(state_->request()->schema(), schema_ptr.get());
   if (!s.ok()) {
     state_->completion_callback()->set_error(s, TabletServerErrorPB::INVALID_SCHEMA);
     return s;
   }
 
   Tablet* tablet = state_->tablet_replica()->tablet();
-  RETURN_NOT_OK(tablet->CreatePreparedAlterSchema(state(), schema.get()));
+  RETURN_NOT_OK(tablet->CreatePreparedAlterSchema(state(), schema_ptr));
 
-  state_->AddToAutoReleasePool(std::move(schema));
+  state_->AddToAutoReleasePool(std::move(schema_ptr));
 
   TRACE("PREPARE ALTER-SCHEMA: finished");
   return s;
@@ -138,7 +138,7 @@ Status AlterSchemaOp::Apply(CommitMsg** commit_msg) {
   }
 
   state_->tablet_replica()->log()
-    ->SetSchemaForNextLogSegment(*DCHECK_NOTNULL(state_->schema()),
+    ->SetSchemaForNextLogSegment(*DCHECK_NOTNULL(state_->schema().get()),
                                  state_->schema_version());
 
   // Altered tablets should be included in the next tserver heartbeat so that
diff --git a/src/kudu/tablet/ops/alter_schema_op.h b/src/kudu/tablet/ops/alter_schema_op.h
index 9bccd2a..022544e 100644
--- a/src/kudu/tablet/ops/alter_schema_op.h
+++ b/src/kudu/tablet/ops/alter_schema_op.h
@@ -24,6 +24,7 @@
 #include <boost/optional/optional.hpp>
 
 #include "kudu/common/common.pb.h"
+#include "kudu/common/schema.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/tablet/ops/op.h"
@@ -32,7 +33,6 @@
 
 namespace kudu {
 
-class Schema;
 class rw_semaphore;
 
 namespace tablet {
@@ -59,8 +59,8 @@ class AlterSchemaOpState : public OpState {
   const tserver::AlterSchemaRequestPB* request() const override { return request_; }
   tserver::AlterSchemaResponsePB* response() const override { return response_; }
 
-  void set_schema(const Schema* schema) { schema_ = schema; }
-  const Schema* schema() const { return schema_; }
+  void set_schema(const SchemaPtr& schema) { schema_ = schema; }
+  const SchemaPtr schema() const { return schema_; }
 
   std::string new_table_name() const {
     return request_->new_table_name();
@@ -109,7 +109,7 @@ class AlterSchemaOpState : public OpState {
   DISALLOW_COPY_AND_ASSIGN(AlterSchemaOpState);
 
   // The new (target) Schema.
-  const Schema* schema_;
+  SchemaPtr schema_;
 
   // The original RPC request and response.
   const tserver::AlterSchemaRequestPB *request_;
diff --git a/src/kudu/tablet/ops/op.h b/src/kudu/tablet/ops/op.h
index c94837c..024b53d 100644
--- a/src/kudu/tablet/ops/op.h
+++ b/src/kudu/tablet/ops/op.h
@@ -209,7 +209,7 @@ class OpState {
   }
 
   // Sets a heap object to be managed by this op's AutoReleasePool.
-  void AddToAutoReleasePool(std::unique_ptr<Schema> t) {
+  void AddToAutoReleasePool(SchemaPtr t) {
     schemas_pool_.emplace_back(std::move(t));
   }
 
@@ -283,7 +283,7 @@ class OpState {
   // Optional callback to be called once the op completes.
   std::unique_ptr<OpCompletionCallback> completion_clbk_;
 
-  std::deque<std::unique_ptr<Schema>> schemas_pool_;
+  std::deque<SchemaPtr> schemas_pool_;
 
   // This operation's timestamp.
   // This is only set once during the operation lifecycle, using external synchronization.
diff --git a/src/kudu/tablet/ops/write_op.h b/src/kudu/tablet/ops/write_op.h
index 8c14176..4751d39 100644
--- a/src/kudu/tablet/ops/write_op.h
+++ b/src/kudu/tablet/ops/write_op.h
@@ -193,9 +193,10 @@ class WriteOpState : public OpState {
 
   void ReleaseMvccTxn(Op::OpResult result);
 
-  void set_schema_at_decode_time(const Schema* schema) {
+  void set_schema_at_decode_time(const SchemaPtr& schema) {
     std::lock_guard<simple_spinlock> l(op_state_lock_);
-    schema_at_decode_time_ = schema;
+    schema_ptr_at_decode_time_ = schema;
+    schema_at_decode_time_ = schema.get();
   }
 
   const Schema* schema_at_decode_time() const {
@@ -319,6 +320,8 @@ class WriteOpState : public OpState {
   // at APPLY time to ensure we don't have races against schema change.
   // Protected by op_state_lock_.
   const Schema* schema_at_decode_time_;
+  // Shares ownership of the schema so that schema_at_decode_time_ stays valid.
+  SchemaPtr schema_ptr_at_decode_time_;
 
   // Lock that protects access to various fields of WriteOpState.
   mutable simple_spinlock op_state_lock_;
diff --git a/src/kudu/tablet/rowset_metadata.h b/src/kudu/tablet/rowset_metadata.h
index c4be77d..70a6b1f 100644
--- a/src/kudu/tablet/rowset_metadata.h
+++ b/src/kudu/tablet/rowset_metadata.h
@@ -99,7 +99,7 @@ class RowSetMetadata {
 
   int64_t id() const { return id_; }
 
-  const Schema& tablet_schema() const {
+  const SchemaPtr tablet_schema() const {
     return tablet_metadata_->schema();
   }
 
diff --git a/src/kudu/tablet/tablet-schema-test.cc b/src/kudu/tablet/tablet-schema-test.cc
index de26638..1886f81 100644
--- a/src/kudu/tablet/tablet-schema-test.cc
+++ b/src/kudu/tablet/tablet-schema-test.cc
@@ -146,7 +146,7 @@ TEST_F(TestTabletSchema, TestWrite) {
   const int32_t c2_write_default = 5;
   const int32_t c2_read_default = 7;
 
-  SchemaBuilder builder(tablet()->metadata()->schema());
+  SchemaBuilder builder(*tablet()->metadata()->schema());
   ASSERT_OK(builder.AddColumn("c2", INT32, false, &c2_read_default, &c2_write_default));
   AlterSchema(builder.Build());
   Schema s2 = builder.BuildWithoutIds();
@@ -189,7 +189,7 @@ TEST_F(TestTabletSchema, TestReInsert) {
   const int32_t c2_write_default = 5;
   const int32_t c2_read_default = 7;
 
-  SchemaBuilder builder(tablet()->metadata()->schema());
+  SchemaBuilder builder(*tablet()->metadata()->schema());
   ASSERT_OK(builder.AddColumn("c2", INT32, false, &c2_read_default, &c2_write_default));
   AlterSchema(builder.Build());
   Schema s2 = builder.BuildWithoutIds();
@@ -219,7 +219,7 @@ TEST_F(TestTabletSchema, TestRenameProjection) {
   InsertRow(client_schema_, 1);
 
   // Switch schema to s2
-  SchemaBuilder builder(tablet()->metadata()->schema());
+  SchemaBuilder builder(*tablet()->metadata()->schema());
   ASSERT_OK(builder.RenameColumn("c1", "c1_renamed"));
   AlterSchema(builder.Build());
   Schema s2 = builder.BuildWithoutIds();
@@ -260,7 +260,7 @@ TEST_F(TestTabletSchema, TestDeleteAndReAddColumn) {
   VerifyTabletRows(client_schema_, keys);
 
   // Switch schema to s2
-  SchemaBuilder builder(tablet()->metadata()->schema());
+  SchemaBuilder builder(*tablet()->metadata()->schema());
   ASSERT_OK(builder.RemoveColumn("c1"));
   // NOTE this new 'c1' will have a different id from the previous one
   //      so the data added to the previous 'c1' will not be visible.
@@ -279,7 +279,7 @@ TEST_F(TestTabletSchema, TestModifyEmptyMemRowSet) {
   std::vector<std::pair<string, string> > keys;
 
   // Switch schema to s2
-  SchemaBuilder builder(tablet()->metadata()->schema());
+  SchemaBuilder builder(*tablet()->metadata()->schema());
   ASSERT_OK(builder.AddNullableColumn("c2", INT32));
   AlterSchema(builder.Build());
   Schema s2 = builder.BuildWithoutIds();
diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h
index 1ca5a1a..bf010cb 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -150,8 +150,9 @@ class KuduTabletTest : public KuduTest {
       *(req.mutable_new_extra_config()) = *extra_config;
     }
 
+    SchemaPtr schema_ptr = std::make_shared<Schema>(schema);
     AlterSchemaOpState op_state(nullptr, &req, nullptr);
-    ASSERT_OK(tablet()->CreatePreparedAlterSchema(&op_state, &schema));
+    ASSERT_OK(tablet()->CreatePreparedAlterSchema(&op_state, schema_ptr));
     ASSERT_OK(tablet()->AlterSchema(&op_state));
     op_state.Finish();
   }
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 28bb0cf..50eda43 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -262,7 +262,7 @@ Tablet::Tablet(scoped_refptr<TabletMetadata> metadata,
                shared_ptr<MemTracker> parent_mem_tracker,
                MetricRegistry* metric_registry,
                scoped_refptr<LogAnchorRegistry> log_anchor_registry)
-  : key_schema_(metadata->schema().CreateKeyProjection()),
+  : key_schema_(metadata->schema()->CreateKeyProjection()),
     metadata_(std::move(metadata)),
     log_anchor_registry_(std::move(log_anchor_registry)),
     mem_trackers_(tablet_id(), std::move(parent_mem_tracker)),
@@ -371,7 +371,8 @@ Status Tablet::Open(const unordered_set<int64_t>& in_flight_txn_ids,
 
     // Now that the current state is loaded, create the new MemRowSet with the next id.
     shared_ptr<MemRowSet> new_mrs;
-    RETURN_NOT_OK(MemRowSet::Create(next_mrs_id_++, *schema(),
+    const SchemaPtr schema_ptr = schema();
+    RETURN_NOT_OK(MemRowSet::Create(next_mrs_id_++, *schema_ptr,
                                     log_anchor_registry_.get(),
                                     mem_trackers_.tablet_tracker,
                                     &new_mrs));
@@ -386,7 +387,7 @@ Status Tablet::Open(const unordered_set<int64_t>& in_flight_txn_ids,
       // NOTE: we are able to FindOrDie() on these IDs because
       // 'txn_ids_with_mrs' is a subset of the transaction IDs known by the
       // metadata.
-      RETURN_NOT_OK(MemRowSet::Create(0, *schema(), txn_id, FindOrDie(txn_meta_by_id, txn_id),
+      RETURN_NOT_OK(MemRowSet::Create(0, *schema_ptr, txn_id, FindOrDie(txn_meta_by_id, txn_id),
                                       log_anchor_registry_.get(),
                                       mem_trackers_.tablet_tracker,
                                       &txn_mrs));
@@ -462,7 +463,7 @@ void Tablet::Shutdown() {
 
 Status Tablet::GetMappedReadProjection(const Schema& projection,
                                        Schema *mapped_projection) const {
-  const Schema* cur_schema = schema();
+  const SchemaPtr cur_schema = schema();
   return cur_schema->GetMappedReadProjection(projection, mapped_projection);
 }
 
@@ -541,17 +542,18 @@ Status Tablet::DecodeWriteOperations(const Schema* client_schema,
   TRACE("Decoding operations");
   vector<DecodedRowOperation> ops;
 
+  SchemaPtr schema_ptr = schema();
   // Decode the ops
   RowOperationsPBDecoder dec(&op_state->request()->row_operations(),
                              client_schema,
-                             schema(),
+                             schema_ptr.get(),
                              op_state->arena());
   RETURN_NOT_OK(dec.DecodeOperations<DecoderMode::WRITE_OPS>(&ops));
   TRACE_COUNTER_INCREMENT("num_ops", ops.size());
 
   // Important to set the schema before the ops -- we need the
   // schema in order to stringify the ops.
-  op_state->set_schema_at_decode_time(schema());
+  op_state->set_schema_at_decode_time(schema_ptr);
   op_state->SetRowOps(std::move(ops));
 
   return Status::OK();
@@ -605,8 +607,8 @@ Status Tablet::CheckRowInTablet(const ConstContiguousRow& row) const {
   const auto& ps = metadata_->partition_schema();
   if (PREDICT_FALSE(!ps.PartitionContainsRow(metadata_->partition(), row))) {
     return Status::NotFound(
-        Substitute("Row not in tablet partition. Partition: '$0' row: '$1'",
-                   ps.PartitionDebugString(metadata_->partition(), *schema()),
+        Substitute("Row not in tablet partition. Partition: '$0', row: '$1'.",
+                   ps.PartitionDebugString(metadata_->partition(), *schema().get()),
                    ps.PartitionKeyDebugString(row)));
   }
   return Status::OK();
@@ -744,7 +746,7 @@ Status Tablet::InsertOrUpsertUnlocked(const IOContext* io_context,
   }
 
   Timestamp ts = op_state->timestamp();
-  ConstContiguousRow row(schema(), op->decoded_op.row_data);
+  ConstContiguousRow row(schema().get(), op->decoded_op.row_data);
 
   // TODO(todd): the Insert() call below will re-encode the key, which is a
   // waste. Should pass through the KeyProbe structure perhaps.
@@ -819,7 +821,7 @@ Status Tablet::ApplyUpsertAsUpdate(const IOContext* io_context,
                                    RowOp* upsert,
                                    RowSet* rowset,
                                    ProbeStats* stats) {
-  const auto* schema = this->schema();
+  const auto* schema = this->schema().get();
   ConstContiguousRow row(schema, upsert->decoded_op.row_data);
   faststring buf;
   RowChangeListEncoder enc(&buf);
@@ -991,7 +993,8 @@ void Tablet::StartApplying(ParticipantOpState* op_state) {
 
 void Tablet::CreateTxnRowSets(int64_t txn_id, scoped_refptr<TxnMetadata> txn_meta) {
   shared_ptr<MemRowSet> new_mrs;
-  CHECK_OK(MemRowSet::Create(0, *schema(), txn_id, std::move(txn_meta),
+  const SchemaPtr schema_ptr = schema();
+  CHECK_OK(MemRowSet::Create(0, *schema_ptr, txn_id, std::move(txn_meta),
                              log_anchor_registry_.get(),
                              mem_trackers_.tablet_tracker,
                              &new_mrs));
@@ -1266,7 +1269,7 @@ Status Tablet::ApplyRowOperation(const IOContext* io_context,
   }
   DCHECK(op_state != nullptr) << "must have a WriteOpState";
   DCHECK(op_state->op_id().IsInitialized()) << "OpState OpId needed for anchoring";
-  DCHECK_EQ(op_state->schema_at_decode_time(), schema());
+  DCHECK_EQ(op_state->schema_at_decode_time(), schema().get());
 
   // If we were unable to check rowset presence in batch (e.g. because we are processing
   // a batch which contains some duplicate keys) we need to do so now.
@@ -1524,7 +1527,8 @@ Status Tablet::ReplaceMemRowSetsUnlocked(RowSetsInCompaction* compaction,
   }
 
   shared_ptr<MemRowSet> new_mrs;
-  RETURN_NOT_OK(MemRowSet::Create(next_mrs_id_++, *schema(),
+  const SchemaPtr schema_ptr = schema();
+  RETURN_NOT_OK(MemRowSet::Create(next_mrs_id_++, *schema_ptr,
                                   log_anchor_registry_.get(),
                                   mem_trackers_.tablet_tracker,
                                   &new_mrs));
@@ -1539,7 +1543,7 @@ Status Tablet::ReplaceMemRowSetsUnlocked(RowSetsInCompaction* compaction,
 }
 
 Status Tablet::CreatePreparedAlterSchema(AlterSchemaOpState* op_state,
-                                         const Schema* schema) {
+                                         const SchemaPtr& schema) {
 
   if (!schema->has_column_ids()) {
     // this probably means that the request is not from the Master
@@ -1556,7 +1560,7 @@ Status Tablet::CreatePreparedAlterSchema(AlterSchemaOpState* op_state,
 }
 
 Status Tablet::AlterSchema(AlterSchemaOpState* op_state) {
-  DCHECK(key_schema_.KeyTypeEquals(*DCHECK_NOTNULL(op_state->schema())))
+  DCHECK(key_schema_.KeyTypeEquals(*DCHECK_NOTNULL(op_state->schema().get())))
       << "Schema keys cannot be altered(except name)";
 
   // Prevent any concurrent flushes. Otherwise, we run into issues where
@@ -1580,7 +1584,7 @@ Status Tablet::AlterSchema(AlterSchemaOpState* op_state) {
                         << " to " << op_state->schema()->ToString()
                         << " version " << op_state->schema_version();
   DCHECK(schema_lock_.is_locked());
-  metadata_->SetSchema(*op_state->schema(), op_state->schema_version());
+  metadata_->SetSchema(op_state->schema(), op_state->schema_version());
   if (op_state->has_new_table_name()) {
     metadata_->SetTableName(op_state->new_table_name());
     if (metric_entity_) {
@@ -1609,7 +1613,8 @@ Status Tablet::RewindSchemaForBootstrap(const Schema& new_schema,
   // to flush.
   VLOG_WITH_PREFIX(1) << "Rewinding schema during bootstrap to " << new_schema.ToString();
 
-  metadata_->SetSchema(new_schema, schema_version);
+  SchemaPtr schema = std::make_shared<Schema>(new_schema);
+  metadata_->SetSchema(schema, schema_version);
   {
     std::lock_guard<rw_spinlock> lock(component_lock_);
 
@@ -1617,7 +1622,7 @@ Status Tablet::RewindSchemaForBootstrap(const Schema& new_schema,
     shared_ptr<RowSetTree> old_rowsets = components_->rowsets;
     CHECK(old_mrs->empty());
     shared_ptr<MemRowSet> new_mrs;
-    RETURN_NOT_OK(MemRowSet::Create(old_mrs->mrs_id(), new_schema,
+    RETURN_NOT_OK(MemRowSet::Create(old_mrs->mrs_id(), *schema,
                                     log_anchor_registry_.get(),
                                     mem_trackers_.tablet_tracker,
                                     &new_mrs));
@@ -1872,7 +1877,8 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
   }
 
   shared_ptr<CompactionInput> merge;
-  RETURN_NOT_OK(input.CreateCompactionInput(flush_snap, schema(), &io_context, &merge));
+  const SchemaPtr schema_ptr = schema();
+  RETURN_NOT_OK(input.CreateCompactionInput(flush_snap, schema_ptr.get(), &io_context, &merge));
 
   RollingDiskRowSetWriter drsw(metadata_.get(), merge->schema(), DefaultBloomSizing(),
                                compaction_policy_->target_rowset_size());
@@ -2014,8 +2020,9 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
   VLOG_WITH_PREFIX(1) << Substitute("$0: Phase 2: carrying over any updates "
                                     "which arrived during Phase 1. Snapshot: $1",
                                     op_name, non_duplicated_ops_snap.ToString());
+  const SchemaPtr schema_ptr2 = schema();
   RETURN_NOT_OK_PREPEND(
-      input.CreateCompactionInput(non_duplicated_ops_snap, schema(), &io_context, &merge),
+      input.CreateCompactionInput(non_duplicated_ops_snap, schema_ptr2.get(), &io_context, &merge),
           Substitute("Failed to create $0 inputs", op_name).c_str());
 
   // Update the output rowsets with the deltas that came in in phase 1, before we swapped
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 4d72cb9..6298c6c 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -261,7 +261,7 @@ class Tablet {
   // An error will be returned if the specified schema is invalid (e.g.
   // key mismatch, or missing IDs)
   Status CreatePreparedAlterSchema(AlterSchemaOpState *op_state,
-                                   const Schema* schema);
+                                   const SchemaPtr& schema);
 
   // Apply the Schema of the specified op.
   // This operation will trigger a flush on the current MemRowSet.
@@ -411,8 +411,8 @@ class Tablet {
   // has a very small number of rows.
   Status DebugDump(std::vector<std::string> *lines = NULL);
 
-  const Schema* schema() const {
-    return &metadata_->schema();
+  const SchemaPtr schema() const {
+    return metadata_->schema();
   }
 
   // Returns a reference to the key projection of the tablet schema.
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 7ccedfe..d0922a8 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -1535,11 +1535,11 @@ Status TabletBootstrap::PlayAlterSchemaRequest(const IOContext* /*io_context*/,
   AlterSchemaRequestPB* alter_schema = replicate_msg->mutable_alter_schema_request();
 
   // Decode schema
-  Schema schema;
-  RETURN_NOT_OK(SchemaFromPB(alter_schema->schema(), &schema));
+  SchemaPtr schema_ptr = std::make_shared<Schema>();
+  RETURN_NOT_OK(SchemaFromPB(alter_schema->schema(), schema_ptr.get()));
 
   AlterSchemaOpState op_state(nullptr, alter_schema, nullptr);
-  RETURN_NOT_OK(tablet_->CreatePreparedAlterSchema(&op_state, &schema));
+  RETURN_NOT_OK(tablet_->CreatePreparedAlterSchema(&op_state, schema_ptr));
 
   // Apply the alter schema to the tablet
   RETURN_NOT_OK_PREPEND(tablet_->AlterSchema(&op_state), "Failed to AlterSchema:");
@@ -1547,7 +1547,7 @@ Status TabletBootstrap::PlayAlterSchemaRequest(const IOContext* /*io_context*/,
   if (!op_state.error()) {
     // If the alter completed successfully, update the log segment header. Note
     // that our new log isn't hooked up to the tablet yet.
-    log_->SetSchemaForNextLogSegment(std::move(schema), op_state.schema_version());
+    log_->SetSchemaForNextLogSegment(*schema_ptr, op_state.schema_version());
   }
 
   return AppendCommitMsg(commit_msg);
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index 0a36f1c..1c949cb 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -167,9 +167,10 @@ Status TabletMetadata::LoadOrCreate(FsManager* fs_manager,
                                     scoped_refptr<TabletMetadata>* metadata) {
   Status s = Load(fs_manager, tablet_id, metadata);
   if (s.ok()) {
-    if ((*metadata)->schema() != schema) {
+    const SchemaPtr schema_ptr = (*metadata)->schema();
+    if (*schema_ptr != schema) {
       return Status::Corruption(Substitute("Schema on disk ($0) does not "
-        "match expected schema ($1)", (*metadata)->schema().ToString(),
+        "match expected schema ($1)", schema_ptr->ToString(),
         schema.ToString()));
     }
     return Status::OK();
@@ -312,8 +313,7 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id,
       log_prefix_(Substitute("T $0 P $1: ", tablet_id_, fs_manager_->uuid())),
       next_rowset_idx_(0),
       last_durable_mrs_id_(kNoDurableMemStore),
-      schema_(new Schema(schema)),
-      schema_ptr_(schema_.get()),
+      schema_(std::make_shared<Schema>(schema)),
       schema_version_(0),
       table_name_(std::move(table_name)),
       partition_schema_(std::move(partition_schema)),
@@ -339,7 +339,6 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id)
       tablet_id_(std::move(tablet_id)),
       fs_manager_(fs_manager),
       next_rowset_idx_(0),
-      schema_ptr_(nullptr),
       num_flush_pins_(0),
       needs_flush_(false),
       flush_count_for_tests_(0),
@@ -390,11 +389,14 @@ Status TabletMetadata::LoadFromSuperBlock(const TabletSuperBlockPB& superblock)
     table_name_ = superblock.table_name();
 
     uint32_t schema_version = superblock.schema_version();
-    unique_ptr<Schema> schema(new Schema());
+    SchemaPtr schema = std::make_shared<Schema>();
     RETURN_NOT_OK_PREPEND(SchemaFromPB(superblock.schema(), schema.get()),
                           "Failed to parse Schema from superblock " +
                           SecureShortDebugString(superblock));
-    SetSchemaUnlocked(std::move(schema), schema_version);
+    {
+      SchemaPtr old_schema;
+      SwapSchemaUnlocked(schema, schema_version, &old_schema);
+    }
 
     if (!superblock.has_partition()) {
       // KUDU-818: Possible backward compatibility issue with tables created
@@ -418,7 +420,7 @@ Status TabletMetadata::LoadFromSuperBlock(const TabletSuperBlockPB& superblock)
       CHECK_EQ(table_id_, superblock.table_id());
       PartitionSchema partition_schema;
       RETURN_NOT_OK(PartitionSchema::FromPB(superblock.partition_schema(),
-                                            *schema_ptr_, &partition_schema));
+                                            *schema_, &partition_schema));
       CHECK(partition_schema_ == partition_schema);
 
       Partition partition;
@@ -743,7 +745,7 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block,
   partition_.ToPB(pb.mutable_partition());
   pb.set_last_durable_mrs_id(last_durable_mrs_id_);
   pb.set_schema_version(schema_version_);
-  RETURN_NOT_OK(partition_schema_.ToPB(*schema_ptr_, pb.mutable_partition_schema()));
+  RETURN_NOT_OK(partition_schema_.ToPB(*schema_, pb.mutable_partition_schema()));
   pb.set_table_name(table_name_);
 
   for (const shared_ptr<RowSetMetadata>& meta : rowsets) {
@@ -756,8 +758,8 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block,
     InsertOrDie(pb.mutable_txn_metadata(), txn_id_and_metadata.first, meta_pb);
   }
 
-  DCHECK(schema_ptr_->has_column_ids());
-  RETURN_NOT_OK_PREPEND(SchemaToPB(*schema_ptr_, pb.mutable_schema()),
+  DCHECK(schema_->has_column_ids());
+  RETURN_NOT_OK_PREPEND(SchemaToPB(*schema_, pb.mutable_schema()),
                         "Couldn't serialize schema into superblock");
 
   pb.set_tablet_data_state(tablet_data_state_);
@@ -937,22 +939,21 @@ RowSetMetadata *TabletMetadata::GetRowSetForTests(int64_t id) {
   return nullptr;
 }
 
-void TabletMetadata::SetSchema(const Schema& schema, uint32_t version) {
-  unique_ptr<Schema> new_schema(new Schema(schema));
-  std::lock_guard<LockType> l(data_lock_);
-  SetSchemaUnlocked(std::move(new_schema), version);
+void TabletMetadata::SetSchema(const SchemaPtr& schema, uint32_t version) {
+  // In case this is the last reference to the schema, destruct the pointer
+  // outside the lock.
+  SchemaPtr old_schema;
+  {
+    std::lock_guard<LockType> l(data_lock_);
+    SwapSchemaUnlocked(schema, version, &old_schema);
+  }
 }
 
-void TabletMetadata::SetSchemaUnlocked(
-    unique_ptr<Schema> new_schema, uint32_t version) {
-  DCHECK(new_schema->has_column_ids());
-
-  // "Release" barrier ensures that, when we publish the new Schema object,
-  // all of its initialization is also visible.
-  base::subtle::Release_Store(reinterpret_cast<AtomicWord*>(&schema_ptr_),
-                              reinterpret_cast<AtomicWord>(new_schema.get()));
-  std::swap(schema_, new_schema);
-  old_schemas_.emplace_back(std::move(new_schema));
+void TabletMetadata::SwapSchemaUnlocked(SchemaPtr schema, uint32_t version,
+                                        SchemaPtr* old_schema) {
+  DCHECK(schema->has_column_ids());
+  *old_schema = std::move(schema_);
+  schema_ = std::move(schema);
   schema_version_ = version;
 }
 
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h
index f545dfa..cda5c85 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -19,6 +19,7 @@
 #include <atomic>
 #include <cstdint>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
@@ -29,6 +30,7 @@
 
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partition.h"
+#include "kudu/common/schema.h"
 #include "kudu/fs/block_id.h"
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/macros.h"
@@ -43,7 +45,6 @@ namespace kudu {
 
 class BlockIdPB;
 class FsManager;
-class Schema;
 class Timestamp;
 
 namespace consensus {
@@ -58,6 +59,7 @@ namespace tablet {
 
 class RowSetMetadata;
 class TxnMetadata;
+
 enum TxnState : int8_t;
 
 typedef std::vector<std::shared_ptr<RowSetMetadata>> RowSetMetadataVector;
@@ -150,19 +152,18 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
 
   uint32_t schema_version() const;
 
-  void SetSchema(const Schema& schema, uint32_t version);
+  void SetSchema(const SchemaPtr& schema, uint32_t version);
 
   void SetTableName(const std::string& table_name);
 
   void SetExtraConfig(TableExtraConfigPB extra_config);
 
-  // Return a reference to the current schema.
+  // Return a std::shared_ptr (SchemaPtr) to the current schema.
   // This pointer will be valid until the TabletMetadata is destructed,
   // even if the schema is changed.
-  const Schema& schema() const {
-    const Schema* s = reinterpret_cast<const Schema*>(
-        base::subtle::Acquire_Load(reinterpret_cast<const AtomicWord*>(&schema_ptr_)));
-    return *s;
+  const SchemaPtr schema() const {
+    std::lock_guard<LockType> l(data_lock_);
+    return schema_;
   }
 
   // Returns the partition schema of the tablet's table.
@@ -375,7 +376,7 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
   // Constructor for loading an existing tablet.
   TabletMetadata(FsManager* fs_manager, std::string tablet_id);
 
-  void SetSchemaUnlocked(std::unique_ptr<Schema> schema, uint32_t version);
+  void SwapSchemaUnlocked(SchemaPtr schema, uint32_t version, SchemaPtr* old_schema);
 
   Status LoadFromDisk();
 
@@ -443,21 +444,11 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
   std::unordered_map<int64_t, scoped_refptr<TxnMetadata>> txn_metadata_by_txn_id_;;
 
   // The current schema version.
-  std::unique_ptr<Schema> schema_;
-  // The raw pointer to the schema for an atomic swap.
-  Schema* schema_ptr_;
-
+  SchemaPtr schema_;
   uint32_t schema_version_;
   std::string table_name_;
   PartitionSchema partition_schema_;
 
-  // Previous values of 'schema_'.
-  // These are currently kept alive forever, under the assumption that
-  // a given tablet won't have thousands of "alter table" calls.
-  // They are kept alive so that callers of schema() don't need to
-  // worry about reference counting or locking.
-  std::vector<std::unique_ptr<Schema>> old_schemas_;
-
   // Protected by 'data_lock_'.
   BlockIdSet orphaned_blocks_;
 
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index db077bd..e6980a9 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -32,9 +32,9 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/row_operations.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
-#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log_anchor_registry.h"
@@ -540,11 +540,11 @@ TEST_F(TabletReplicaTest, TestRollLogSegmentSchemaOnAlter) {
   ConsensusBootstrapInfo info;
   ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
   SchemaPB orig_schema_pb;
-  ASSERT_OK(SchemaToPB(SchemaBuilder(tablet()->metadata()->schema()).Build(), &orig_schema_pb));
+  ASSERT_OK(SchemaToPB(SchemaBuilder(*tablet()->metadata()->schema()).Build(), &orig_schema_pb));
   const int orig_schema_version = tablet()->metadata()->schema_version();
 
   // Add a new column.
-  SchemaBuilder builder(tablet()->metadata()->schema());
+  SchemaBuilder builder(*tablet()->metadata()->schema());
   ASSERT_OK(builder.AddColumn("new_col", INT32));
   Schema new_client_schema = builder.BuildWithoutIds();
   SchemaPB new_schema;
@@ -581,11 +581,11 @@ TEST_F(TabletReplicaTest, Kudu2690Test) {
   ConsensusBootstrapInfo info;
   ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
   SchemaPB orig_schema_pb;
-  ASSERT_OK(SchemaToPB(SchemaBuilder(tablet()->metadata()->schema()).Build(), &orig_schema_pb));
+  ASSERT_OK(SchemaToPB(SchemaBuilder(*tablet()->metadata()->schema()).Build(), &orig_schema_pb));
   const int orig_schema_version = tablet()->metadata()->schema_version();
 
   // First things first, add a new column.
-  SchemaBuilder builder(tablet()->metadata()->schema());
+  SchemaBuilder builder(*tablet()->metadata()->schema());
   ASSERT_OK(builder.AddColumn("new_col", INT32));
   Schema new_client_schema = builder.BuildWithoutIds();
   SchemaPB new_schema;
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 2383b2f..1b73a3b 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2431,7 +2431,7 @@ TEST_F(ToolTest, TestLocalReplicaDumpMeta) {
   // Verify the contents of the metadata output
   SCOPED_TRACE(stdout);
   string debug_str = meta->partition_schema()
-      .PartitionDebugString(meta->partition(), meta->schema());
+      .PartitionDebugString(meta->partition(), *meta->schema());
   StripWhiteSpace(&debug_str);
   ASSERT_STR_CONTAINS(stdout, debug_str);
   debug_str = Substitute("Table name: $0 Table id: $1",
@@ -2439,7 +2439,7 @@ TEST_F(ToolTest, TestLocalReplicaDumpMeta) {
   ASSERT_STR_CONTAINS(stdout, debug_str);
   debug_str = Substitute("Schema (version=$0):", meta->schema_version());
   ASSERT_STR_CONTAINS(stdout, debug_str);
-  debug_str = meta->schema().ToString();
+  debug_str = meta->schema()->ToString();
   StripWhiteSpace(&debug_str);
   ASSERT_STR_CONTAINS(stdout, debug_str);
 
@@ -2583,7 +2583,7 @@ TEST_F(ToolTest, TestLocalReplicaOps) {
 
     SCOPED_TRACE(stdout);
     debug_str = meta->partition_schema()
-        .PartitionDebugString(meta->partition(), meta->schema());
+        .PartitionDebugString(meta->partition(), *meta->schema());
     StripWhiteSpace(&debug_str);
     ASSERT_STR_CONTAINS(stdout, debug_str);
     debug_str = Substitute("Table name: $0 Table id: $1",
@@ -2592,7 +2592,7 @@ TEST_F(ToolTest, TestLocalReplicaOps) {
     debug_str = Substitute("Schema (version=$0):", meta->schema_version());
     StripWhiteSpace(&debug_str);
     ASSERT_STR_CONTAINS(stdout, debug_str);
-    debug_str = meta->schema().ToString();
+    debug_str = meta->schema()->ToString();
     StripWhiteSpace(&debug_str);
     ASSERT_STR_CONTAINS(stdout, debug_str);
 
diff --git a/src/kudu/tools/tool_action_fs.cc b/src/kudu/tools/tool_action_fs.cc
index 12077da..ea530f0 100644
--- a/src/kudu/tools/tool_action_fs.cc
+++ b/src/kudu/tools/tool_action_fs.cc
@@ -549,7 +549,7 @@ string TabletInfo(Field field, const TabletMetadata& tablet) {
     case Field::kTabletId: return tablet.tablet_id();
     case Field::kPartition: return tablet.partition_schema()
                                          .PartitionDebugString(tablet.partition(),
-                                                               tablet.schema());
+                                                               *tablet.schema().get());
     default: LOG(FATAL) << "unhandled field (this is a bug): " << ToString(field);
   }
 }
@@ -575,7 +575,7 @@ string BlockInfo(Field field,
     case Field::kBlockKind: return block_kind;
 
     case Field::kColumn: if (column_id) {
-      return tablet.schema().column_by_id(*column_id).name();
+      return tablet.schema()->column_by_id(*column_id).name();
     } else { return ""; }
 
     case Field::kColumnId: if (column_id) {
@@ -597,8 +597,8 @@ string FormatCFileKeyMetadata(const TabletMetadata& tablet,
 
   Arena arena(1024);
   EncodedKey* key;
-  CHECK_OK(EncodedKey::DecodeEncodedString(tablet.schema(), &arena, value, &key));
-  return key->Stringify(tablet.schema());
+  CHECK_OK(EncodedKey::DecodeEncodedString(*tablet.schema().get(), &arena, value, &key));
+  return key->Stringify(*tablet.schema().get());
 }
 
 // Formats the delta stats property from CFile metadata.
diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc
index 0540abb..03d39ad 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -554,6 +554,7 @@ Status SummarizeDataSize(const RunnerContext& context) {
     RETURN_NOT_OK_PREPEND(TabletMetadata::Load(fs.get(), tablet_id, &meta),
                           Substitute("could not load tablet metadata for $0", tablet_id));
     const string& table_id = meta->table_id();
+    const SchemaPtr schema_ptr = meta->schema();
     for (const shared_ptr<RowSetMetadata>& rs_meta : meta->rowsets()) {
       TabletSizeStats rowset_stats;
       RETURN_NOT_OK(SummarizeSize(fs.get(), rs_meta->redo_delta_blocks(),
@@ -570,11 +571,11 @@ Status SummarizeDataSize(const RunnerContext& context) {
       for (const auto& e : column_blocks_by_id) {
         const auto& col_id = e.first;
         const auto& block = e.second;
-        const auto& col_idx = meta->schema().find_column_by_id(col_id);
+        const auto& col_idx = schema_ptr->find_column_by_id(col_id);
         string col_key = Substitute(
             "c$0 ($1)", col_id,
             (col_idx != Schema::kColumnNotFound) ?
-                meta->schema().column(col_idx).name() : "?");
+                schema_ptr->column(col_idx).name() : "?");
         RETURN_NOT_OK(SummarizeSize(
             fs.get(), { block }, col_key, &rowset_stats.column_bytes[col_key]));
       }
@@ -661,12 +662,12 @@ Status DumpBlockIdsForLocalReplica(const RunnerContext& context) {
   cout << "Listing all data blocks in tablet "
        << tablet_id << ":" << endl;
 
-  Schema schema = meta->schema();
+  SchemaPtr schema = meta->schema();
 
   size_t idx = 0;
   for (const shared_ptr<RowSetMetadata>& rs_meta : meta->rowsets())  {
     cout << "Rowset " << idx++ << endl;
-    RETURN_NOT_OK(ListBlocksInRowSet(schema, *rs_meta));
+    RETURN_NOT_OK(ListBlocksInRowSet(*schema.get(), *rs_meta));
   }
 
   return Status::OK();
@@ -677,11 +678,11 @@ Status DumpTabletMeta(FsManager* fs_manager,
   scoped_refptr<TabletMetadata> meta;
   RETURN_NOT_OK(TabletMetadata::Load(fs_manager, tablet_id, &meta));
 
-  const Schema& schema = meta->schema();
+  const Schema& schema = *meta->schema().get();
 
   cout << Indent(indent) << "Partition: "
        << meta->partition_schema().PartitionDebugString(meta->partition(),
-                                                        meta->schema())
+                                                        schema)
        << endl;
   cout << Indent(indent) << "Table name: " << meta->table_name()
        << " Table id: " << meta->table_id() << endl;
@@ -736,7 +737,7 @@ Status DumpRowSetInternal(const IOContext& ctx,
   if (FLAGS_dump_all_columns) {
     RETURN_NOT_OK(rs->DebugDump(&lines));
   } else {
-    Schema key_proj = rs_meta->tablet_schema().CreateKeyProjection();
+    Schema key_proj = rs_meta->tablet_schema()->CreateKeyProjection();
     RowIteratorOptions opts;
     opts.projection = &key_proj;
     opts.io_context = &ctx;
diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc
index add5d9d..f07340f 100644
--- a/src/kudu/tserver/scanners.cc
+++ b/src/kudu/tserver/scanners.cc
@@ -404,15 +404,16 @@ ScanDescriptor Scanner::Descriptor() const {
 
   const auto& tablet_metadata = tablet_replica_->tablet_metadata();
   descriptor.table_name = tablet_metadata->table_name();
+  SchemaPtr schema_ptr = tablet_metadata->schema();
   if (spec().lower_bound_key()) {
     descriptor.predicates.emplace_back(
         Substitute("PRIMARY KEY >= $0", KUDU_REDACT(
-            spec().lower_bound_key()->Stringify(tablet_metadata->schema()))));
+            spec().lower_bound_key()->Stringify(*schema_ptr))));
   }
   if (spec().exclusive_upper_bound_key()) {
     descriptor.predicates.emplace_back(
         Substitute("PRIMARY KEY < $0", KUDU_REDACT(
-            spec().exclusive_upper_bound_key()->Stringify(tablet_metadata->schema()))));
+            spec().exclusive_upper_bound_key()->Stringify(*schema_ptr))));
   }
 
   for (const auto& predicate : spec().predicates()) {
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 951ee14..ce7fb61 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -3478,7 +3478,7 @@ class InvalidScanRequest_WithIdsParamTest :
     public ::testing::WithParamInterface<ReadMode> {
 };
 TEST_P(InvalidScanRequest_WithIdsParamTest, Test) {
-  const Schema* projection = tablet_replica_->tablet()->schema();
+  const SchemaPtr projection = tablet_replica_->tablet()->schema();
   ASSERT_TRUE(projection->has_column_ids());
   VerifyScanRequestFailure(*projection,
                            TabletServerErrorPB::INVALID_SCHEMA,
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index d32e661..908e638 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -1170,7 +1170,8 @@ void TabletServiceAdminImpl::AlterSchema(const AlterSchemaRequestPB* req,
       return;
     }
 
-    const auto& tablet_schema = replica->tablet_metadata()->schema();
+    const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema();
+    const Schema& tablet_schema = *tablet_schema_ptr;
     if (req_schema == tablet_schema) {
       context->RespondSuccess();
       return;
@@ -2178,8 +2179,8 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
     // If the token doesn't have full scan privileges for the table, check
     // for required privileges based on the scan request.
     if (!privilege.scan_privilege()) {
-      const auto& schema = replica->tablet_metadata()->schema();
-      if (!CheckScanPrivilegesOrRespond(scan_pb, schema, authorized_column_ids,
+      const SchemaPtr schema_ptr = replica->tablet_metadata()->schema();
+      if (!CheckScanPrivilegesOrRespond(scan_pb, *schema_ptr, authorized_column_ids,
                                         "Scan", context)) {
         return;
       }
@@ -2254,7 +2255,8 @@ void TabletServiceImpl::ListTablets(const ListTabletsRequestPB* req,
     }
 
     if (req->need_schema_info()) {
-      const auto& tablet_schema = replica->tablet_metadata()->schema();
+      const SchemaPtr schema_ptr = replica->tablet_metadata()->schema();
+      const Schema& tablet_schema = *schema_ptr;
       CHECK_OK(SchemaToPB(tablet_schema, status->mutable_schema()));
       CHECK_OK(replica->tablet_metadata()->partition_schema().ToPB(
           tablet_schema, status->mutable_partition_schema()));
@@ -2300,7 +2302,8 @@ void TabletServiceImpl::SplitKeyRange(const SplitKeyRangeRequestPB* req,
       return;
     }
     if (!privilege.scan_privilege()) {
-      const auto& schema = replica->tablet_metadata()->schema();
+      const SchemaPtr schema_ptr = replica->tablet_metadata()->schema();
+      const Schema& schema = *schema_ptr;
       unordered_set<ColumnId> required_column_privileges;
       if (req->has_start_primary_key() || req->has_stop_primary_key()) {
         const auto& key_cols = schema.get_key_column_ids();
@@ -2345,7 +2348,8 @@ void TabletServiceImpl::SplitKeyRange(const SplitKeyRangeRequestPB* req,
 
   // Decode encoded key
   Arena arena(256);
-  const auto& tablet_schema = replica->tablet_metadata()->schema();
+  const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema();
+  const Schema& tablet_schema = *tablet_schema_ptr;
   EncodedKey* start = nullptr;
   EncodedKey* stop = nullptr;
   if (req->has_start_primary_key()) {
@@ -2482,8 +2486,8 @@ void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
     // If the token doesn't have full scan privileges for the table, check
     // for required privileges based on the checksum request.
     if (!privilege.scan_privilege()) {
-      const auto& schema = replica->tablet_metadata()->schema();
-      if (!CheckScanPrivilegesOrRespond(new_req, schema, authorized_column_ids,
+      const SchemaPtr schema_ptr = replica->tablet_metadata()->schema();
+      if (!CheckScanPrivilegesOrRespond(new_req, *schema_ptr, authorized_column_ids,
                                         "Checksum", context)) {
         return;
       }
@@ -2788,7 +2792,8 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
     }
   }
 
-  const auto& tablet_schema = replica->tablet_metadata()->schema();
+  const SchemaPtr tablet_schema_ptr = replica->tablet_metadata()->schema();
+  const Schema& tablet_schema = *tablet_schema_ptr;
 
   ScanSpec spec;
   s = SetupScanSpec(scan_pb, tablet_schema, scanner, &spec);
diff --git a/src/kudu/tserver/tserver_path_handlers.cc b/src/kudu/tserver/tserver_path_handlers.cc
index 2137dfa..aa92940 100644
--- a/src/kudu/tserver/tserver_path_handlers.cc
+++ b/src/kudu/tserver/tserver_path_handlers.cc
@@ -35,6 +35,7 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/iterator_stats.h"
 #include "kudu/common/partition.h"
+#include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/log_anchor_registry.h"
@@ -93,8 +94,6 @@ DECLARE_int32(scan_history_count);
 
 namespace kudu {
 
-class Schema;
-
 namespace tserver {
 
 namespace {
@@ -356,6 +355,7 @@ void TabletServerPathHandlers::HandleTabletsPage(const Webserver::WebRequest& /*
     for (const scoped_refptr<TabletReplica>& replica : replicas) {
       EasyJson replica_json = details_json.PushBack(EasyJson::kObject);
       const auto& tmeta = replica->tablet_metadata();
+      const SchemaPtr schema_ptr = tmeta->schema();
       TabletStatusPB status;
       replica->GetTabletStatusPB(&status);
       replica_json["table_name"] = status.table_name();
@@ -366,7 +366,7 @@ void TabletServerPathHandlers::HandleTabletsPage(const Webserver::WebRequest& /*
       }
       replica_json["partition"] =
           tmeta->partition_schema().PartitionDebugString(tmeta->partition(),
-                                                         tmeta->schema());
+                                                         *schema_ptr);
       replica_json["state"] = replica->HumanReadableState();
       if (status.has_estimated_on_disk_size()) {
         replica_json["n_bytes"] =
@@ -425,9 +425,9 @@ void TabletServerPathHandlers::HandleTabletPage(const Webserver::WebRequest& req
   output->Set("table_name", table_name);
 
   const auto& tmeta = replica->tablet_metadata();
-  const Schema& schema = tmeta->schema();
+  const SchemaPtr schema_ptr = tmeta->schema();
   output->Set("partition",
-              tmeta->partition_schema().PartitionDebugString(tmeta->partition(), schema));
+              tmeta->partition_schema().PartitionDebugString(tmeta->partition(), *schema_ptr));
   output->Set("on_disk_size", HumanReadableNumBytes::ToString(replica->OnDiskSize()));
   uint64_t live_row_count;
   Status s = replica->CountLiveRows(&live_row_count);
@@ -437,7 +437,7 @@ void TabletServerPathHandlers::HandleTabletPage(const Webserver::WebRequest& req
     output->Set("tablet_live_row_count", "N/A");
   }
 
-  SchemaToJson(schema, output);
+  SchemaToJson(*schema_ptr, output);
 }
 
 void TabletServerPathHandlers::HandleTabletSVGPage(const Webserver::WebRequest& req,