You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/08/25 18:02:40 UTC

[kudu] branch master updated: KUDU-3304: [Alter] Support to alter table's replication factor

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b499eb  KUDU-3304: [Alter] Support to alter table's replication factor
1b499eb is described below

commit 1b499ebe9978e7d77615255862f0cf8ca1054c69
Author: Yingchun Lai <40...@qq.com>
AuthorDate: Mon Jul 12 17:57:25 2021 +0800

    KUDU-3304: [Alter] Support to alter table's replication factor
    
    In some cases we want to increase a table's replication factor, for
    example to convert a test table into a production table, or to fix the
    RF of a table that was created by a buggy program.
    
    This patch adds the ability to alter a table's replication factor,
    exposes it in the CLI tool, and adds some unit tests.
    The command looks like:
    `kudu table set_replication_factor <master_addresses> <table_name> <replication_factor>`
    
    It should be pointed out that the CLI tool returns immediately without
    waiting for the RF change to take effect: when the RF is increased, new
    replicas are brought up asynchronously, and when it is decreased, surplus
    replicas are shut down asynchronously. You can use `kudu cluster ksck`
    to check whether the change has taken effect.
    
    Change-Id: I3aa2d5b12c508ba761fa9410ad1a465cf31fb7d7
    Reviewed-on: http://gerrit.cloudera.org:8080/17674
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/client/client-test.cc                   |  43 ++-
 src/kudu/client/client.h                         |   7 +
 src/kudu/client/table_alterer-internal.cc        |   4 +
 src/kudu/client/table_alterer-internal.h         |   1 +
 src/kudu/integration-tests/alter_table-test.cc   | 327 ++++++++++++++++++++++-
 src/kudu/integration-tests/cluster_itest_util.cc |  18 +-
 src/kudu/integration-tests/cluster_itest_util.h  |   2 +-
 src/kudu/integration-tests/test_workload.cc      |   2 +
 src/kudu/master/catalog_manager.cc               | 202 ++++++++------
 src/kudu/master/catalog_manager.h                |  10 +
 src/kudu/master/master.proto                     |   1 +
 src/kudu/master/master_service.cc                |   2 +-
 src/kudu/tools/kudu-tool-test.cc                 |  82 ++++++
 src/kudu/tools/tool_action_table.cc              |  39 +++
 src/kudu/tserver/heartbeater.cc                  |   1 +
 15 files changed, 641 insertions(+), 100 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 1313516..91d6063 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -62,6 +62,7 @@
 #include "kudu/client/schema.h"
 #include "kudu/client/session-internal.h"
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/client/table_alterer-internal.h"
 #include "kudu/client/transaction-internal.h"
 #include "kudu/client/value.h"
 #include "kudu/client/write_op.h"
@@ -8434,14 +8435,14 @@ TEST_F(ClientTestUnixSocket, TestConnectViaUnixSocket) {
   ASSERT_EQ(1, total_unix_conns);
 }
 
-class WriteRestartTest : public ClientTest {
+class MultiTServerClientTest : public ClientTest {
  public:
   void SetUp() override {
     KuduTest::SetUp();
 
     // Start minicluster and wait for tablet servers to connect to master.
     InternalMiniClusterOptions options;
-    options.num_tablet_servers = 3;
+    options.num_tablet_servers = 4;
     cluster_.reset(new InternalMiniCluster(env_, std::move(options)));
     ASSERT_OK(cluster_->StartSync());
 
@@ -8469,7 +8470,7 @@ class WriteRestartTest : public ClientTest {
 // no errors: client should retry any operations failed due to tablet server
 // restarting. The result row count should match the number of total rows
 // written by the client.
-TEST_F(WriteRestartTest, WriteWhileRestartingMultipleTabletServers) {
+TEST_F(MultiTServerClientTest, WriteWhileRestartingMultipleTabletServers) {
   SKIP_IF_SLOW_NOT_ALLOWED();
 
   constexpr const auto read_mode_to_string =
@@ -8507,6 +8508,42 @@ TEST_F(WriteRestartTest, WriteWhileRestartingMultipleTabletServers) {
   }
 }
 
+// Test changing replication factor.
+TEST_F(MultiTServerClientTest, TestSetReplicationFactor) {
+  string tablet_id = GetFirstTabletId(client_table_.get());
+
+  scoped_refptr<internal::RemoteTablet> rt;
+  client_->data_->meta_cache_->ClearCache();
+  ASSERT_OK(MetaCacheLookupById(tablet_id, &rt));
+  ASSERT_NE(nullptr, rt);
+  vector<internal::RemoteReplica> replicas;
+  rt->GetRemoteReplicas(&replicas);
+  ASSERT_EQ(3, replicas.size());
+
+  // Set replication factor from 3 to 1.
+  unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(kTableName));
+  table_alterer->data_->set_replication_factor_to_ = 1;
+  ASSERT_OK(table_alterer->Alter());
+  ASSERT_EVENTUALLY([&] {
+    client_->data_->meta_cache_->ClearCache();
+    ASSERT_OK(MetaCacheLookupById(tablet_id, &rt));
+    ASSERT_NE(nullptr, rt);
+    rt->GetRemoteReplicas(&replicas);
+    ASSERT_EQ(1, replicas.size());
+  });
+
+  // Set replication factor from 1 to 3.
+  table_alterer->data_->set_replication_factor_to_ = 3;
+  ASSERT_OK(table_alterer->Alter());
+  ASSERT_EVENTUALLY([&] {
+    client_->data_->meta_cache_->ClearCache();
+    ASSERT_OK(MetaCacheLookupById(tablet_id, &rt));
+    ASSERT_NE(nullptr, rt);
+    rt->GetRemoteReplicas(&replicas);
+    ASSERT_EQ(3, replicas.size());
+  });
+}
+
 class ReplicationFactorLimitsTest : public ClientTest {
  public:
   static constexpr const char* const kTableName = "replication_limits";
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index ab3ff08..4e51a9e 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -53,6 +53,7 @@
 
 namespace kudu {
 
+class AlterTableTest;
 class AuthzTokenTest;
 class ClientStressTest_TestUniqueClientIds_Test;
 class DisableWriteWhenExceedingQuotaTest;
@@ -81,6 +82,7 @@ class TxnSystemClient;
 namespace tools {
 class LeaderMasterProxy;
 class RemoteKsckCluster;
+class TableAlter;
 } // namespace tools
 
 namespace client {
@@ -1013,6 +1015,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   FRIEND_TEST(ClientTest, TestScanTimeout);
   FRIEND_TEST(ClientTest, TestWriteWithDeadMaster);
   FRIEND_TEST(MasterFailoverTest, TestPauseAfterCreateTableIssued);
+  FRIEND_TEST(MultiTServerClientTest, TestSetReplicationFactor);
 
   KuduClient();
 
@@ -2030,6 +2033,10 @@ class KUDU_EXPORT KuduTableAlterer {
   class KUDU_NO_EXPORT Data;
 
   friend class KuduClient;
+  friend class tools::TableAlter;
+  friend class kudu::AlterTableTest;
+
+  FRIEND_TEST(MultiTServerClientTest, TestSetReplicationFactor);
 
   KuduTableAlterer(KuduClient* client,
                    const std::string& name);
diff --git a/src/kudu/client/table_alterer-internal.cc b/src/kudu/client/table_alterer-internal.cc
index 2d0254d..fae67f7 100644
--- a/src/kudu/client/table_alterer-internal.cc
+++ b/src/kudu/client/table_alterer-internal.cc
@@ -63,6 +63,7 @@ Status KuduTableAlterer::Data::ToRequest(AlterTableRequestPB* req) {
       !set_comment_to_.is_initialized() &&
       !disk_size_limit_ &&
       !row_count_limit_ &&
+      !set_replication_factor_to_.is_initialized() &&
       steps_.empty()) {
     return Status::InvalidArgument("No alter steps provided");
   }
@@ -83,6 +84,9 @@ Status KuduTableAlterer::Data::ToRequest(AlterTableRequestPB* req) {
   if (set_comment_to_.is_initialized()) {
     req->set_new_table_comment(set_comment_to_.get());
   }
+  if (set_replication_factor_to_.is_initialized()) {
+    req->set_num_replicas(set_replication_factor_to_.get());
+  }
 
   if (schema_ != nullptr) {
     RETURN_NOT_OK(SchemaToPB(*schema_, req->mutable_schema(),
diff --git a/src/kudu/client/table_alterer-internal.h b/src/kudu/client/table_alterer-internal.h
index 9959aee..6fb45fb 100644
--- a/src/kudu/client/table_alterer-internal.h
+++ b/src/kudu/client/table_alterer-internal.h
@@ -77,6 +77,7 @@ class KuduTableAlterer::Data {
   boost::optional<std::string> rename_to_;
   boost::optional<std::string> set_owner_to_;
   boost::optional<std::string> set_comment_to_;
+  boost::optional<int> set_replication_factor_to_;
 
   boost::optional<std::map<std::string, std::string>> new_extra_configs_;
 
diff --git a/src/kudu/integration-tests/alter_table-test.cc b/src/kudu/integration-tests/alter_table-test.cc
index 9b28292..d3735d4 100644
--- a/src/kudu/integration-tests/alter_table-test.cc
+++ b/src/kudu/integration-tests/alter_table-test.cc
@@ -25,6 +25,7 @@
 #include <ostream>
 #include <string>
 #include <thread>
+#include <unordered_map>
 #include <utility>
 #include <vector>
 
@@ -39,18 +40,22 @@
 #include "kudu/client/scan_batch.h"
 #include "kudu/client/schema.h"
 #include "kudu/client/shared_ptr.h" // IWYU pragma: keep
+#include "kudu/client/table_alterer-internal.h"
 #include "kudu/client/value.h"
 #include "kudu/client/write_op.h"
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/test_workload.h"
 #include "kudu/master/catalog_manager.h"
 #include "kudu/master/master.h"
 #include "kudu/master/master.pb.h"
@@ -64,20 +69,28 @@
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/util/maintenance_manager.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/random.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+METRIC_DECLARE_histogram(log_gc_duration);
+METRIC_DECLARE_entity(tablet);
+
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_bool(log_inject_latency);
 DECLARE_bool(scanner_allow_snapshot_scans_with_logical_timestamps);
 DECLARE_bool(use_hybrid_clock);
 DECLARE_int32(flush_threshold_mb);
+DECLARE_int32(flush_threshold_secs);
 DECLARE_int32(heartbeat_interval_ms);
 DECLARE_int32(log_inject_latency_ms_mean);
+DECLARE_int32(log_segment_size_mb);
+DECLARE_int32(tablet_copy_download_file_inject_latency_ms);
 
 using kudu::client::CountTableRows;
 using kudu::client::KuduClient;
@@ -100,6 +113,8 @@ using kudu::client::KuduValue;
 using kudu::client::sp::shared_ptr;
 using kudu::cluster::InternalMiniCluster;
 using kudu::cluster::InternalMiniClusterOptions;
+using kudu::itest::TabletServerMap;
+using kudu::itest::TServerDetails;
 using kudu::master::AlterTableRequestPB;
 using kudu::master::AlterTableResponsePB;
 using kudu::tablet::TabletReplica;
@@ -138,7 +153,7 @@ class AlterTableTest : public KuduTest {
     KuduTest::SetUp();
 
     InternalMiniClusterOptions opts;
-    opts.num_tablet_servers = num_replicas();
+    opts.num_tablet_servers = num_tservers();
     cluster_.reset(new InternalMiniCluster(env_, opts));
     ASSERT_OK(cluster_->Start());
 
@@ -155,9 +170,9 @@ class AlterTableTest : public KuduTest {
              .num_replicas(num_replicas())
              .Create());
 
-    if (num_replicas() == 1) {
-      tablet_replica_ = LookupTabletReplica();
-      ASSERT_OK(tablet_replica_->consensus()->WaitUntilLeader(MonoDelta::FromSeconds(10)));
+    if (num_replicas() > 0) {
+      tablet_replica_ = LookupLeaderTabletReplica();
+      CHECK(tablet_replica_);
     }
     LOG(INFO) << "Tablet successfully located";
   }
@@ -167,11 +182,34 @@ class AlterTableTest : public KuduTest {
     cluster_->Shutdown();
   }
 
-  scoped_refptr<TabletReplica> LookupTabletReplica() {
-    vector<scoped_refptr<TabletReplica> > replicas;
-    cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletReplicas(&replicas);
-    CHECK_EQ(1, replicas.size());
-    return replicas[0];
+  scoped_refptr<TabletReplica> LookupLeaderTabletReplica() {
+    static const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+    for (int i = 0; i < num_tservers(); ++i) {
+      vector<scoped_refptr<TabletReplica>> replicas;
+      cluster_->mini_tablet_server(i)->server()->tablet_manager()->GetTabletReplicas(&replicas);
+      if (replicas.empty()) {
+        continue;
+      }
+
+      TabletServerMap tablet_servers;
+      ValueDeleter deleter(&tablet_servers);
+      CHECK_OK(CreateTabletServerMap(
+          cluster_->master_proxy(), cluster_->messenger(), &tablet_servers));
+
+      TServerDetails* leader = nullptr;
+      std::string tablet_id = replicas[0]->tablet_id();
+      CHECK_OK(FindTabletLeader(tablet_servers, tablet_id, kTimeout, &leader));
+      replicas.clear();
+      cluster_->mini_tablet_server_by_uuid(leader->uuid())->
+          server()->tablet_manager()->GetTabletReplicas(&replicas);
+      for (const auto& replica : replicas) {
+        if (replica->tablet_id() == tablet_id) {
+          return replica;
+        }
+      }
+      CHECK(false) << "leader tablet replica must has been found in prev steps.";
+    }
+    return nullptr;
   }
 
   void ShutdownTS() {
@@ -196,7 +234,7 @@ class AlterTableTest : public KuduTest {
 
     ASSERT_OK(cluster_->mini_tablet_server(idx)->WaitStarted());
     if (idx == 0) {
-      tablet_replica_ = LookupTabletReplica();
+      tablet_replica_ = LookupLeaderTabletReplica();
     }
   }
 
@@ -233,6 +271,56 @@ class AlterTableTest : public KuduTest {
     return table_alterer->timeout(timeout)->Alter();
   }
 
+  Status SetReplicationFactor(const string& table_name,
+                              int32_t replication_factor) {
+    unique_ptr<KuduTableAlterer> table_alterer(client_->NewTableAlterer(table_name));
+    table_alterer->data_->set_replication_factor_to_ = replication_factor;
+    return table_alterer->timeout(MonoDelta::FromSeconds(60))->Alter();
+  }
+
+  enum class VerifyRowCount {
+    kEnable = 0,
+    kDisable
+  };
+  void VerifyTabletReplicaCount(int32_t replication_factor, VerifyRowCount verify_row_count) {
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_EQ(replication_factor, tablet_replica_->consensus()->CommittedConfig().peers().size());
+
+      scoped_refptr<TabletReplica> first_node_replica;
+      uint64_t first_count = 0;
+      int actual_replica_count = 0;
+      for (int i = 0; i < num_tservers(); i++) {
+        vector<scoped_refptr<TabletReplica>> cur_node_replicas;
+        cluster_->mini_tablet_server(i)->server()->
+          tablet_manager()->GetTabletReplicas(&cur_node_replicas);
+        if (cur_node_replicas.empty()) continue;
+        ASSERT_EQ(1, cur_node_replicas.size());
+        const auto& cur_node_replica = cur_node_replicas[0];
+        if (!cur_node_replica->tablet()) continue;
+
+        ASSERT_OK(cur_node_replica->CheckRunning());
+        if (!first_node_replica) {
+          first_node_replica = cur_node_replica;
+          if (verify_row_count == VerifyRowCount::kEnable) {
+            ASSERT_OK(first_node_replica->CountLiveRows(&first_count));
+          }
+        } else {
+          ASSERT_EQ(first_node_replica->tablet()->tablet_id(),
+            cur_node_replica->tablet()->tablet_id());
+          ASSERT_TRUE(first_node_replica->tablet()->schema()->Equals(
+            *(cur_node_replica->tablet()->schema())));
+          if (verify_row_count == VerifyRowCount::kEnable) {
+            uint64_t cur_count = 0;
+            ASSERT_OK(cur_node_replica->CountLiveRows(&cur_count));
+            ASSERT_EQ(first_count, cur_count);
+          }
+        }
+        ++actual_replica_count;
+      }
+      ASSERT_EQ(replication_factor, actual_replica_count);
+    });
+  }
+
   enum VerifyPattern {
     C1_MATCHES_INDEX,
     C1_IS_DEADBEEF,
@@ -290,6 +378,7 @@ class AlterTableTest : public KuduTest {
 
  protected:
   virtual int num_replicas() const { return 1; }
+  virtual int num_tservers() const { return 1; }
 
   static const char* const kTableName;
 
@@ -313,7 +402,8 @@ class AlterTableTest : public KuduTest {
 // Subclass which creates three servers and a replicated cluster.
 class ReplicatedAlterTableTest : public AlterTableTest {
  protected:
-  virtual int num_replicas() const OVERRIDE { return 3; }
+  int num_replicas() const override { return 3; }
+  int num_tservers() const override { return num_replicas() + 1; }
 };
 
 const char* const AlterTableTest::kTableName = "fake-table";
@@ -2147,6 +2237,221 @@ TEST_F(ReplicatedAlterTableTest, AlterTableAndDropTablet) {
   ASSERT_OK(client_->DeleteTable(kTableName));
 }
 
+TEST_F(ReplicatedAlterTableTest, AlterReplicationFactor) {
+  // 1. The default replication factor is 3.
+  ASSERT_EQ(0, tablet_replica_->tablet()->metadata()->schema_version());
+  NO_FATALS(VerifyTabletReplicaCount(3, VerifyRowCount::kEnable));
+
+  // 2. Set replication factor to 1.
+  ASSERT_OK(SetReplicationFactor(kTableName, 1));
+  NO_FATALS(VerifyTabletReplicaCount(1, VerifyRowCount::kEnable));
+  ASSERT_EQ(1, tablet_replica_->tablet()->metadata()->schema_version());
+
+  // 3. Set replication factor to 3.
+  ASSERT_OK(SetReplicationFactor(kTableName, 3));
+  NO_FATALS(VerifyTabletReplicaCount(3, VerifyRowCount::kEnable));
+  ASSERT_EQ(2, tablet_replica_->tablet()->metadata()->schema_version());
+
+  // 4. Set replication factor to 3 again.
+  ASSERT_OK(SetReplicationFactor(kTableName, 3));
+  NO_FATALS(VerifyTabletReplicaCount(3, VerifyRowCount::kEnable));
+  ASSERT_EQ(2, tablet_replica_->tablet()->metadata()->schema_version());
+
+  // 5. Set replication factor to 5, while there are only 4 tservers in the cluster.
+  auto s = SetReplicationFactor(kTableName, 5);
+  ASSERT_TRUE(s.IsInvalidArgument());
+  ASSERT_STR_CONTAINS(s.ToString(), "not enough live tablet servers to alter a table with the"
+                                    " requested replication factor 5; 4 tablet servers are alive");
+  NO_FATALS(VerifyTabletReplicaCount(3, VerifyRowCount::kEnable));
+  ASSERT_EQ(2, tablet_replica_->tablet()->metadata()->schema_version());
+
+  // 6. Set replication factor to -1, it will fail.
+  s = SetReplicationFactor(kTableName, -1);
+  ASSERT_TRUE(s.IsInvalidArgument());
+  ASSERT_STR_CONTAINS(s.ToString(), "illegal replication factor -1: minimum allowed replication"
+                                    " factor is 1 (controlled by --min_num_replicas)");
+  NO_FATALS(VerifyTabletReplicaCount(3, VerifyRowCount::kEnable));
+  ASSERT_EQ(2, tablet_replica_->tablet()->metadata()->schema_version());
+
+  // 7. Set replication factor to 2, it will fail.
+  s = SetReplicationFactor(kTableName, 2);
+  ASSERT_TRUE(s.IsInvalidArgument());
+  ASSERT_STR_CONTAINS(s.ToString(), "illegal replication factor 2: replication factor must be odd");
+  NO_FATALS(VerifyTabletReplicaCount(3, VerifyRowCount::kEnable));
+  ASSERT_EQ(2, tablet_replica_->tablet()->metadata()->schema_version());
+
+  // 8. Set replication factor to 9, it will fail.
+  s = SetReplicationFactor(kTableName, 9);
+  ASSERT_TRUE(s.IsInvalidArgument());
+  ASSERT_STR_CONTAINS(s.ToString(), "illegal replication factor 9: maximum allowed replication "
+                                    "factor is 7 (controlled by --max_num_replicas)");
+  NO_FATALS(VerifyTabletReplicaCount(3, VerifyRowCount::kEnable));
+  ASSERT_EQ(2, tablet_replica_->tablet()->metadata()->schema_version());
+
+  ASSERT_OK(client_->DeleteTable(kTableName));
+}
+
+TEST_F(ReplicatedAlterTableTest, AlterReplicationFactorWhileScanning) {
+  // Delete table at first, and create table by TestWorkload later.
+  ASSERT_OK(client_->DeleteTable(kTableName));
+
+  // Write some data for scan later.
+  {
+    TestWorkload workload(cluster_.get());
+    workload.set_table_name(kTableName);
+    workload.set_num_tablets(1);
+    workload.set_num_replicas(1);
+    workload.set_num_write_threads(10);
+    workload.Setup();
+    workload.Start();
+    ASSERT_EVENTUALLY([&]() {
+      ASSERT_GE(workload.rows_inserted(), 30000);
+    });
+    workload.StopAndJoin();
+  }
+
+  // Keep scanning the table.
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableName);
+  workload.set_num_write_threads(0);
+  workload.set_num_read_threads(10);
+  workload.set_read_errors_allowed(false);
+  workload.Setup();
+  workload.Start();
+
+  // Set replication factor to 3.
+  ASSERT_OK(SetReplicationFactor(kTableName, 3));
+  ASSERT_EVENTUALLY([&] {
+    tablet_replica_ = LookupLeaderTabletReplica();
+    NO_FATALS(VerifyTabletReplicaCount(3, VerifyRowCount::kEnable));
+    ASSERT_EQ(1, tablet_replica_->tablet()->metadata()->schema_version());
+  });
+
+  // Set replication factor to 1.
+  ASSERT_OK(SetReplicationFactor(kTableName, 1));
+  ASSERT_EVENTUALLY([&] {
+    tablet_replica_ = LookupLeaderTabletReplica();
+    NO_FATALS(VerifyTabletReplicaCount(1, VerifyRowCount::kEnable));
+    ASSERT_EQ(2, tablet_replica_->tablet()->metadata()->schema_version());
+  });
+
+  workload.StopAndJoin();
+}
+
+TEST_F(ReplicatedAlterTableTest, AlterReplicationFactorWhileWriting) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // Additionally, make the WAL segments smaller to encourage more frequent
+  // roll-over onto WAL segments.
+  FLAGS_flush_threshold_secs = 0;
+  FLAGS_log_segment_size_mb = 1;
+
+  // Make tablet replica copying slow, to make it easier to spot violations.
+  FLAGS_tablet_copy_download_file_inject_latency_ms = 2000;
+
+  // Delete table at first, and create table by TestWorkload later.
+  ASSERT_OK(client_->DeleteTable(kTableName));
+
+  // Keep writing the table.
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableName);
+  workload.set_num_tablets(1);
+  workload.set_num_replicas(1);
+  workload.set_num_write_threads(10);
+  workload.set_payload_bytes(1024);
+  workload.set_write_timeout_millis(5 * 1000);
+  workload.set_timeout_allowed(false);
+  workload.set_network_error_allowed(false);
+  workload.set_remote_error_allowed(false);
+  workload.Setup();
+  workload.Start();
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_GE(workload.rows_inserted(), 200000);
+  });
+
+  // Set replication factor to 3.
+  ASSERT_OK(SetReplicationFactor(kTableName, 3));
+  ASSERT_EVENTUALLY([&] {
+    tablet_replica_ = LookupLeaderTabletReplica();
+    // Do not verify the row count while the table is being written.
+    NO_FATALS(VerifyTabletReplicaCount(3, VerifyRowCount::kDisable));
+    ASSERT_EQ(1, tablet_replica_->tablet()->metadata()->schema_version());
+  });
+
+  // Stop writing and verify again.
+  workload.StopAndJoin();
+  ASSERT_EVENTUALLY([&] {
+    tablet_replica_ = LookupLeaderTabletReplica();
+    NO_FATALS(VerifyTabletReplicaCount(3, VerifyRowCount::kEnable));
+    ASSERT_EQ(1, tablet_replica_->tablet()->metadata()->schema_version());
+  });
+
+  // Set replication factor to 1.
+  workload.Start();
+  ASSERT_OK(SetReplicationFactor(kTableName, 1));
+  ASSERT_EVENTUALLY([&] {
+    tablet_replica_ = LookupLeaderTabletReplica();
+    NO_FATALS(VerifyTabletReplicaCount(1, VerifyRowCount::kEnable));
+    ASSERT_EQ(2, tablet_replica_->tablet()->metadata()->schema_version());
+  });
+
+  workload.StopAndJoin();
+}
+
+TEST_F(ReplicatedAlterTableTest, AlterReplicationFactorAfterWALGCed) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // Additionally, make the WAL segments smaller to encourage more frequent
+  // roll-over onto WAL segments.
+  FLAGS_flush_threshold_secs = 0;
+  FLAGS_log_segment_size_mb = 1;
+
+  ASSERT_OK(client_->DeleteTable(kTableName));
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableName);
+  workload.set_num_tablets(1);
+  workload.set_num_replicas(1);
+  workload.set_num_write_threads(10);
+  workload.Setup();
+  workload.Start();
+
+  // Function to fetch the GC count of all tablet's WAL.
+  auto get_tablet_wal_gc_count = [&] (int tserver_idx) {
+    int64_t tablet_wal_gc_count = 0;
+    itest::GetInt64Metric(
+        HostPort(cluster_->mini_tablet_server(tserver_idx)->bound_http_addr()),
+        &METRIC_ENTITY_tablet,
+        "*",
+        &METRIC_log_gc_duration,
+        "total_count",
+        &tablet_wal_gc_count);
+    return tablet_wal_gc_count;
+  };
+
+  vector<int64_t> orig_gc_count(num_tservers());
+  for (int tserver_idx = 0; tserver_idx < num_tservers(); tserver_idx++) {
+    orig_gc_count[tserver_idx] = get_tablet_wal_gc_count(tserver_idx);
+  }
+
+  // Wait until some WALs have been GCed.
+  ASSERT_EVENTUALLY([&] {
+    int num_tserver_gc_updated = 0;
+    for (int tserver_idx = 0; tserver_idx < num_tservers(); tserver_idx++) {
+      if (get_tablet_wal_gc_count(tserver_idx) > orig_gc_count[tserver_idx]) {
+        num_tserver_gc_updated++;
+      }
+    }
+    ASSERT_GE(num_tserver_gc_updated, 1);
+  });
+  workload.StopAndJoin();
+
+  // Set replication factor to 3.
+  ASSERT_OK(SetReplicationFactor(kTableName, 3));
+  tablet_replica_ = LookupLeaderTabletReplica();
+  NO_FATALS(VerifyTabletReplicaCount(3, VerifyRowCount::kEnable));
+  ASSERT_EQ(1, tablet_replica_->tablet()->metadata()->schema_version());
+}
+
 TEST_F(AlterTableTest, TestRenameStillCreatingTable) {
   const string kNewTableName = "foo";
 
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index d06bbdb..4464887 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -44,6 +44,7 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
@@ -1229,6 +1230,8 @@ Status GetInt64Metric(const HostPort& http_hp,
                       const MetricPrototype* metric_proto,
                       const char* value_field,
                       int64_t* value) {
+  *value = 0;
+  bool found = false;
   // Fetch metrics whose name matches the given prototype.
   string url = Substitute(
       "http://$0/jsonmetricz?metrics=$1",
@@ -1252,7 +1255,7 @@ Status GetInt64Metric(const HostPort& http_hp,
     if (entity_id) {
       string id;
       RETURN_NOT_OK(r.ExtractString(entity, "id", &id));
-      if (id != entity_id) {
+      if (!MatchPattern(id, entity_id)) {
         continue;
       }
     }
@@ -1266,10 +1269,19 @@ Status GetInt64Metric(const HostPort& http_hp,
       if (name != metric_proto->name()) {
         continue;
       }
-      RETURN_NOT_OK(r.ExtractInt64(metric, value_field, value));
-      return Status::OK();
+
+      int64_t v = 0;
+      RETURN_NOT_OK(r.ExtractInt64(metric, value_field, &v));
+      found = true;
+      *value += v;
+      if (!entity_id) {
+        return Status::OK();
+      }
     }
   }
+  if (found) {
+    return Status::OK();
+  }
   string msg;
   if (entity_id) {
     msg = Substitute("Could not find metric $0.$1 for entity $2",
diff --git a/src/kudu/integration-tests/cluster_itest_util.h b/src/kudu/integration-tests/cluster_itest_util.h
index 2d5b02c..342640d 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -438,7 +438,7 @@ Status BeginTabletCopySession(const TServerDetails* ts,
 // histogram, it might be 'total_count' or 'mean'.
 //
 // 'entity_id' may be NULL, in which case the first entity of the same type
-// as 'entity_proto' will be matched.
+// as 'entity_proto' will be matched; otherwise it may be a pattern (e.g. "*"), and matches are summed.
 Status GetInt64Metric(const HostPort& http_hp,
                       const MetricEntityPrototype* entity_proto,
                       const char* entity_id,
diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc
index d59b04e..2cbc05a 100644
--- a/src/kudu/integration-tests/test_workload.cc
+++ b/src/kudu/integration-tests/test_workload.cc
@@ -86,8 +86,10 @@ TestWorkload::TestWorkload(MiniCluster* cluster,
     read_errors_allowed_(false),
     timeout_allowed_(false),
     not_found_allowed_(false),
+    already_present_allowed_(false),
     network_error_allowed_(false),
     remote_error_allowed_(false),
+    write_pattern_(INSERT_RANDOM_ROWS),
     selection_(client::KuduClient::CLOSEST_REPLICA),
     schema_(KuduSchema::FromSchema(GetSimpleTestSchema())),
     num_replicas_(3),
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index ab29a95..f306284 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -257,6 +257,12 @@ DEFINE_bool(catalog_manager_check_ts_count_for_create_table, true,
             "a table to be created.");
 TAG_FLAG(catalog_manager_check_ts_count_for_create_table, hidden);
 
+DEFINE_bool(catalog_manager_check_ts_count_for_alter_table, true,
+            "Whether the master should ensure that there are enough live tablet "
+            "servers to satisfy the provided replication factor before allowing "
+            "a table to be altered.");
+TAG_FLAG(catalog_manager_check_ts_count_for_alter_table, hidden);
+
 DEFINE_int32(table_locations_ttl_ms, 5 * 60 * 1000, // 5 minutes
              "Maximum time in milliseconds which clients may cache table locations. "
              "New range partitions may not be visible to existing client instances "
@@ -1887,79 +1893,9 @@ Status CatalogManager::CreateTable(const CreateTableRequestPB* orig_req,
   }
 
   const auto num_replicas = req.num_replicas();
-  if (num_replicas > FLAGS_max_num_replicas) {
-    return SetupError(Status::InvalidArgument(
-        Substitute("illegal replication factor $0: maximum allowed replication "
-                   "factor is $1 (controlled by --max_num_replicas)",
-                   num_replicas, FLAGS_max_num_replicas)),
-        resp, MasterErrorPB::REPLICATION_FACTOR_TOO_HIGH);
-  }
-  if (num_replicas < FLAGS_min_num_replicas) {
-    return SetupError(Status::InvalidArgument(
-        Substitute("illegal replication factor $0: minimum allowed replication "
-                   "factor is $1 (controlled by --min_num_replicas)",
-            num_replicas, FLAGS_min_num_replicas)),
-        resp, MasterErrorPB::ILLEGAL_REPLICATION_FACTOR);
-  }
-  // Reject create table with even replication factors, unless master flag
-  // allow_unsafe_replication_factor is on.
-  if (num_replicas % 2 == 0 && !FLAGS_allow_unsafe_replication_factor) {
-    return SetupError(Status::InvalidArgument(
-        Substitute("illegal replication factor $0: replication factor must be odd",
-                   num_replicas)),
-        resp, MasterErrorPB::EVEN_REPLICATION_FACTOR);
-  }
-
-  // Verify that the number of replicas isn't larger than the number of live tablet
-  // servers.
-  TSDescriptorVector ts_descs;
-  master_->ts_manager()->GetDescriptorsAvailableForPlacement(&ts_descs);
-  const auto num_live_tservers = ts_descs.size();
-  if (FLAGS_catalog_manager_check_ts_count_for_create_table && num_replicas > num_live_tservers) {
-    // Note: this error message is matched against in master-stress-test.
-    return SetupError(Status::InvalidArgument(Substitute(
-            "not enough live tablet servers to create a table with the requested replication "
-            "factor $0; $1 tablet servers are alive", req.num_replicas(), num_live_tservers)),
-        resp, MasterErrorPB::REPLICATION_FACTOR_TOO_HIGH);
-  }
-
-  // Verify that the total number of replicas is reasonable.
-  //
-  // Table creation can generate a fair amount of load, both in the form of RPC
-  // traffic (due to Raft leader elections) and disk I/O (due to durably writing
-  // several files during both replica creation and leader elections).
-  //
-  // Ideally we would have more effective ways of mitigating this load (such
-  // as more efficient on-disk metadata management), but in lieu of that, we
-  // employ this coarse-grained check that prohibits up-front creation of too
-  // many replicas.
-  //
-  // Note: non-replicated tables are exempt because, by not using replication,
-  // they do not generate much of the load described above.
-  const auto max_replicas_total = FLAGS_max_create_tablets_per_ts * num_live_tservers;
-  if (num_replicas > 1 &&
-      max_replicas_total > 0 &&
-      partitions.size() * num_replicas > max_replicas_total) {
-    return SetupError(Status::InvalidArgument(Substitute(
-        "the requested number of tablet replicas is over the maximum permitted "
-        "at creation time ($0), additional tablets may be added by adding "
-        "range partitions to the table post-creation", max_replicas_total)),
-                      resp, MasterErrorPB::TOO_MANY_TABLETS);
-  }
-
-  // Warn if the number of live tablet servers is not enough to re-replicate
-  // a failed replica of the tablet.
-  const auto num_ts_needed_for_rereplication =
-      num_replicas + (FLAGS_raft_prepare_replacement_before_eviction ? 1 : 0);
-  if (num_replicas > 1 && num_ts_needed_for_rereplication > num_live_tservers) {
-    LOG(WARNING) << Substitute(
-        "The number of live tablet servers is not enough to re-replicate a "
-        "tablet replica of the newly created table $0 in case of a server "
-        "failure: $1 tablet servers would be needed, $2 are available. "
-        "Consider bringing up more tablet servers.",
-        normalized_table_name, num_ts_needed_for_rereplication,
-        num_live_tservers);
-  }
+  RETURN_NOT_OK(ValidateNumberReplicas(normalized_table_name,
+                                       resp, ValidateType::kCreateTable,
+                                       partitions.size(), num_replicas));
 
   // Verify the table's extra configuration properties.
   TableExtraConfigPB extra_config_pb;
@@ -3169,7 +3105,20 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
           resp, MasterErrorPB::UNKNOWN_ERROR));
   }
 
-  // 8. Alter table's extra configuration properties.
+  // 8. Alter table's replication factor.
+  bool num_replicas_changed = false;
+  if (req.has_num_replicas()) {
+    int num_replicas = req.num_replicas();
+    RETURN_NOT_OK(ValidateNumberReplicas(normalized_table_name,
+                                         resp, ValidateType::kAlterTable,
+                                         boost::none, num_replicas));
+    if (num_replicas != l.data().pb.num_replicas()) {
+      num_replicas_changed = true;
+      l.mutable_data()->pb.set_num_replicas(num_replicas);
+    }
+  }
+
+  // 9. Alter table's extra configuration properties.
   if (!req.new_extra_configs().empty()) {
     TRACE("Apply alter extra-config");
     Map<string, string> new_extra_configs;
@@ -3190,19 +3139,21 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
   bool has_metadata_changes = has_schema_changes ||
       req.has_new_table_name() || req.has_new_table_owner() ||
       !req.new_extra_configs().empty() || req.has_disk_size_limit() ||
-      req.has_row_count_limit() || req.has_new_table_comment();
+      req.has_row_count_limit() || req.has_new_table_comment() ||
+      num_replicas_changed;
   // Set to true if there are partitioning changes.
   bool has_partitioning_changes = !alter_partitioning_steps.empty();
   // Set to true if metadata changes need to be applied to existing tablets.
   bool has_metadata_changes_for_existing_tablets =
-    has_metadata_changes && table->num_tablets() > tablets_to_drop.size();
+    has_metadata_changes &&
+    (table->num_tablets() > tablets_to_drop.size() || num_replicas_changed);
 
   // Skip empty requests...
   if (!has_metadata_changes && !has_partitioning_changes) {
     return Status::OK();
   }
 
-  // 9. Serialize the schema and increment the version number.
+  // 10. Serialize the schema and increment the version number.
   if (has_metadata_changes_for_existing_tablets && !l.data().pb.has_fully_applied_schema()) {
     l.mutable_data()->pb.mutable_fully_applied_schema()->CopyFrom(l.data().pb.schema());
   }
@@ -3227,7 +3178,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
   TabletMetadataGroupLock tablets_to_add_lock(LockMode::WRITE);
   TabletMetadataGroupLock tablets_to_drop_lock(LockMode::RELEASED);
 
-  // 10. Update sys-catalog with the new table schema and tablets to add/drop.
+  // 11. Update sys-catalog with the new table schema and tablets to add/drop.
   TRACE("Updating metadata on disk");
   {
     SysCatalogTable::Actions actions;
@@ -3257,7 +3208,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
     }
   }
 
-  // 11. Commit the in-memory state.
+  // 12. Commit the in-memory state.
   TRACE("Committing alterations to in-memory state");
   {
     // Commit new tablet in-memory state. This doesn't require taking the global
@@ -3349,7 +3300,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
     SendDeleteTabletRequest(tablet, l, deletion_msg);
   }
 
-  // 12. Invalidate/purge corresponding entries in the table locations cache.
+  // 13. Invalidate/purge corresponding entries in the table locations cache.
   if (table_locations_cache_ &&
       (!tablets_to_add.empty() || !tablets_to_drop.empty())) {
     table_locations_cache_->Remove(table->id());
@@ -4989,6 +4940,7 @@ Status CatalogManager::ProcessTabletReport(
     const string& tablet_id = e.first;
     const scoped_refptr<TabletInfo>& tablet = e.second;
     const ReportedTabletPB& report = *FindOrDie(reports, tablet_id);
+
     if (report.has_schema_version()) {
       HandleTabletSchemaVersionReport(tablet, report.schema_version());
     }
@@ -5907,6 +5859,95 @@ Status CatalogManager::WaitForNotificationLogListenerCatchUp(RespClass* resp,
   return Status::OK();
 }
 
+template<typename RespClass>
+Status CatalogManager::ValidateNumberReplicas(const std::string& normalized_table_name,
+                                              RespClass* resp, ValidateType type,
+                                              const boost::optional<int>& partitions_count,
+                                              int num_replicas) {
+  if (num_replicas > FLAGS_max_num_replicas) {
+    return SetupError(Status::InvalidArgument(
+        Substitute("illegal replication factor $0: maximum allowed replication "
+                   "factor is $1 (controlled by --max_num_replicas)",
+                   num_replicas, FLAGS_max_num_replicas)),
+        resp, MasterErrorPB::REPLICATION_FACTOR_TOO_HIGH);
+  }
+  if (num_replicas < FLAGS_min_num_replicas) {
+    return SetupError(Status::InvalidArgument(
+        Substitute("illegal replication factor $0: minimum allowed replication "
+                   "factor is $1 (controlled by --min_num_replicas)",
+            num_replicas, FLAGS_min_num_replicas)),
+        resp, MasterErrorPB::ILLEGAL_REPLICATION_FACTOR);
+  }
+  // Reject create/alter table requests with an even replication factor, unless the
+  // master flag --allow_unsafe_replication_factor is set.
+  if (num_replicas % 2 == 0 && !FLAGS_allow_unsafe_replication_factor) {
+    return SetupError(Status::InvalidArgument(
+        Substitute("illegal replication factor $0: replication factor must be odd",
+                   num_replicas)),
+        resp, MasterErrorPB::EVEN_REPLICATION_FACTOR);
+  }
+
+  // Verify that the number of replicas isn't larger than the number of live tablet
+  // servers.
+  TSDescriptorVector ts_descs;
+  master_->ts_manager()->GetDescriptorsAvailableForPlacement(&ts_descs);
+  const auto num_live_tservers = ts_descs.size();
+  if ((type == ValidateType::kCreateTable ? FLAGS_catalog_manager_check_ts_count_for_create_table :
+                                            FLAGS_catalog_manager_check_ts_count_for_alter_table) &&
+      num_replicas > num_live_tservers) {
+    // Note: this error message is matched against in master-stress-test.
+    return SetupError(Status::InvalidArgument(Substitute(
+        "not enough live tablet servers to $0 a table with the requested replication "
+        "factor $1; $2 tablet servers are alive",
+        type == ValidateType::kCreateTable ? "create" : "alter",
+        num_replicas, num_live_tservers)),
+                      resp, MasterErrorPB::REPLICATION_FACTOR_TOO_HIGH);
+  }
+
+  if (type == ValidateType::kCreateTable) {
+    // Verify that the total number of replicas is reasonable.
+    //
+    // Table creation can generate a fair amount of load, both in the form of RPC
+    // traffic (due to Raft leader elections) and disk I/O (due to durably writing
+    // several files during both replica creation and leader elections).
+    //
+    // Ideally we would have more effective ways of mitigating this load (such
+    // as more efficient on-disk metadata management), but in lieu of that, we
+    // employ this coarse-grained check that prohibits up-front creation of too
+    // many replicas.
+    //
+    // Note: non-replicated tables are exempt because, by not using replication,
+    // they do not generate much of the load described above.
+    const auto max_replicas_total = FLAGS_max_create_tablets_per_ts * num_live_tservers;
+    if (num_replicas > 1 && max_replicas_total > 0 &&
+        *partitions_count * num_replicas > max_replicas_total) {
+      return SetupError(Status::InvalidArgument(Substitute(
+                            "the requested number of tablet replicas is over the maximum permitted "
+                            "at creation time ($0), additional tablets may be added by adding "
+                            "range partitions to the table post-creation",
+                            max_replicas_total)),
+                        resp,
+                        MasterErrorPB::TOO_MANY_TABLETS);
+    }
+  }
+
+  // Warn if the number of live tablet servers is not enough to re-replicate
+  // a failed replica of the tablet.
+  const auto num_ts_needed_for_rereplication =
+      num_replicas + (FLAGS_raft_prepare_replacement_before_eviction ? 1 : 0);
+  if (num_replicas > 1 && num_ts_needed_for_rereplication > num_live_tservers) {
+    LOG(WARNING) << Substitute(
+        "The number of live tablet servers is not enough to re-replicate a "
+        "tablet replica of the $0 table $1 in case of a server "
+        "failure: $2 tablet servers would be needed, $3 are available. "
+        "Consider bringing up more tablet servers.",
+        type == ValidateType::kCreateTable ? "newly created" : "altering", normalized_table_name,
+        num_ts_needed_for_rereplication, num_live_tservers);
+  }
+
+  return Status::OK();
+}
+
 string CatalogManager::NormalizeTableName(const string& table_name) {
   // Force a deep copy on platforms with reference counted strings.
   string normalized_table_name(table_name.data(), table_name.size());
@@ -6237,7 +6278,6 @@ string TabletInfo::ToString() const {
                     (table_ != nullptr ? table_->ToString() : "MISSING"));
 }
 
-
 void TabletInfo::UpdateStats(ReportedTabletStatsPB stats) {
   std::lock_guard<simple_spinlock> l(lock_);
   stats_ = std::move(stats);
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index f6c177c..8b76d5e 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -1139,6 +1139,16 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   Status WaitForNotificationLogListenerCatchUp(RespClass* resp,
                                                rpc::RpcContext* rpc) WARN_UNUSED_RESULT;
 
+  enum class ValidateType {
+    kCreateTable = 0,
+    kAlterTable,
+  };
+  template<typename RespClass>
+  Status ValidateNumberReplicas(const std::string& normalized_table_name,
+                                RespClass* resp, ValidateType type,
+                                const boost::optional<int>& partitions_count,
+                                int num_replicas);
+
   // TODO(unknown): the maps are a little wasteful of RAM, since the TableInfo/TabletInfo
   // objects have a copy of the string key. But STL doesn't make it
   // easy to make a "gettable set".
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index e043d2a..ca83159 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -737,6 +737,7 @@ message AlterTableRequestPB {
   optional int64 row_count_limit = 9;
 
   optional string new_table_comment = 10;
+  optional int32 num_replicas = 11;
 }
 
 message AlterTableResponsePB {
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index dcfb8c5..71d84f2 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -334,7 +334,7 @@ void MasterServiceImpl::RemoveMaster(const RemoveMasterRequestPB* req,
 void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
                                     TSHeartbeatResponsePB* resp,
                                     rpc::RpcContext* rpc) {
-  // If CatalogManager is not initialized don't even know whether
+  // 1. If CatalogManager is not initialized we don't even know whether
   // or not we will be a leader (so we can't tell whether or not we can
   // accept tablet reports).
   CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 88c0bfd..a9b3f18 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -1199,6 +1199,7 @@ TEST_F(ToolTest, TestModeHelp) {
         "set_comment.*Set the comment for a table",
         "set_extra_config.*Change a extra configuration value on a table",
         "set_limit.*Set the write limit for a table",
+        "set_replication_factor.*Change a table's replication factor",
         "statistics.*Get table statistics",
     };
     NO_FATALS(RunTestHelp(kCmd, kTableModeRegexes));
@@ -1225,6 +1226,7 @@ TEST_F(ToolTest, TestModeHelp) {
           "scan",
           "set_comment",
           "set_extra_config",
+          "set_replication_factor",
           "statistics",
         }));
   }
@@ -4543,6 +4545,86 @@ TEST_F(ToolTest, TestChangeTableLimitSupported) {
   ASSERT_EQ(-1, statistics->on_disk_size_limit());
 }
 
+TEST_F(ToolTest, TestSetReplicationFactor) {
+  constexpr int kNumTservers = 4;
+  ExternalMiniClusterOptions opts;
+  opts.num_tablet_servers = kNumTservers;
+  NO_FATALS(StartExternalMiniCluster(std::move(opts)));
+  constexpr const char* const kTableName = "kudu.table.set.replication_factor";
+
+  KuduSchemaBuilder schema_builder;
+  schema_builder.AddColumn("key")
+      ->Type(client::KuduColumnSchema::INT32)
+      ->NotNull()
+      ->PrimaryKey();
+  schema_builder.AddColumn("value")
+      ->Type(client::KuduColumnSchema::INT32)
+      ->NotNull();
+  KuduSchema schema;
+  ASSERT_OK(schema_builder.Build(&schema));
+
+  // Create the table.
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableName);
+  workload.set_schema(schema);
+  workload.set_num_replicas(1);
+  workload.Setup();
+
+  string master_addr = cluster_->master()->bound_rpc_addr().ToString();
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(KuduClientBuilder()
+                .add_master_server_addr(master_addr)
+                .Build(&client));
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client->OpenTable(kTableName, &table));
+
+  ASSERT_EQ(1, table->num_replicas());
+
+  NO_FATALS(RunActionStdoutNone(Substitute("table set_replication_factor $0 $1 3",
+                                           master_addr, kTableName)));
+  ASSERT_OK(client->OpenTable(kTableName, &table));
+  ASSERT_EQ(3, table->num_replicas());
+
+  NO_FATALS(RunActionStdoutNone(Substitute("table set_replication_factor $0 $1 1",
+                                           master_addr, kTableName)));
+  ASSERT_OK(client->OpenTable(kTableName, &table));
+  ASSERT_EQ(1, table->num_replicas());
+
+  string stderr;
+  Status s = RunActionStderrString(
+      Substitute("table set_replication_factor $0 $1 3a",
+                 master_addr, kTableName), &stderr);
+  ASSERT_TRUE(s.IsRuntimeError());
+  ASSERT_STR_CONTAINS(stderr, "Unable to parse replication factor value: 3a.");
+
+  s = RunActionStderrString(
+      Substitute("table set_replication_factor $0 $1 0",
+                 master_addr, kTableName), &stderr);
+  ASSERT_TRUE(s.IsRuntimeError());
+  ASSERT_STR_CONTAINS(stderr, "illegal replication factor 0: minimum allowed replication "
+                      "factor is 1 (controlled by --min_num_replicas)");
+
+  s = RunActionStderrString(
+      Substitute("table set_replication_factor $0 $1 2",
+                 master_addr, kTableName), &stderr);
+  ASSERT_TRUE(s.IsRuntimeError());
+  ASSERT_STR_CONTAINS(stderr, "illegal replication factor 2: replication factor must be odd");
+
+  s = RunActionStderrString(
+      Substitute("table set_replication_factor $0 $1 5",
+                 master_addr, kTableName), &stderr);
+  ASSERT_TRUE(s.IsRuntimeError());
+  ASSERT_STR_CONTAINS(stderr, "not enough live tablet servers to alter a table with the"
+                              " requested replication factor 5; 4 tablet servers are alive");
+
+  s = RunActionStderrString(
+      Substitute("table set_replication_factor $0 $1 9",
+                 master_addr, kTableName), &stderr);
+  ASSERT_TRUE(s.IsRuntimeError());
+  ASSERT_STR_CONTAINS(stderr, "illegal replication factor 9: maximum allowed replication "
+                              "factor is 7 (controlled by --max_num_replicas)");
+}
+
 Status CreateLegacyHmsTable(HmsClient* client,
                             const string& hms_database_name,
                             const string& hms_table_name,
diff --git a/src/kudu/tools/tool_action_table.cc b/src/kudu/tools/tool_action_table.cc
index 974014a..15e2614 100644
--- a/src/kudu/tools/tool_action_table.cc
+++ b/src/kudu/tools/tool_action_table.cc
@@ -38,6 +38,7 @@
 
 #include "kudu/client/client.h"
 #include "kudu/client/replica_controller-internal.h"
+#include "kudu/client/table_alterer-internal.h"
 #include "kudu/client/scan_batch.h"
 #include "kudu/client/scan_predicate.h"
 #include "kudu/client/schema.h"
@@ -172,6 +173,20 @@ class TableLister {
   }
 };
 
+// This class only exists so that it can easily be friended by KuduTableAlterer.
+class TableAlter {
+ public:
+  static Status SetReplicationFactor(const vector<string>& master_addresses,
+                                     const string& table_name,
+                                     int32_t replication_factor) {
+    client::sp::shared_ptr<KuduClient> client;
+    RETURN_NOT_OK(CreateKuduClient(master_addresses, &client));
+    unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
+    alterer->data_->set_replication_factor_to_ = replication_factor;
+    return alterer->Alter();
+  }
+};
+
 namespace {
 
 const char* const kNewTableNameArg = "new_table_name";
@@ -189,6 +204,7 @@ const char* const kEncodingTypeArg = "encoding_type";
 const char* const kBlockSizeArg = "block_size";
 const char* const kColumnCommentArg = "column_comment";
 const char* const kCreateTableJSONArg = "create_table_json";
+const char* const kReplicationFactorArg = "replication_factor";
 
 enum PartitionAction {
   ADD,
@@ -958,6 +974,21 @@ Status DeleteColumn(const RunnerContext& context) {
   return alterer->Alter();
 }
 
+Status SetReplicationFactor(const RunnerContext& context) {
+  vector<string> master_addresses;
+  RETURN_NOT_OK(ParseMasterAddresses(context, &master_addresses));
+  const string& table_name = FindOrDie(context.required_args, kTableNameArg);
+  const string& str_replication_factor = FindOrDie(context.required_args, kReplicationFactorArg);
+
+  int32_t replication_factor;
+  if (!safe_strto32(str_replication_factor, &replication_factor)) {
+    return Status::InvalidArgument(Substitute(
+        "Unable to parse replication factor value: $0.", str_replication_factor));
+  }
+
+  return TableAlter::SetReplicationFactor(master_addresses, table_name, replication_factor);
+}
+
 Status GetTableStatistics(const RunnerContext& context) {
   const string& table_name = FindOrDie(context.required_args, kTableNameArg);
   client::sp::shared_ptr<KuduClient> client;
@@ -1472,6 +1503,13 @@ unique_ptr<Mode> BuildTableMode() {
       .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" })
       .Build();
 
+  unique_ptr<Action> set_replication_factor =
+      ClusterActionBuilder("set_replication_factor", &SetReplicationFactor)
+      .Description("Change a table's replication factor")
+      .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" })
+      .AddRequiredParameter({ kReplicationFactorArg, "New replication factor of the table" })
+      .Build();
+
   unique_ptr<Action> statistics =
       ClusterActionBuilder("statistics", &GetTableStatistics)
       .Description("Get table statistics")
@@ -1523,6 +1561,7 @@ unique_ptr<Mode> BuildTableMode() {
       .AddAction(std::move(scan_table))
       .AddAction(std::move(set_comment))
       .AddAction(std::move(set_extra_config))
+      .AddAction(std::move(set_replication_factor))
       .AddAction(std::move(statistics))
       .Build();
 }
diff --git a/src/kudu/tserver/heartbeater.cc b/src/kudu/tserver/heartbeater.cc
index b38bd27..e3fc80e 100644
--- a/src/kudu/tserver/heartbeater.cc
+++ b/src/kudu/tserver/heartbeater.cc
@@ -244,6 +244,7 @@ Status Heartbeater::Start() {
 
   return Status::OK();
 }
+
 Status Heartbeater::Stop() {
   // Stop all threads and return the first failure (if there was one).
   Status first_failure;