You are viewing a plain-text version of this content. (The link to the canonical version, originally labeled "here", was not preserved in this copy.)
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/03/22 00:38:00 UTC
[1/3] kudu git commit: KUDU-1607. Unpin tablet flush after failed bootstrap
Repository: kudu
Updated Branches:
refs/heads/master 87bd862cb -> 889d15246
KUDU-1607. Unpin tablet flush after failed bootstrap
We have heard reports that, in certain scenarios, a failed tablet cannot
be deleted. After some investigation, I determined that this is because
we neglect to unpin the tablet metadata flush when tablet bootstrap fails.
When a table is deleted, we delete each tablet's superblock by calling
TabletMetadata::DeleteSuperBlock(). This method double-checks that the
metadata no longer references any orphaned blocks, so that we don't leak
disk space. That requires TabletMetadata::DeleteTabletData() to be called
first; it invokes Flush() twice to ensure that no orphaned blocks are
referenced on disk when it returns. However, if the tablet metadata flush
is pinned, Flush() silently becomes a no-op (apart from a log message).
The orphaned blocks therefore remain referenced on disk, and
DeleteSuperBlock() fails. As a result, the tablet is not fully deleted
and the following warning message is written to the log file:
W0317 13:05:19.324373 13242 ts_tablet_manager.cc:634] Invalid argument: Unable to delete on-disk data from tablet d1b857ddaa0743c79c9bcbd9b2fda174: The metadata for tablet d1b857ddaa0743c79c9bcbd9b2fda174 still references orphaned blocks. Call DeleteTabletData() first
This patch makes the following changes:
1. Always unpin the tablet metadata flush after pinning it, including when bootstrap fails.
2. Add a new itest that fails without that change, reproducing the
above warning message.
3. Add a little more test infra to MiniCluster (and the MiniClusterBase
interface) to make it easier to use the helper functions in
cluster_itest_util.h in MiniCluster-based itests.
Change-Id: Id274c6ee1da75bc6f92ab91c0a01edaa009b8962
Reviewed-on: http://gerrit.cloudera.org:8080/6422
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0450f77f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0450f77f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0450f77f
Branch: refs/heads/master
Commit: 0450f77f69c74cc6dec08ad36bb89ac12c17608f
Parents: 87bd862
Author: Mike Percy <mp...@apache.org>
Authored: Fri Mar 17 15:19:36 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Wed Mar 22 00:37:32 2017 +0000
----------------------------------------------------------------------
src/kudu/integration-tests/CMakeLists.txt | 1 +
.../integration-tests/delete_tablet-itest.cc | 106 +++++++++++++++++++
.../integration-tests/external_mini_cluster.cc | 6 +-
.../integration-tests/external_mini_cluster.h | 22 +---
src/kudu/integration-tests/mini_cluster.cc | 23 +++-
src/kudu/integration-tests/mini_cluster.h | 6 ++
src/kudu/integration-tests/mini_cluster_base.h | 22 +++-
src/kudu/tablet/tablet_bootstrap.cc | 78 +++++++++-----
src/kudu/tserver/mini_tablet_server.cc | 5 +
src/kudu/tserver/mini_tablet_server.h | 3 +
10 files changed, 224 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/0450f77f/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index d7ae7be..9027455 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -60,6 +60,7 @@ ADD_KUDU_TEST(consistency-itest)
ADD_KUDU_TEST(create-table-itest)
ADD_KUDU_TEST(create-table-stress-test)
ADD_KUDU_TEST(delete_table-test)
+ADD_KUDU_TEST(delete_tablet-itest)
ADD_KUDU_TEST(disk_reservation-itest)
ADD_KUDU_TEST(exactly_once_writes-itest)
ADD_KUDU_TEST(external_mini_cluster-test RESOURCE_LOCK "master-rpc-ports")
http://git-wip-us.apache.org/repos/asf/kudu/blob/0450f77f/src/kudu/integration-tests/delete_tablet-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/delete_tablet-itest.cc b/src/kudu/integration-tests/delete_tablet-itest.cc
new file mode 100644
index 0000000..e145701
--- /dev/null
+++ b/src/kudu/integration-tests/delete_tablet-itest.cc
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <vector>
+
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/mini_cluster-itest-base.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/tablet/tablet_peer.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/ts_tablet_manager.h"
+
+DECLARE_int64(fs_wal_dir_reserved_bytes);
+
+using kudu::tablet::TabletPeer;
+using std::vector;
+
+namespace kudu {
+
+class DeleteTabletITest : public MiniClusterITestBase {
+};
+
+// Test deleting a failed replica. Regression test for KUDU-1607.
+TEST_F(DeleteTabletITest, TestDeleteFailedReplica) {
+ NO_FATALS(StartCluster(1)); // 1 TS.
+ TestWorkload workload(cluster_.get());
+ workload.set_num_replicas(1);
+ workload.Setup();
+
+ std::unordered_map<std::string, itest::TServerDetails*> ts_map;
+ ValueDeleter del(&ts_map);
+ ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy().get(),
+ cluster_->messenger(),
+ &ts_map));
+ auto* mts = cluster_->mini_tablet_server(0);
+ auto* ts = ts_map[mts->uuid()];
+
+ scoped_refptr<TabletPeer> tablet_peer;
+ AssertEventually([&] {
+ vector<scoped_refptr<TabletPeer>> tablet_peers;
+ mts->server()->tablet_manager()->GetTabletPeers(&tablet_peers);
+ ASSERT_EQ(1, tablet_peers.size());
+ tablet_peer = tablet_peers[0];
+ });
+ NO_FATALS();
+
+ workload.Start();
+ while (workload.rows_inserted() < 100) {
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
+ workload.StopAndJoin();
+
+ // We need blocks on-disk for this regression test, so force an MRS flush.
+ ASSERT_OK(tablet_peer->tablet()->Flush());
+
+ // Shut down the master so it doesn't crash the test process when we make the
+ // disk appear to be full.
+ cluster_->mini_master()->Shutdown();
+
+ // Shut down the TS and restart it after changing flags to ensure no data can
+ // be written during tablet bootstrap.
+ mts->Shutdown();
+ FLAGS_fs_wal_dir_reserved_bytes = INT64_MAX;
+ ASSERT_OK(mts->Restart());
+ Status s = mts->server()->tablet_manager()->WaitForAllBootstrapsToFinish();
+ ASSERT_TRUE(s.IsIOError());
+ ASSERT_STR_CONTAINS(s.ToString(), "Insufficient disk space");
+
+ // Tablet bootstrap failure should result in tablet status == FAILED.
+ {
+ vector<scoped_refptr<TabletPeer>> tablet_peers;
+ mts->server()->tablet_manager()->GetTabletPeers(&tablet_peers);
+ ASSERT_EQ(1, tablet_peers.size());
+ tablet_peer = tablet_peers[0];
+ ASSERT_EQ(tablet::FAILED, tablet_peer->state());
+ }
+
+ // We should still be able to delete the failed tablet.
+ ASSERT_OK(itest::DeleteTablet(ts, tablet_peer->tablet_id(), tablet::TABLET_DATA_DELETED,
+ boost::none, MonoDelta::FromSeconds(30)));
+ AssertEventually([&] {
+ vector<scoped_refptr<TabletPeer>> tablet_peers;
+ mts->server()->tablet_manager()->GetTabletPeers(&tablet_peers);
+ ASSERT_EQ(0, tablet_peers.size());
+ });
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/0450f77f/src/kudu/integration-tests/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.cc b/src/kudu/integration-tests/external_mini_cluster.cc
index e82b6ec..411c097 100644
--- a/src/kudu/integration-tests/external_mini_cluster.cc
+++ b/src/kudu/integration-tests/external_mini_cluster.cc
@@ -537,16 +537,16 @@ vector<ExternalDaemon*> ExternalMiniCluster::daemons() const {
return results;
}
-std::shared_ptr<rpc::Messenger> ExternalMiniCluster::messenger() {
+std::shared_ptr<rpc::Messenger> ExternalMiniCluster::messenger() const {
return messenger_;
}
-std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::master_proxy() {
+std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::master_proxy() const {
CHECK_EQ(masters_.size(), 1);
return master_proxy(0);
}
-std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::master_proxy(int idx) {
+std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::master_proxy(int idx) const {
CHECK_LT(idx, masters_.size());
return std::shared_ptr<MasterServiceProxy>(
new MasterServiceProxy(messenger_, CHECK_NOTNULL(master(idx))->bound_rpc_addr()));
http://git-wip-us.apache.org/repos/asf/kudu/blob/0450f77f/src/kudu/integration-tests/external_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.h b/src/kudu/integration-tests/external_mini_cluster.h
index c696f5d..d460a84 100644
--- a/src/kudu/integration-tests/external_mini_cluster.h
+++ b/src/kudu/integration-tests/external_mini_cluster.h
@@ -46,14 +46,6 @@ class NodeInstancePB;
class Sockaddr;
class Subprocess;
-namespace master {
-class MasterServiceProxy;
-} // namespace master
-
-namespace rpc {
-class Messenger;
-} // namespace rpc
-
namespace server {
class ServerStatusPB;
} // namespace server
@@ -235,17 +227,9 @@ class ExternalMiniCluster : public MiniClusterBase {
return masters_.size();
}
- // Return the client messenger used by the ExternalMiniCluster.
- std::shared_ptr<rpc::Messenger> messenger();
-
- // If the cluster is configured for a single non-distributed master,
- // return a proxy to that master. Requires that the single master is
- // running.
- std::shared_ptr<master::MasterServiceProxy> master_proxy();
-
- // Returns an RPC proxy to the master at 'idx'. Requires that the
- // master at 'idx' is running.
- std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx);
+ std::shared_ptr<rpc::Messenger> messenger() const override;
+ std::shared_ptr<master::MasterServiceProxy> master_proxy() const override;
+ std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx) const override;
// Wait until the number of registered tablet servers reaches the given count
// on all of the running masters. Returns Status::TimedOut if the desired
http://git-wip-us.apache.org/repos/asf/kudu/blob/0450f77f/src/kudu/integration-tests/mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/mini_cluster.cc b/src/kudu/integration-tests/mini_cluster.cc
index d50c339..c910502 100644
--- a/src/kudu/integration-tests/mini_cluster.cc
+++ b/src/kudu/integration-tests/mini_cluster.cc
@@ -23,6 +23,7 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
+#include "kudu/master/master.proxy.h"
#include "kudu/master/mini_master.h"
#include "kudu/master/ts_descriptor.h"
#include "kudu/master/ts_manager.h"
@@ -41,8 +42,8 @@ namespace kudu {
using client::KuduClient;
using client::KuduClientBuilder;
using master::CatalogManager;
+using master::MasterServiceProxy;
using master::MiniMaster;
-using master::TabletLocationsPB;
using master::TSDescriptor;
using std::shared_ptr;
using tserver::MiniTabletServer;
@@ -96,6 +97,12 @@ Status MiniCluster::Start() {
RETURN_NOT_OK_PREPEND(WaitForTabletServerCount(num_ts_initial_),
"Waiting for tablet servers to start");
+ RETURN_NOT_OK_PREPEND(rpc::MessengerBuilder("minicluster-messenger")
+ .set_num_reactors(1)
+ .set_max_negotiation_threads(1)
+ .Build(&messenger_),
+ "Failed to start Messenger for minicluster");
+
running_ = true;
return Status::OK();
}
@@ -324,4 +331,18 @@ Status MiniCluster::GetLeaderMasterIndex(int* idx) const {
return Status::OK();
}
+std::shared_ptr<rpc::Messenger> MiniCluster::messenger() const {
+ return messenger_;
+}
+
+std::shared_ptr<MasterServiceProxy> MiniCluster::master_proxy() const {
+ CHECK_EQ(1, mini_masters_.size());
+ return master_proxy(0);
+}
+
+std::shared_ptr<MasterServiceProxy> MiniCluster::master_proxy(int idx) const {
+ return std::shared_ptr<MasterServiceProxy>(
+ new MasterServiceProxy(messenger_, CHECK_NOTNULL(mini_master(idx))->bound_rpc_addr()));
+}
+
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/0450f77f/src/kudu/integration-tests/mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/mini_cluster.h b/src/kudu/integration-tests/mini_cluster.h
index be5928e..cf49aee 100644
--- a/src/kudu/integration-tests/mini_cluster.h
+++ b/src/kudu/integration-tests/mini_cluster.h
@@ -162,6 +162,10 @@ class MiniCluster : public MiniClusterBase {
// last result may not be valid.
Status GetLeaderMasterIndex(int* idx) const;
+ std::shared_ptr<rpc::Messenger> messenger() const override;
+ std::shared_ptr<master::MasterServiceProxy> master_proxy() const override;
+ std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx) const override;
+
private:
enum {
kRegistrationWaitTimeSeconds = 15,
@@ -180,6 +184,8 @@ class MiniCluster : public MiniClusterBase {
std::vector<std::shared_ptr<master::MiniMaster> > mini_masters_;
std::vector<std::shared_ptr<tserver::MiniTabletServer> > mini_tablet_servers_;
+ std::shared_ptr<rpc::Messenger> messenger_;
+
DISALLOW_COPY_AND_ASSIGN(MiniCluster);
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/0450f77f/src/kudu/integration-tests/mini_cluster_base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/mini_cluster_base.h b/src/kudu/integration-tests/mini_cluster_base.h
index 1fd1985..a71b807 100644
--- a/src/kudu/integration-tests/mini_cluster_base.h
+++ b/src/kudu/integration-tests/mini_cluster_base.h
@@ -24,7 +24,15 @@ class Status;
namespace client {
class KuduClient;
class KuduClientBuilder;
-}
+} // namespace client
+
+namespace master {
+class MasterServiceProxy;
+} // namespace master
+
+namespace rpc {
+class Messenger;
+} // namespace rpc
// Mode to which node types a certain action (like Shutdown()) should apply.
enum class ClusterNodes {
@@ -65,6 +73,18 @@ class MiniClusterBase {
// REQUIRES: the cluster must have already been Start()ed.
virtual Status CreateClient(client::KuduClientBuilder* builder,
client::sp::shared_ptr<client::KuduClient>* client) const = 0;
+
+ // Return a messenger for use by clients.
+ virtual std::shared_ptr<rpc::Messenger> messenger() const = 0;
+
+ // If the cluster is configured for a single non-distributed master,
+ // return a proxy to that master. Requires that the single master is
+ // running.
+ virtual std::shared_ptr<master::MasterServiceProxy> master_proxy() const = 0;
+
+ // Returns an RPC proxy to the master at 'idx'. Requires that the
+ // master at 'idx' is running.
+ virtual std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx) const = 0;
};
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/0450f77f/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 3052868..04eea5f 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -58,6 +58,7 @@
#include "kudu/util/logging.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
+#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/stopwatch.h"
DECLARE_int32(group_commit_queue_size_bytes);
@@ -179,10 +180,17 @@ class TabletBootstrap {
// A successful call will yield the rebuilt tablet and the rebuilt log.
Status Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
scoped_refptr<Log>* rebuilt_log,
- ConsensusBootstrapInfo* results);
+ ConsensusBootstrapInfo* consensus_info);
private:
+ // The method that does the actual work of tablet bootstrap. Bootstrap() is
+ // actually a wrapper method that is responsible for pinning and unpinning
+ // the tablet metadata flush.
+ Status RunBootstrap(shared_ptr<Tablet>* rebuilt_tablet,
+ scoped_refptr<Log>* rebuilt_log,
+ ConsensusBootstrapInfo* consensus_info);
+
// Opens the tablet.
// Sets '*has_blocks' to true if there was any data on disk for this tablet.
Status OpenTablet(bool* has_blocks);
@@ -215,9 +223,9 @@ class TabletBootstrap {
Status OpenNewLog();
// Finishes bootstrap, setting 'rebuilt_log' and 'rebuilt_tablet'.
- Status FinishBootstrap(const string& message,
- scoped_refptr<log::Log>* rebuilt_log,
- shared_ptr<Tablet>* rebuilt_tablet);
+ void FinishBootstrap(const string& message,
+ scoped_refptr<log::Log>* rebuilt_log,
+ shared_ptr<Tablet>* rebuilt_tablet);
// Plays the log segments into the tablet being built.
// The process of playing the segments generates a new log that can be continued
@@ -441,6 +449,43 @@ TabletBootstrap::TabletBootstrap(
Status TabletBootstrap::Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
scoped_refptr<Log>* rebuilt_log,
ConsensusBootstrapInfo* consensus_info) {
+ // We pin (prevent) metadata flush at the beginning of the bootstrap process
+ // and always unpin it at the end.
+ meta_->PinFlush();
+
+ // Now run the actual bootstrap process.
+ Status bootstrap_status = RunBootstrap(rebuilt_tablet, rebuilt_log, consensus_info);
+
+ // Add a callback to TabletMetadata that makes sure that each time we flush the metadata
+ // we also wait for in-flights to finish and for their wal entry to be fsynced.
+ // This might be a bit conservative in some situations but it will prevent us from
+ // ever flushing the metadata referring to tablet data blocks containing data whose
+ // commit entries are not durable, a pre-requisite for recovery.
+ CHECK((*rebuilt_tablet && *rebuilt_log) || !bootstrap_status.ok())
+ << "Tablet and Log not initialized";
+ if (bootstrap_status.ok()) {
+ meta_->SetPreFlushCallback(
+ Bind(&FlushInflightsToLogCallback::WaitForInflightsAndFlushLog,
+ make_scoped_refptr(new FlushInflightsToLogCallback(
+ rebuilt_tablet->get(), *rebuilt_log))));
+ }
+
+ // This will cause any pending TabletMetadata flush to be executed.
+ Status unpin_status = meta_->UnPinFlush();
+
+ constexpr char kFailedUnpinMsg[] = "Failed to flush after unpinning";
+ if (PREDICT_FALSE(!bootstrap_status.ok() && !unpin_status.ok())) {
+ LOG_WITH_PREFIX(WARNING) << kFailedUnpinMsg << ": " << unpin_status.ToString();
+ return bootstrap_status;
+ }
+ RETURN_NOT_OK(bootstrap_status);
+ RETURN_NOT_OK_PREPEND(unpin_status, Substitute("$0$1", LogPrefix(), kFailedUnpinMsg));
+ return Status::OK();
+}
+
+Status TabletBootstrap::RunBootstrap(shared_ptr<Tablet>* rebuilt_tablet,
+ scoped_refptr<Log>* rebuilt_log,
+ ConsensusBootstrapInfo* consensus_info) {
string tablet_id = meta_->tablet_id();
// Replay requires a valid Consensus metadata file to exist in order to
@@ -460,8 +505,6 @@ Status TabletBootstrap::Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
TabletDataState_Name(tablet_data_state));
}
- meta_->PinFlush();
-
StatusMessage("Bootstrap starting.");
if (VLOG_IS_ON(1)) {
@@ -485,9 +528,7 @@ Status TabletBootstrap::Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
if (!has_blocks && !needs_recovery) {
LOG_WITH_PREFIX(INFO) << "No blocks or log segments found. Creating new log.";
RETURN_NOT_OK_PREPEND(OpenNewLog(), "Failed to open new log");
- RETURN_NOT_OK(FinishBootstrap("No bootstrap required, opened a new log",
- rebuilt_log,
- rebuilt_tablet));
+ FinishBootstrap("No bootstrap required, opened a new log", rebuilt_log, rebuilt_tablet);
consensus_info->last_id = MinimumOpId();
consensus_info->last_committed_id = MinimumOpId();
return Status::OK();
@@ -508,29 +549,18 @@ Status TabletBootstrap::Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
cmeta_->Flush();
RETURN_NOT_OK(RemoveRecoveryDir());
- RETURN_NOT_OK(FinishBootstrap("Bootstrap complete.", rebuilt_log, rebuilt_tablet));
+ FinishBootstrap("Bootstrap complete.", rebuilt_log, rebuilt_tablet);
return Status::OK();
}
-Status TabletBootstrap::FinishBootstrap(const string& message,
- scoped_refptr<log::Log>* rebuilt_log,
- shared_ptr<Tablet>* rebuilt_tablet) {
- // Add a callback to TabletMetadata that makes sure that each time we flush the metadata
- // we also wait for in-flights to finish and for their wal entry to be fsynced.
- // This might be a bit conservative in some situations but it will prevent us from
- // ever flushing the metadata referring to tablet data blocks containing data whose
- // commit entries are not durable, a pre-requisite for recovery.
- meta_->SetPreFlushCallback(
- Bind(&FlushInflightsToLogCallback::WaitForInflightsAndFlushLog,
- make_scoped_refptr(new FlushInflightsToLogCallback(tablet_.get(),
- log_))));
+void TabletBootstrap::FinishBootstrap(const string& message,
+ scoped_refptr<log::Log>* rebuilt_log,
+ shared_ptr<Tablet>* rebuilt_tablet) {
tablet_->MarkFinishedBootstrapping();
- RETURN_NOT_OK(tablet_->metadata()->UnPinFlush());
StatusMessage(message);
rebuilt_tablet->reset(tablet_.release());
rebuilt_log->swap(log_);
- return Status::OK();
}
Status TabletBootstrap::OpenTablet(bool* has_blocks) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/0450f77f/src/kudu/tserver/mini_tablet_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/mini_tablet_server.cc b/src/kudu/tserver/mini_tablet_server.cc
index f4cb331..aabb9df 100644
--- a/src/kudu/tserver/mini_tablet_server.cc
+++ b/src/kudu/tserver/mini_tablet_server.cc
@@ -135,5 +135,10 @@ const Sockaddr MiniTabletServer::bound_http_addr() const {
return server_->first_http_address();
}
+const string& MiniTabletServer::uuid() const {
+ CHECK(started_);
+ return server_->fs_manager()->uuid();
+}
+
} // namespace tserver
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/0450f77f/src/kudu/tserver/mini_tablet_server.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/mini_tablet_server.h b/src/kudu/tserver/mini_tablet_server.h
index 4c205fd..34ed945 100644
--- a/src/kudu/tserver/mini_tablet_server.h
+++ b/src/kudu/tserver/mini_tablet_server.h
@@ -87,6 +87,9 @@ class MiniTabletServer {
const TabletServer* server() const { return server_.get(); }
TabletServer* server() { return server_.get(); }
+ // Return TS uuid.
+ const std::string& uuid() const;
+
bool is_started() const { return started_; }
void FailHeartbeats();
[3/3] kudu git commit: itests: Rename delete_table-test to delete_table-itest
Posted by mp...@apache.org.
itests: Rename delete_table-test to delete_table-itest
For naming consistency with delete_tablet-itest.
Change-Id: I975e9fe9c5531f6af64cae8cd097706c5b554364
Reviewed-on: http://gerrit.cloudera.org:8080/6454
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/889d1524
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/889d1524
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/889d1524
Branch: refs/heads/master
Commit: 889d1524650480ba7506152ac6df154267f9da08
Parents: 0450f77
Author: Mike Percy <mp...@apache.org>
Authored: Tue Mar 21 14:58:15 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Wed Mar 22 00:37:40 2017 +0000
----------------------------------------------------------------------
src/kudu/integration-tests/CMakeLists.txt | 2 +-
.../integration-tests/delete_table-itest.cc | 1470 +++++++++++++++++
src/kudu/integration-tests/delete_table-test.cc | 1472 ------------------
3 files changed, 1471 insertions(+), 1473 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/889d1524/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index 9027455..2ef0fd2 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -59,7 +59,7 @@ ADD_KUDU_TEST(client-stress-test
ADD_KUDU_TEST(consistency-itest)
ADD_KUDU_TEST(create-table-itest)
ADD_KUDU_TEST(create-table-stress-test)
-ADD_KUDU_TEST(delete_table-test)
+ADD_KUDU_TEST(delete_table-itest)
ADD_KUDU_TEST(delete_tablet-itest)
ADD_KUDU_TEST(disk_reservation-itest)
ADD_KUDU_TEST(exactly_once_writes-itest)
http://git-wip-us.apache.org/repos/asf/kudu/blob/889d1524/src/kudu/integration-tests/delete_table-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/delete_table-itest.cc b/src/kudu/integration-tests/delete_table-itest.cc
new file mode 100644
index 0000000..6079121
--- /dev/null
+++ b/src/kudu/integration-tests/delete_table-itest.cc
@@ -0,0 +1,1470 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <string>
+#include <tuple>
+
+#include <boost/optional.hpp>
+#include <glog/stl_logging.h>
+#include <gtest/gtest.h>
+#include <rapidjson/document.h>
+
+#include "kudu/client/client-test-util.h"
+#include "kudu/client/shared_ptr.h"
+#include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/cluster_verifier.h"
+#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/tablet/tablet.pb.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/util/curl_util.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/pstack_watcher.h"
+#include "kudu/util/subprocess.h"
+
+using kudu::client::KuduClient;
+using kudu::client::KuduScanner;
+using kudu::client::KuduScanBatch;
+using kudu::client::KuduSchema;
+using kudu::client::KuduSchemaFromSchema;
+using kudu::client::KuduTable;
+using kudu::client::KuduTableCreator;
+using kudu::consensus::CONSENSUS_CONFIG_COMMITTED;
+using kudu::consensus::ConsensusMetadataPB;
+using kudu::consensus::ConsensusStatePB;
+using kudu::consensus::RaftPeerPB;
+using kudu::itest::TServerDetails;
+using kudu::tablet::TABLET_DATA_COPYING;
+using kudu::tablet::TABLET_DATA_DELETED;
+using kudu::tablet::TABLET_DATA_READY;
+using kudu::tablet::TABLET_DATA_TOMBSTONED;
+using kudu::tablet::TabletDataState;
+using kudu::tablet::TabletSuperBlockPB;
+using kudu::tserver::ListTabletsResponsePB;
+using kudu::tserver::TabletServerErrorPB;
+using std::numeric_limits;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+METRIC_DECLARE_entity(server);
+METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_TSHeartbeat);
+METRIC_DECLARE_histogram(handler_latency_kudu_tserver_TabletServerAdminService_DeleteTablet);
+
+namespace kudu {
+
+class DeleteTableITest : public ExternalMiniClusterITestBase {
+ protected:
+ enum IsCMetaExpected {
+ CMETA_NOT_EXPECTED = 0,
+ CMETA_EXPECTED = 1
+ };
+
+ enum IsSuperBlockExpected {
+ SUPERBLOCK_NOT_EXPECTED = 0,
+ SUPERBLOCK_EXPECTED = 1
+ };
+
+ enum ErrorDumpStackSelector {
+ ON_ERROR_DO_NOT_DUMP_STACKS = 0,
+ ON_ERROR_DUMP_STACKS = 1,
+ };
+
+ // Get the UUID of the leader of the specified tablet, as seen by the TS with
+ // the given 'ts_uuid'.
+ string GetLeaderUUID(const string& ts_uuid, const string& tablet_id);
+
+ Status CheckTabletTombstonedOrDeletedOnTS(
+ int index,
+ const string& tablet_id,
+ TabletDataState data_state,
+ IsCMetaExpected is_cmeta_expected,
+ IsSuperBlockExpected is_superblock_expected);
+
+ Status CheckTabletTombstonedOnTS(int index,
+ const string& tablet_id,
+ IsCMetaExpected is_cmeta_expected);
+
+ Status CheckTabletDeletedOnTS(int index,
+ const string& tablet_id,
+ IsSuperBlockExpected is_superblock_expected);
+
+ void WaitForTabletTombstonedOnTS(int index,
+ const string& tablet_id,
+ IsCMetaExpected is_cmeta_expected);
+
+ void WaitForTabletDeletedOnTS(int index,
+ const string& tablet_id,
+ IsSuperBlockExpected is_superblock_expected);
+
+ void WaitForTSToCrash(int index);
+ void WaitForAllTSToCrash();
+ void WaitUntilTabletRunning(int index, const std::string& tablet_id);
+
+ // Delete the given table. If the operation times out, optionally dump
+ // the master stacks to help debug master-side deadlocks.
+ void DeleteTable(const string& table_name,
+ ErrorDumpStackSelector selector = ON_ERROR_DUMP_STACKS);
+};
+
+string DeleteTableITest::GetLeaderUUID(const string& ts_uuid, const string& tablet_id) {
+ ConsensusStatePB cstate;
+ CHECK_OK(itest::GetConsensusState(ts_map_[ts_uuid], tablet_id, CONSENSUS_CONFIG_COMMITTED,
+ MonoDelta::FromSeconds(10), &cstate));
+ return cstate.leader_uuid();
+}
+
+Status DeleteTableITest::CheckTabletTombstonedOrDeletedOnTS(
+ int index,
+ const string& tablet_id,
+ TabletDataState data_state,
+ IsCMetaExpected is_cmeta_expected,
+ IsSuperBlockExpected is_superblock_expected) {
+ CHECK(data_state == TABLET_DATA_TOMBSTONED || data_state == TABLET_DATA_DELETED) << data_state;
+ // There should be no WALs and no cmeta.
+ if (inspect_->CountFilesInWALDirForTS(index, tablet_id) > 0) {
+ return Status::IllegalState("WAL segments exist for tablet", tablet_id);
+ }
+ if (is_cmeta_expected == CMETA_EXPECTED &&
+ !inspect_->DoesConsensusMetaExistForTabletOnTS(index, tablet_id)) {
+ return Status::IllegalState("Expected cmeta for tablet " + tablet_id + " but it doesn't exist");
+ }
+ if (is_superblock_expected == SUPERBLOCK_EXPECTED) {
+ RETURN_NOT_OK(inspect_->CheckTabletDataStateOnTS(index, tablet_id, { data_state }));
+ } else {
+ TabletSuperBlockPB superblock_pb;
+ Status s = inspect_->ReadTabletSuperBlockOnTS(index, tablet_id, &superblock_pb);
+ if (!s.IsNotFound()) {
+ return Status::IllegalState("Found unexpected superblock for tablet " + tablet_id);
+ }
+ }
+ return Status::OK();
+}
+
+Status DeleteTableITest::CheckTabletTombstonedOnTS(int index,
+ const string& tablet_id,
+ IsCMetaExpected is_cmeta_expected) {
+ return CheckTabletTombstonedOrDeletedOnTS(index, tablet_id, TABLET_DATA_TOMBSTONED,
+ is_cmeta_expected, SUPERBLOCK_EXPECTED);
+}
+
+ // Checks that the replica of 'tablet_id' on server 'index' is DELETED: no WALs and,
+ // depending on 'is_superblock_expected', either a DELETED superblock or none at all.
+ // The presence of cmeta is not required (nor checked for absence).
+ Status DeleteTableITest::CheckTabletDeletedOnTS(int index,
+                                                 const string& tablet_id,
+                                                 IsSuperBlockExpected is_superblock_expected) {
+   const TabletDataState kDeleted = TABLET_DATA_DELETED;
+   const IsCMetaExpected kCMetaRequirement = CMETA_NOT_EXPECTED;
+   return CheckTabletTombstonedOrDeletedOnTS(index, tablet_id, kDeleted,
+                                             kCMetaRequirement, is_superblock_expected);
+ }
+
+ // Polls CheckTabletTombstonedOnTS() every 10 ms, for at most 6000 attempts
+ // (about a minute), and fails the test with the last Status if it never passes.
+ void DeleteTableITest::WaitForTabletTombstonedOnTS(int index,
+                                                    const string& tablet_id,
+                                                    IsCMetaExpected is_cmeta_expected) {
+   const int kMaxAttempts = 6000;
+   Status last_status;
+   int attempt = 0;
+   while (attempt < kMaxAttempts) {
+     last_status = CheckTabletTombstonedOnTS(index, tablet_id, is_cmeta_expected);
+     if (last_status.ok()) {
+       return;
+     }
+     SleepFor(MonoDelta::FromMilliseconds(10));
+     ++attempt;
+   }
+   ASSERT_OK(last_status);
+ }
+
+ // Polls CheckTabletDeletedOnTS() every 10 ms, for at most 6000 attempts
+ // (about a minute), and fails the test with the last Status if it never passes.
+ void DeleteTableITest::WaitForTabletDeletedOnTS(int index,
+                                                 const string& tablet_id,
+                                                 IsSuperBlockExpected is_superblock_expected) {
+   const int kMaxAttempts = 6000;
+   Status last_status;
+   int attempt = 0;
+   while (attempt < kMaxAttempts) {
+     last_status = CheckTabletDeletedOnTS(index, tablet_id, is_superblock_expected);
+     if (last_status.ok()) {
+       return;
+     }
+     SleepFor(MonoDelta::FromMilliseconds(10));
+     ++attempt;
+   }
+   ASSERT_OK(last_status);
+ }
+
+ // Waits up to 60 seconds for tablet server 'index' to hit an injected crash.
+ // Failures are annotated with the server's permanent UUID.
+ void DeleteTableITest::WaitForTSToCrash(int index) {
+   auto tserver = cluster_->tablet_server(index);
+   SCOPED_TRACE(tserver->instance_id().permanent_uuid());
+   const MonoDelta kCrashTimeout = MonoDelta::FromSeconds(60);
+   ASSERT_OK(tserver->WaitForInjectedCrash(kCrashTimeout));
+ }
+
+ // Waits, server by server, for every tablet server in the cluster to hit an
+ // injected crash; returns early on the first fatal failure.
+ void DeleteTableITest::WaitForAllTSToCrash() {
+   for (int ts_idx = 0; ts_idx < cluster_->num_tablet_servers(); ++ts_idx) {
+     NO_FATALS(WaitForTSToCrash(ts_idx));
+   }
+ }
+
+ // Waits up to 60 seconds for the replica of 'tablet_id' hosted on tablet server
+ // 'index' to reach the RUNNING state.
+ void DeleteTableITest::WaitUntilTabletRunning(int index, const std::string& tablet_id) {
+   TServerDetails* const target = ts_map_[cluster_->tablet_server(index)->uuid()];
+   const MonoDelta kRunningTimeout = MonoDelta::FromSeconds(60);
+   ASSERT_OK(itest::WaitUntilTabletRunning(target, tablet_id, kRunningTimeout));
+ }
+
+ // Deletes 'table_name' through the client and asserts success. When the call
+ // times out and 'selector' is ON_ERROR_DUMP_STACKS, the master's stacks are
+ // dumped first (best effort) to help diagnose master-side deadlocks.
+ void DeleteTableITest::DeleteTable(const string& table_name,
+                                    ErrorDumpStackSelector selector) {
+   const Status delete_status = client_->DeleteTable(table_name);
+   const bool dump_requested = (selector == ON_ERROR_DUMP_STACKS);
+   if (dump_requested && delete_status.IsTimedOut()) {
+     WARN_NOT_OK(PstackWatcher::DumpPidStacks(cluster_->master()->pid()),
+                 "Couldn't dump stacks");
+   }
+   ASSERT_OK(delete_status);
+ }
+
+ // Test deleting an empty table, and ensure that the tablets get removed,
+ // and the master no longer shows the table as existing.
+ TEST_F(DeleteTableITest, TestDeleteEmptyTable) {
+   NO_FATALS(StartCluster());
+   // Create a table on the cluster. We're just using TestWorkload
+   // as a convenient way to create it.
+   TestWorkload(cluster_.get()).Setup();
+
+   // The table should have replicas on all three tservers.
+   ASSERT_OK(inspect_->WaitForReplicaCount(3));
+
+   // Grab the tablet ID (used later).
+   vector<string> tablets = inspect_->ListTabletsOnTS(1);
+   ASSERT_EQ(1, tablets.size());
+   const string& tablet_id = tablets[0];
+
+   // Delete it and wait for the replicas to get deleted.
+   // We should have no tablets at the filesystem layer after deleting the table.
+   NO_FATALS(DeleteTable(TestWorkload::kDefaultTableName));
+   ASSERT_OK(inspect_->WaitForNoData());
+
+   // Check that the master no longer exposes the table in any way:
+
+   // 1) Should not list it in ListTables.
+   vector<string> table_names;
+   ASSERT_OK(client_->ListTables(&table_names));
+   ASSERT_TRUE(table_names.empty()) << "table still exposed in ListTables";
+
+   // 2) Should respond to GetTableSchema with a NotFound error.
+   KuduSchema schema;
+   Status s = client_->GetTableSchema(TestWorkload::kDefaultTableName, &schema);
+   ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+
+   // 3) Should return an error for GetTabletLocations RPCs.
+   {
+     rpc::RpcController rpc;
+     master::GetTabletLocationsRequestPB req;
+     master::GetTabletLocationsResponsePB resp;
+     rpc.set_timeout(MonoDelta::FromSeconds(10));
+     req.add_tablet_ids()->assign(tablet_id);
+     ASSERT_OK(cluster_->master_proxy()->GetTabletLocations(req, &resp, &rpc));
+     SCOPED_TRACE(SecureDebugString(resp));
+     ASSERT_EQ(1, resp.errors_size());
+     ASSERT_STR_CONTAINS(SecureShortDebugString(resp.errors(0)),
+                         "code: NOT_FOUND message: \"Tablet deleted: Table deleted");
+   }
+
+   // 4) The master 'dump-entities' page should not list the deleted table or tablets.
+   EasyCurl c;
+   faststring entities_buf;
+   ASSERT_OK(c.FetchURL(Substitute("http://$0/dump-entities",
+                                   cluster_->master()->bound_http_hostport().ToString()),
+                        &entities_buf));
+   rapidjson::Document doc;
+   doc.Parse<0>(entities_buf.ToString().c_str());
+   // Validate the document before indexing into it: rapidjson asserts (rather
+   // than failing the test cleanly) when operator[] is used on a non-object or
+   // on a missing member.
+   ASSERT_FALSE(doc.HasParseError()) << entities_buf.ToString();
+   ASSERT_TRUE(doc.IsObject()) << entities_buf.ToString();
+   ASSERT_TRUE(doc.HasMember("tables") && doc.HasMember("tablets")) << entities_buf.ToString();
+   ASSERT_EQ(0, doc["tables"].Size());
+   ASSERT_EQ(0, doc["tablets"].Size());
+ }
+
+ // Test that a DeleteTablet RPC is rejected without a matching destination UUID.
+ TEST_F(DeleteTableITest, TestDeleteTabletDestUuidValidation) {
+   NO_FATALS(StartCluster());
+   // TestWorkload is only used here as a convenient way to create a table.
+   TestWorkload(cluster_.get()).Setup();
+   ASSERT_OK(inspect_->WaitForReplicaCount(3));
+
+   const vector<string> tablet_ids = inspect_->ListTabletsOnTS(1);
+   ASSERT_EQ(1, tablet_ids.size());
+   const string& target_tablet = tablet_ids[0];
+
+   TServerDetails* target_ts = ts_map_[cluster_->tablet_server(0)->uuid()];
+
+   // Address the request to a UUID that does not belong to the receiving server.
+   tserver::DeleteTabletRequestPB request;
+   request.set_dest_uuid("fake-uuid");
+   request.set_tablet_id(target_tablet);
+   request.set_delete_type(TABLET_DATA_TOMBSTONED);
+
+   rpc::RpcController controller;
+   controller.set_timeout(MonoDelta::FromSeconds(20));
+   tserver::DeleteTabletResponsePB response;
+   ASSERT_OK(target_ts->tserver_admin_proxy->DeleteTablet(request, &response, &controller));
+
+   // The RPC itself completes; the rejection is carried in the response body.
+   ASSERT_TRUE(response.has_error());
+   ASSERT_EQ(tserver::TabletServerErrorPB::WRONG_SERVER_UUID, response.error().code())
+       << SecureShortDebugString(response);
+   ASSERT_STR_CONTAINS(StatusFromPB(response.error().status()).ToString(),
+                       "Wrong destination UUID");
+ }
+
+ // Test the atomic CAS argument to DeleteTablet().
+ TEST_F(DeleteTableITest, TestAtomicDeleteTablet) {
+   MonoDelta timeout = MonoDelta::FromSeconds(30);
+   NO_FATALS(StartCluster());
+   // Create a table on the cluster. We're just using TestWorkload
+   // as a convenient way to create it.
+   TestWorkload(cluster_.get()).Setup();
+
+   // The table should have replicas on all three tservers.
+   ASSERT_OK(inspect_->WaitForReplicaCount(3));
+
+   // Grab the tablet ID (used later).
+   vector<string> tablets = inspect_->ListTabletsOnTS(1);
+   ASSERT_EQ(1, tablets.size());
+   const string& tablet_id = tablets[0];
+
+   const int kTsIndex = 0;
+   TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
+
+   // The committed config starts off with an opid_index of -1, so choose something lower.
+   boost::optional<int64_t> opid_index(-2);
+   // Always give 'error_code' a defined value. It is reset before every attempt,
+   // so a call that does not report a server-side error code can neither read an
+   // indeterminate value nor reuse a stale code from a previous attempt.
+   tserver::TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
+   ASSERT_OK(itest::WaitUntilTabletRunning(ts, tablet_id, timeout));
+
+   Status s;
+   for (int i = 0; i < 100; i++) {
+     error_code = TabletServerErrorPB::UNKNOWN_ERROR;
+     s = itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, opid_index, timeout,
+                             &error_code);
+     if (error_code == TabletServerErrorPB::CAS_FAILED) break;
+     // If we didn't get the expected CAS_FAILED error, it's OK to get 'TABLET_NOT_RUNNING'
+     // because the "creating" maintenance state persists just slightly after it starts to
+     // expose 'RUNNING' state in ListTablets()
+     ASSERT_EQ(TabletServerErrorPB::TABLET_NOT_RUNNING, error_code)
+         << "unexpected error: " << s.ToString();
+     SleepFor(MonoDelta::FromMilliseconds(100));
+   }
+
+   ASSERT_EQ(TabletServerErrorPB::CAS_FAILED, error_code) << "unexpected error: " << s.ToString();
+   ASSERT_STR_CONTAINS(s.ToString(), "of -2 but the committed config has opid_index of -1");
+
+   // Now use the "latest", which is -1.
+   opid_index = -1;
+   ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, opid_index, timeout,
+                                 &error_code));
+   // CheckTabletDataStateOnTS() reports through its returned Status, so the
+   // result must be asserted on for the check to have any effect.
+   ASSERT_OK(inspect_->CheckTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_TOMBSTONED }));
+
+   // Now that the tablet is already tombstoned, our opid_index should be
+   // ignored (because it's impossible to check it).
+   ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, -9999, timeout,
+                                 &error_code));
+   ASSERT_OK(inspect_->CheckTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_TOMBSTONED }));
+
+   // Same with TOMBSTONED -> DELETED. A fully deleted replica has no superblock
+   // left whose data state could be read, so verify the deletion instead.
+   ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_DELETED, -9999, timeout,
+                                 &error_code));
+   NO_FATALS(WaitForTabletDeletedOnTS(kTsIndex, tablet_id, SUPERBLOCK_NOT_EXPECTED));
+ }
+
+ // Deletes a table while a TestWorkload is actively writing to it, then waits
+ // for all on-disk replica data to disappear and calls AssertNoCrashes() on the
+ // cluster. Runs 20 iterations (one fresh table each) when slow tests are
+ // allowed, otherwise a single iteration.
+ TEST_F(DeleteTableITest, TestDeleteTableWithConcurrentWrites) {
+ NO_FATALS(StartCluster());
+ int n_iters = AllowSlowTests() ? 20 : 1;
+ for (int i = 0; i < n_iters; i++) {
+ LOG(INFO) << "Running iteration " << i;
+ TestWorkload workload(cluster_.get());
+ // Use a distinct table name per iteration.
+ workload.set_table_name(Substitute("table-$0", i));
+
+ // We'll delete the table underneath the writers, so we expect
+ // a NotFound error during the writes.
+ workload.set_not_found_allowed(true);
+ workload.Setup();
+
+ // Start the workload, and wait to see some rows actually inserted
+ workload.Start();
+ while (workload.rows_inserted() < 100) {
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
+
+ // Delete it and wait for the replicas to get deleted.
+ NO_FATALS(DeleteTable(workload.table_name()));
+ ASSERT_OK(inspect_->WaitForNoData());
+
+ // Sleep just a little longer to make sure client threads send
+ // requests to the missing tablets.
+ SleepFor(MonoDelta::FromMilliseconds(50));
+
+ workload.StopAndJoin();
+ NO_FATALS(cluster_->AssertNoCrashes());
+ }
+ }
+
+ // Test that a tablet replica is automatically tombstoned on startup if a local
+ // crash occurs in the middle of Tablet Copy. Additionally acts as a regression
+ // test for KUDU-1605.
+ TEST_F(DeleteTableITest, TestAutoTombstoneAfterCrashDuringTabletCopy) {
+   // Set up flags to flush frequently, so that we get data on disk.
+   vector<string> ts_flags = {
+     "--flush_threshold_mb=0",
+     "--maintenance_manager_polling_interval_ms=100"
+   };
+   vector<string> master_flags = {
+     "--allow_unsafe_replication_factor=true"
+   };
+   NO_FATALS(StartCluster(ts_flags, master_flags));
+   const MonoDelta timeout = MonoDelta::FromSeconds(10);
+   const int kTsIndex = 0; // We'll test with the first TS.
+
+   // Shut down TS 1, 2, write some data to TS 0 alone.
+   cluster_->tablet_server(1)->Shutdown();
+   cluster_->tablet_server(2)->Shutdown();
+   cluster_->master()->Shutdown();
+   ASSERT_OK(cluster_->master()->Restart());
+   ASSERT_OK(cluster_->WaitForTabletServerCount(1, MonoDelta::FromSeconds(30)));
+
+   // Set up a table which has a tablet only on TS 0. This will be used to test for
+   // "collateral damage" bugs where incorrect handling of the main test tablet
+   // accidentally removes blocks from another tablet.
+   // We use a sequential workload so that we just flush and don't compact.
+   // Thus we use a contiguous set of block IDs starting with 1, with no "holes"
+   // from deleted blocks.
+   string unaffected_tablet_id;
+   TestWorkload unrepl_workload(cluster_.get());
+   {
+     unrepl_workload.set_num_replicas(1);
+     unrepl_workload.set_table_name("other-table");
+     unrepl_workload.set_num_write_threads(1);
+     unrepl_workload.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
+     unrepl_workload.Setup();
+
+     // Figure out the tablet ID.
+     vector<string> tablets = inspect_->ListTabletsOnTS(kTsIndex);
+     ASSERT_EQ(1, tablets.size());
+     unaffected_tablet_id = tablets[0];
+
+     // Write rows until it has flushed a few times.
+     unrepl_workload.Start();
+     TabletSuperBlockPB sb;
+     while (sb.rowsets().size() < 3) {
+       SleepFor(MonoDelta::FromMilliseconds(10));
+       ASSERT_OK(inspect_->ReadTabletSuperBlockOnTS(kTsIndex, unaffected_tablet_id, &sb));
+     }
+     unrepl_workload.StopAndJoin();
+   }
+
+   // Restart the other two servers.
+   ASSERT_OK(cluster_->tablet_server(1)->Restart());
+   ASSERT_OK(cluster_->tablet_server(2)->Restart());
+   cluster_->tablet_server(kTsIndex)->Shutdown();
+
+   // Restart the master to be sure that it only sees the live servers.
+   // Otherwise it may try to create a tablet with a replica on the down server.
+   // The table creation would eventually succeed after picking a different set of
+   // replicas, but not before causing a timeout.
+   cluster_->master()->Shutdown();
+   ASSERT_OK(cluster_->master()->Restart());
+   ASSERT_OK(cluster_->WaitForTabletServerCount(2, MonoDelta::FromSeconds(30)));
+
+   // Create a new table with a single tablet replicated on the other two servers.
+   // We use the same sequential workload. This produces block ID sequences
+   // that look like:
+   //   TS 0: |---- blocks from 'other-table' ---]
+   //   TS 1: |---- blocks from default workload table ---]
+   //   TS 2: |---- blocks from default workload table ---]
+   // So, if we were to accidentally reuse or delete block IDs from TS 1 or TS 2 when
+   // copying a tablet to TS 0, we'd surely notice it!
+   string replicated_tablet_id;
+
+   TestWorkload repl_workload(cluster_.get());
+   {
+     repl_workload.set_num_replicas(2);
+     repl_workload.set_num_write_threads(1);
+     repl_workload.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
+     repl_workload.Setup();
+     ASSERT_OK(inspect_->WaitForReplicaCount(3)); // 1 original plus 2 new replicas.
+
+     // Figure out the tablet id of the new table.
+     vector<string> tablets = inspect_->ListTabletsOnTS(1);
+     ASSERT_EQ(1, tablets.size());
+     replicated_tablet_id = tablets[0];
+
+     repl_workload.Start();
+     TabletSuperBlockPB sb;
+     while (sb.rowsets().size() < 3) {
+       SleepFor(MonoDelta::FromMilliseconds(10));
+       ASSERT_OK(inspect_->ReadTabletSuperBlockOnTS(1, replicated_tablet_id, &sb));
+     }
+     repl_workload.StopAndJoin();
+   }
+
+   // Enable a fault crash when Tablet Copy occurs on TS 0.
+   cluster_->tablet_server(kTsIndex)->mutable_flags()->push_back(
+       "--fault_crash_after_tc_files_fetched=1.0");
+
+   // Restart TS 0 and add it to the config. It will crash when tablet copy starts.
+   ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
+   string leader_uuid = GetLeaderUUID(cluster_->tablet_server(1)->uuid(), replicated_tablet_id);
+   TServerDetails* leader = DCHECK_NOTNULL(ts_map_[leader_uuid]);
+   TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
+   ASSERT_OK(itest::AddServer(leader, replicated_tablet_id, ts, RaftPeerPB::VOTER,
+                              boost::none, timeout));
+   NO_FATALS(WaitForTSToCrash(kTsIndex));
+   cluster_->tablet_server(kTsIndex)->Shutdown();
+
+   LOG(INFO) << "Test progress: crashed on first attempt to copy";
+
+   // The superblock should be in TABLET_DATA_COPYING state on disk.
+   // CheckTabletDataStateOnTS() reports through its returned Status; wrapping it
+   // in NO_FATALS() would silently discard a failed check, so assert on it.
+   ASSERT_OK(inspect_->CheckTabletDataStateOnTS(kTsIndex, replicated_tablet_id,
+                                                { TABLET_DATA_COPYING }));
+
+   // Restart and let it crash one more time.
+   cluster_->tablet_server(kTsIndex)->Shutdown();
+   ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
+   NO_FATALS(WaitForTSToCrash(kTsIndex));
+   LOG(INFO) << "Test progress: crashed on second attempt to copy";
+
+   // Remove the fault flag and let it successfully copy the tablet.
+   cluster_->tablet_server(kTsIndex)->mutable_flags()->pop_back();
+   cluster_->tablet_server(kTsIndex)->Shutdown();
+   ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
+
+   // Everything should be consistent now.
+   ClusterVerifier v(cluster_.get());
+   NO_FATALS(v.CheckCluster());
+   NO_FATALS(v.CheckRowCount(repl_workload.table_name(), ClusterVerifier::AT_LEAST,
+                             repl_workload.rows_inserted()));
+   NO_FATALS(v.CheckRowCount(unrepl_workload.table_name(), ClusterVerifier::AT_LEAST,
+                             unrepl_workload.rows_inserted()));
+
+   // Now we want to test the case where we crash while copying over a
+   // previously-tombstoned tablet. So, we first remove the server, causing it
+   // to get tombstoned.
+   ASSERT_OK(itest::RemoveServer(leader, replicated_tablet_id, ts, boost::none, timeout));
+   NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, replicated_tablet_id, CMETA_EXPECTED));
+
+   // ... and then add it back, with the fault runtime-enabled.
+   ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(kTsIndex),
+                               "fault_crash_after_tc_files_fetched", "1.0"));
+   ASSERT_OK(itest::AddServer(leader, replicated_tablet_id, ts, RaftPeerPB::VOTER,
+                              boost::none, timeout));
+   NO_FATALS(WaitForTSToCrash(kTsIndex));
+   LOG(INFO) << "Test progress: crashed on attempt to copy over tombstoned";
+
+   // Finally, restarting with no fault flags should fully recover again.
+   cluster_->tablet_server(kTsIndex)->Shutdown();
+   ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
+   NO_FATALS(v.CheckCluster());
+   NO_FATALS(v.CheckRowCount(repl_workload.table_name(), ClusterVerifier::AT_LEAST,
+                             repl_workload.rows_inserted()));
+   NO_FATALS(v.CheckRowCount(unrepl_workload.table_name(), ClusterVerifier::AT_LEAST,
+                             unrepl_workload.rows_inserted()));
+ }
+
+ // Test that a tablet replica automatically tombstones itself if the remote
+ // server fails in the middle of the Tablet Copy process.
+ // Also test that we can Copy Tablet over a tombstoned tablet.
+ TEST_F(DeleteTableITest, TestAutoTombstoneAfterTabletCopyRemoteFails) {
+   vector<string> ts_flags = {
+     "--enable_leader_failure_detection=false",  // Make test deterministic.
+     "--log_segment_size_mb=1",                  // Faster log rolls.
+     "--log_compression_codec=NO_COMPRESSION"    // Faster log rolls.
+   };
+   vector<string> master_flags = {
+     "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
+     "--allow_unsafe_replication_factor=true"
+   };
+   NO_FATALS(StartCluster(ts_flags, master_flags));
+   const MonoDelta kTimeout = MonoDelta::FromSeconds(20);
+   const int kTsIndex = 0; // We'll test with the first TS.
+
+   // We'll do a config change to Tablet Copy a replica here later. For
+   // now, shut down TS-0.
+   LOG(INFO) << "Shutting down TS " << cluster_->tablet_server(kTsIndex)->uuid();
+   cluster_->tablet_server(kTsIndex)->Shutdown();
+
+   // Bounce the Master so it gets new tablet reports and doesn't try to assign
+   // a replica to the dead TS. The wait's Status must be checked; otherwise a
+   // timeout here would go unnoticed and surface later as a confusing failure.
+   cluster_->master()->Shutdown();
+   ASSERT_OK(cluster_->master()->Restart());
+   ASSERT_OK(cluster_->WaitForTabletServerCount(2, kTimeout));
+
+   // Start a workload on the cluster, and run it for a little while.
+   TestWorkload workload(cluster_.get());
+   workload.set_num_replicas(2);
+   workload.Setup();
+   ASSERT_OK(inspect_->WaitForReplicaCount(2));
+
+   // Figure out the tablet id.
+   vector<string> tablets = inspect_->ListTabletsOnTS(1);
+   ASSERT_EQ(1, tablets.size());
+   const string& tablet_id = tablets[0];
+
+   for (int i = 1; i <= 2; i++) {
+     NO_FATALS(WaitUntilTabletRunning(i, tablet_id));
+   }
+
+   // Elect a leader and run some data through the cluster.
+   const int kLeaderIndex = 1;
+   string kLeaderUuid = cluster_->tablet_server(kLeaderIndex)->uuid();
+   ASSERT_OK(itest::StartElection(ts_map_[kLeaderUuid], tablet_id, kTimeout));
+   workload.Start();
+   while (workload.rows_inserted() < 100) {
+     SleepFor(MonoDelta::FromMilliseconds(10));
+   }
+
+   // Tablet Copy doesn't see the active WAL segment, and we need to
+   // download a file to trigger the fault in this test. Due to the log index
+   // chunks, that means 3 files minimum: One in-flight WAL segment, one index
+   // chunk file (these files grow much more slowly than the WAL segments), and
+   // one completed WAL segment.
+   ASSERT_OK(inspect_->WaitForMinFilesInTabletWalDirOnTS(kLeaderIndex, tablet_id, 3));
+   workload.StopAndJoin();
+
+   // Cause the leader to crash when a follower tries to initiate Tablet Copy from it.
+   const string fault_flag = "fault_crash_on_handle_tc_fetch_data";
+   ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(kLeaderIndex), fault_flag, "1.0"));
+
+   // Add TS-0 as a new member to the config and wait for the leader to crash.
+   ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
+   TServerDetails* leader = ts_map_[kLeaderUuid];
+   TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
+   // The server may crash before responding to our RPC.
+   Status s = itest::AddServer(leader, tablet_id, ts, RaftPeerPB::VOTER, boost::none, kTimeout);
+   ASSERT_TRUE(s.ok() || s.IsNetworkError()) << s.ToString();
+   NO_FATALS(WaitForTSToCrash(kLeaderIndex));
+
+   // The tablet server will detect that the leader failed, and automatically
+   // tombstone its replica. Shut down the crashed leader and the other
+   // non-leader replica to avoid interference while we wait for this to happen.
+   cluster_->tablet_server(1)->Shutdown();
+   cluster_->tablet_server(2)->Shutdown();
+   NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_NOT_EXPECTED));
+
+   // Check the textual status message for the failed copy.
+   {
+     vector<ListTabletsResponsePB::StatusAndSchemaPB> status_pbs;
+     ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, kTimeout, &status_pbs));
+     ASSERT_STR_MATCHES(status_pbs[0].tablet_status().last_status(),
+                        "Tablet Copy: Tombstoned tablet .*: Tablet copy aborted");
+   }
+
+   // Now bring the other replicas back, re-elect the previous leader (TS-1),
+   // and wait for the leader to Tablet Copy the tombstoned replica. This
+   // will have replaced a tablet with no consensus metadata.
+   ASSERT_OK(cluster_->tablet_server(1)->Restart());
+   ASSERT_OK(cluster_->tablet_server(2)->Restart());
+   for (int i = 1; i <= 2; i++) {
+     NO_FATALS(WaitUntilTabletRunning(i, tablet_id));
+   }
+   ASSERT_OK(itest::StartElection(ts_map_[kLeaderUuid], tablet_id, kTimeout));
+   ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_READY }));
+
+   ClusterVerifier v(cluster_.get());
+   NO_FATALS(v.CheckCluster());
+   NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
+                             workload.rows_inserted()));
+
+   // Now pause the other replicas and tombstone our replica again.
+   ASSERT_OK(cluster_->tablet_server(1)->Pause());
+   ASSERT_OK(cluster_->tablet_server(2)->Pause());
+   ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, kTimeout));
+   NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_NOT_EXPECTED));
+
+   // Bring them back again, let them yet again Copy a new replica on top of our tombstoned replica.
+   // This time, the leader will have replaced a tablet with consensus metadata.
+   ASSERT_OK(cluster_->tablet_server(1)->Resume());
+   ASSERT_OK(cluster_->tablet_server(2)->Resume());
+   ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_READY }));
+
+   NO_FATALS(v.CheckCluster());
+   NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
+                             workload.rows_inserted()));
+ }
+
+ // Test for correct Tablet Copy merge of consensus metadata.
+ // Scenario checked by the assertions below:
+ //  1. TS 0 votes for itself in term 2 and is then tombstoned; after it is
+ //     Tablet Copied again, its cmeta must still show term 2 and the self-vote.
+ //  2. TS 0 is tombstoned again while the other servers advance to term 3;
+ //     after the copy, its cmeta must show term 3 and no recorded vote.
+ TEST_F(DeleteTableITest, TestMergeConsensusMetadata) {
+ // Enable manual leader selection.
+ vector<string> ts_flags, master_flags;
+ ts_flags.push_back("--enable_leader_failure_detection=false");
+ master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
+ NO_FATALS(StartCluster(ts_flags, master_flags));
+ const MonoDelta timeout = MonoDelta::FromSeconds(10);
+ const int kTsIndex = 0;
+
+ TestWorkload workload(cluster_.get());
+ workload.Setup();
+ ASSERT_OK(inspect_->WaitForReplicaCount(3));
+
+ // Figure out the tablet id to Tablet Copy.
+ vector<string> tablets = inspect_->ListTabletsOnTS(1);
+ ASSERT_EQ(1, tablets.size());
+ const string& tablet_id = tablets[0];
+
+ for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+ NO_FATALS(WaitUntilTabletRunning(i, tablet_id));
+ }
+
+ // Elect a leader and run some data through the cluster.
+ int leader_index = 1;
+ string leader_uuid = cluster_->tablet_server(leader_index)->uuid();
+ ASSERT_OK(itest::StartElection(ts_map_[leader_uuid], tablet_id, timeout));
+ workload.Start();
+ while (workload.rows_inserted() < 100) {
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
+ workload.StopAndJoin();
+ ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, workload.batches_completed()));
+
+ // Verify that TS 0 voted for the chosen leader.
+ ConsensusMetadataPB cmeta_pb;
+ ASSERT_OK(inspect_->ReadConsensusMetadataOnTS(kTsIndex, tablet_id, &cmeta_pb));
+ ASSERT_EQ(1, cmeta_pb.current_term());
+ ASSERT_EQ(leader_uuid, cmeta_pb.voted_for());
+
+ // Shut down all but TS 0 and try to elect TS 0. The election will fail but
+ // the TS will record a vote for itself as well as a new term (term 2).
+ cluster_->tablet_server(1)->Shutdown();
+ cluster_->tablet_server(2)->Shutdown();
+ NO_FATALS(WaitUntilTabletRunning(kTsIndex, tablet_id));
+ TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
+ ASSERT_OK(itest::StartElection(ts, tablet_id, timeout));
+ // Poll (up to 6000 x 10 ms) until the on-disk cmeta reflects term 2 and the
+ // self-vote; read errors are tolerated while polling, and the ASSERTs after
+ // the loop report the failure if the state is never reached.
+ for (int i = 0; i < 6000; i++) {
+ Status s = inspect_->ReadConsensusMetadataOnTS(kTsIndex, tablet_id, &cmeta_pb);
+ if (s.ok() &&
+ cmeta_pb.current_term() == 2 &&
+ cmeta_pb.voted_for() == ts->uuid()) {
+ break;
+ }
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
+ ASSERT_EQ(2, cmeta_pb.current_term());
+ ASSERT_EQ(ts->uuid(), cmeta_pb.voted_for());
+
+ // Tombstone our special little guy, then shut him down.
+ ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, timeout));
+ NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_EXPECTED));
+ cluster_->tablet_server(kTsIndex)->Shutdown();
+
+ // Restart the other dudes and re-elect the same leader.
+ ASSERT_OK(cluster_->tablet_server(1)->Restart());
+ ASSERT_OK(cluster_->tablet_server(2)->Restart());
+ TServerDetails* leader = ts_map_[leader_uuid];
+ NO_FATALS(WaitUntilTabletRunning(1, tablet_id));
+ NO_FATALS(WaitUntilTabletRunning(2, tablet_id));
+ ASSERT_OK(itest::StartElection(leader, tablet_id, timeout));
+ ASSERT_OK(itest::WaitUntilLeader(leader, tablet_id, timeout));
+
+ // Bring our special little guy back up.
+ // Wait until he gets Tablet Copied.
+ LOG(INFO) << "Bringing TS " << cluster_->tablet_server(kTsIndex)->uuid()
+ << " back up...";
+ ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
+ ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_READY }));
+
+ // Assert that the election history is retained (voted for self).
+ ASSERT_OK(inspect_->ReadConsensusMetadataOnTS(kTsIndex, tablet_id, &cmeta_pb));
+ ASSERT_EQ(2, cmeta_pb.current_term());
+ ASSERT_EQ(ts->uuid(), cmeta_pb.voted_for());
+
+ // Now do the same thing as above, where we tombstone TS 0 then trigger a new
+ // term (term 3) on the other machines. TS 0 will get copied
+ // again, but this time the vote record on TS 0 for term 2 should not be
+ // retained after Tablet Copy occurs.
+ cluster_->tablet_server(1)->Shutdown();
+ cluster_->tablet_server(2)->Shutdown();
+
+ // Delete with retries because the tablet might still be bootstrapping.
+ NO_FATALS(itest::DeleteTabletWithRetries(ts, tablet_id,
+ TABLET_DATA_TOMBSTONED, boost::none, timeout));
+ NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_EXPECTED));
+
+ ASSERT_OK(cluster_->tablet_server(1)->Restart());
+ ASSERT_OK(cluster_->tablet_server(2)->Restart());
+ NO_FATALS(WaitUntilTabletRunning(1, tablet_id));
+ NO_FATALS(WaitUntilTabletRunning(2, tablet_id));
+ ASSERT_OK(itest::StartElection(leader, tablet_id, timeout));
+ ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_READY }));
+
+ // The election history should have been wiped out.
+ ASSERT_OK(inspect_->ReadConsensusMetadataOnTS(kTsIndex, tablet_id, &cmeta_pb));
+ ASSERT_EQ(3, cmeta_pb.current_term());
+ ASSERT_TRUE(!cmeta_pb.has_voted_for()) << SecureShortDebugString(cmeta_pb);
+ }
+
+ // Regression test for KUDU-987, a bug where followers with transactions in
+ // REPLICATING state, which means they have not yet been committed to a
+ // majority, cannot shut down during a DeleteTablet() call.
+ TEST_F(DeleteTableITest, TestDeleteFollowerWithReplicatingTransaction) {
+   if (!AllowSlowTests()) {
+     // We will typically wait at least 5 seconds for timeouts to occur.
+     LOG(INFO) << "Skipping test in fast-test mode.";
+     return;
+   }
+
+   const MonoDelta timeout = MonoDelta::FromSeconds(10);
+
+   const int kNumTabletServers = 5;
+   vector<string> ts_flags, master_flags;
+   ts_flags.push_back("--enable_leader_failure_detection=false");
+   ts_flags.push_back("--flush_threshold_mb=0"); // Always be flushing.
+   ts_flags.push_back("--maintenance_manager_polling_interval_ms=100");
+   master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
+   NO_FATALS(StartCluster(ts_flags, master_flags, kNumTabletServers));
+
+   const int kTsIndex = 0; // We'll test with the first TS.
+   TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
+
+   // Create the table.
+   TestWorkload workload(cluster_.get());
+   workload.set_num_replicas(kNumTabletServers);
+   workload.Setup();
+
+   // Figure out the tablet ids of the created tablets.
+   vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+   ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, timeout, &tablets));
+   const string& tablet_id = tablets[0].tablet_status().tablet_id();
+
+   // Wait until all replicas are up and running.
+   for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+     ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+                                             tablet_id, timeout));
+   }
+
+   // Elect TS 1 as leader.
+   const int kLeaderIndex = 1;
+   const string kLeaderUuid = cluster_->tablet_server(kLeaderIndex)->uuid();
+   TServerDetails* leader = ts_map_[kLeaderUuid];
+   ASSERT_OK(itest::StartElection(leader, tablet_id, timeout));
+   ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, 1));
+
+   // Kill a majority, but leave the leader and a single follower.
+   LOG(INFO) << "Killing majority";
+   for (int i = 2; i < kNumTabletServers; i++) {
+     cluster_->tablet_server(i)->Shutdown();
+   }
+
+   // Now write a single row to the leader.
+   // We give 5 seconds for the timeout to pretty much guarantee that a flush
+   // will occur due to the low flush threshold we set.
+   LOG(INFO) << "Writing a row";
+   Status s = WriteSimpleTestRow(leader, tablet_id, RowOperationsPB::INSERT,
+                                 1, 1, "hola, world", MonoDelta::FromSeconds(5));
+   // Include the actual Status so an unexpected outcome is diagnosable.
+   ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+   ASSERT_STR_CONTAINS(s.ToString(), "timed out");
+
+   LOG(INFO) << "Killing the leader...";
+   cluster_->tablet_server(kLeaderIndex)->Shutdown();
+
+   // Now tombstone the follower tablet. This should succeed even though there
+   // are uncommitted operations on the replica.
+   LOG(INFO) << "Tombstoning tablet " << tablet_id << " on TS " << ts->uuid();
+   ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, timeout));
+ }
+
+ // Test that orphaned blocks are cleared from the superblock when a tablet is
+ // tombstoned. The follower (TS 0) is made to flush at least one rowset, the
+ // leader is shut down so it cannot re-copy the replica, and after tombstoning
+ // the follower's superblock must list neither rowsets nor orphaned blocks.
+ TEST_F(DeleteTableITest, TestOrphanedBlocksClearedOnDelete) {
+ const MonoDelta timeout = MonoDelta::FromSeconds(30);
+ vector<string> ts_flags, master_flags;
+ ts_flags.push_back("--enable_leader_failure_detection=false");
+ ts_flags.push_back("--flush_threshold_mb=0"); // Flush quickly since we wait for a flush to occur.
+ ts_flags.push_back("--maintenance_manager_polling_interval_ms=100");
+ master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
+ NO_FATALS(StartCluster(ts_flags, master_flags));
+
+ const int kFollowerIndex = 0;
+ TServerDetails* follower_ts = ts_map_[cluster_->tablet_server(kFollowerIndex)->uuid()];
+
+ // Create the table.
+ TestWorkload workload(cluster_.get());
+ workload.Setup();
+
+ // Figure out the tablet id of the created tablet.
+ vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+ ASSERT_OK(WaitForNumTabletsOnTS(follower_ts, 1, timeout, &tablets));
+ const string& tablet_id = tablets[0].tablet_status().tablet_id();
+
+ // Wait until all replicas are up and running.
+ for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+ ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+ tablet_id, timeout));
+ }
+
+ // Elect TS 1 as leader.
+ const int kLeaderIndex = 1;
+ const string kLeaderUuid = cluster_->tablet_server(kLeaderIndex)->uuid();
+ TServerDetails* leader_ts = ts_map_[kLeaderUuid];
+ ASSERT_OK(itest::StartElection(leader_ts, tablet_id, timeout));
+ ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, 1));
+
+ // Run a write workload and wait until we see some rowsets flush on the follower.
+ // Polls the on-disk superblock up to 3000 x 10 ms; the ASSERT_GT below
+ // reports a timeout if no rowset ever appears.
+ workload.Start();
+ TabletSuperBlockPB superblock_pb;
+ for (int i = 0; i < 3000; i++) {
+ ASSERT_OK(inspect_->ReadTabletSuperBlockOnTS(kFollowerIndex, tablet_id, &superblock_pb));
+ if (!superblock_pb.rowsets().empty()) break;
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
+ ASSERT_GT(superblock_pb.rowsets_size(), 0)
+ << "Timed out waiting for rowset flush on TS " << follower_ts->uuid() << ": "
+ << "Superblock:\n" << SecureDebugString(superblock_pb);
+
+ // Shut down the leader so it doesn't try to copy a new replica to our follower later.
+ workload.StopAndJoin();
+ cluster_->tablet_server(kLeaderIndex)->Shutdown();
+
+ // Tombstone the follower and check that there are no rowsets or orphaned
+ // blocks retained in the superblock.
+ ASSERT_OK(itest::DeleteTablet(follower_ts, tablet_id, TABLET_DATA_TOMBSTONED,
+ boost::none, timeout));
+ NO_FATALS(WaitForTabletTombstonedOnTS(kFollowerIndex, tablet_id, CMETA_EXPECTED));
+ ASSERT_OK(inspect_->ReadTabletSuperBlockOnTS(kFollowerIndex, tablet_id, &superblock_pb));
+ ASSERT_EQ(0, superblock_pb.rowsets_size()) << SecureDebugString(superblock_pb);
+ ASSERT_EQ(0, superblock_pb.orphaned_blocks_size()) << SecureDebugString(superblock_pb);
+ }
+
+// Returns a pointer to each element of 'haystack' that contains 'needle' as a
+// substring, preserving the order of 'haystack'. The pointers alias elements
+// of 'haystack', so they are only valid while 'haystack' is alive and
+// unmodified.
+vector<const string*> Grep(const string& needle, const vector<string>& haystack) {
+  vector<const string*> matches;
+  for (auto it = haystack.begin(); it != haystack.end(); ++it) {
+    if (it->find(needle) == string::npos) {
+      continue;
+    }
+    matches.push_back(&*it);
+  }
+  return matches;
+}
+
+// Runs `lsof -n -p <pid>` through bash and returns its stdout split on '\n'.
+// CHECK-fails (aborting the process) if the subprocess call reports an error.
+vector<string> ListOpenFiles(pid_t pid) {
+  // Append /usr/bin and /usr/sbin to PATH so that 'lsof' can be located even
+  // if the inherited PATH lacks them. '$$' produces a literal '$' in Substitute().
+  const string shell_cmd = strings::Substitute(
+      "export PATH=$$PATH:/usr/bin:/usr/sbin; lsof -n -p $0", pid);
+  const vector<string> bash_argv = { "bash", "-c", shell_cmd };
+  string lsof_output;
+  CHECK_OK(Subprocess::Call(bash_argv, "", &lsof_output));
+  vector<string> output_lines = strings::Split(lsof_output, "\n");
+  return output_lines;
+}
+
+// Logs every line of `lsof` output for process 'pid' that contains
+// 'tablet_id', and returns how many such lines there were. Note that the
+// match is a plain substring search on the whole lsof line, so any open file
+// whose path mentions the tablet id is counted, not only WAL segments.
+int PrintOpenTabletFiles(pid_t pid, const string& tablet_id) {
+  const vector<string> open_files = ListOpenFiles(pid);
+  // 'matching' points into 'open_files', which outlives every use below.
+  const vector<const string*> matching = Grep(tablet_id, open_files);
+  LOG(INFO) << "There are " << matching.size() << " open WAL files for pid " << pid << ":";
+  for (size_t i = 0; i < matching.size(); ++i) {
+    LOG(INFO) << *matching[i];
+  }
+  return matching.size();
+}
+
+ // Regression test for tablet deletion FD leak. See KUDU-1288.
+ //
+ // Writes rows to a single-replica table, tombstones its tablet, and then
+ // checks via PrintOpenTabletFiles() (a substring grep over `lsof` output)
+ // that the tablet server holds no open file whose lsof line mentions the
+ // tablet id. The check is repeated after restarting the tablet server.
+ TEST_F(DeleteTableITest, TestFDsNotLeakedOnTabletTombstone) {
+ const MonoDelta timeout = MonoDelta::FromSeconds(30);
+
+ NO_FATALS(StartCluster({}, {}, 1));
+
+ // Create the table.
+ TestWorkload workload(cluster_.get());
+ workload.set_num_replicas(1);
+ workload.Setup();
+ // Write at least 1000 rows before tombstoning, presumably so the tablet has
+ // WAL files open by then. NOTE(review): this loop has no timeout of its own.
+ workload.Start();
+ while (workload.rows_inserted() < 1000) {
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
+ workload.StopAndJoin();
+
+ // Figure out the tablet id of the created tablet.
+ vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+ ASSERT_OK(WaitForNumTabletsOnTS(ts_map_.begin()->second, 1, timeout, &tablets));
+ // 'tablet_id' refers into 'tablets', which is not modified for the rest of the test.
+ const string& tablet_id = tablets[0].tablet_status().tablet_id();
+
+ // Tombstone the tablet and then ensure that lsof does not list any
+ // tablet-related paths.
+ ExternalTabletServer* ets = cluster_->tablet_server(0);
+ ASSERT_OK(itest::DeleteTablet(ts_map_[ets->uuid()],
+ tablet_id, TABLET_DATA_TOMBSTONED, boost::none, timeout));
+ ASSERT_EQ(0, PrintOpenTabletFiles(ets->pid(), tablet_id));
+
+ // Restart the TS after deletion and then do the same lsof check again.
+ // The pid is re-read because the restarted server is a new process.
+ ets->Shutdown();
+ ASSERT_OK(ets->Restart());
+ ASSERT_EQ(0, PrintOpenTabletFiles(ets->pid(), tablet_id));
+}
+
+ // Regression test for KUDU-1545: crash when visiting the tablet page for a
+ // tombstoned tablet.
+ //
+ // Tombstones the only tablet of a single-replica table, then fetches several
+ // per-tablet web pages from the tablet server and checks that each request
+ // succeeds and that the response body contains the tablet id.
+ TEST_F(DeleteTableITest, TestWebPageForTombstonedTablet) {
+ const MonoDelta timeout = MonoDelta::FromSeconds(30);
+
+ NO_FATALS(StartCluster({}, {}, 1));
+
+ // Create the table.
+ TestWorkload workload(cluster_.get());
+ workload.set_num_replicas(1);
+ workload.Setup();
+
+ // Figure out the tablet id of the created tablet.
+ vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+ ASSERT_OK(WaitForNumTabletsOnTS(ts_map_.begin()->second, 1, timeout, &tablets));
+ const string& tablet_id = tablets[0].tablet_status().tablet_id();
+
+ // Tombstone the tablet. The DeleteTablet RPC is retried via AssertEventually,
+ // presumably because it can fail transiently while the new replica is still
+ // starting up -- TODO confirm.
+ ExternalTabletServer* ets = cluster_->tablet_server(0);
+ AssertEventually([&]() {
+ ASSERT_OK(itest::DeleteTablet(ts_map_[ets->uuid()],
+ tablet_id, TABLET_DATA_TOMBSTONED, boost::none, timeout));
+ });
+
+ // Check the various web pages associated with the tablet, ensuring
+ // they don't crash and at least have the tablet ID within them.
+ EasyCurl c;
+ // 'pages' deduces to std::initializer_list<const char*>; binding it to a
+ // const reference keeps its backing array alive for this scope.
+ const auto& pages = { "tablet",
+ "tablet-rowsetlayout-svg",
+ "tablet-consensus-status",
+ "log-anchors" };
+ for (const auto& page : pages) {
+ faststring buf;
+ ASSERT_OK(c.FetchURL(Substitute(
+ "http://$0/$1?id=$2",
+ cluster_->tablet_server(0)->bound_http_hostport().ToString(),
+ page,
+ tablet_id), &buf));
+ ASSERT_STR_CONTAINS(buf.ToString(), tablet_id);
+ }
+}
+
+
+ // Checks that a tablet the master does not know about is left alone by
+ // default, and is deleted only once the master runs with
+ // --catalog_manager_delete_orphaned_tablets.
+ //
+ // The master "forgets" the tablet because its data directory is wiped and
+ // it is restarted with fresh metadata.
+ TEST_F(DeleteTableITest, TestUnknownTabletsAreNotDeleted) {
+ // Speed up heartbeating so that the unknown tablet is detected faster.
+ vector<string> extra_ts_flags = { "--heartbeat_interval_ms=10" };
+
+ NO_FATALS(StartCluster(extra_ts_flags, {}, 1));
+
+ Schema schema(GetSimpleTestSchema());
+ client::KuduSchema client_schema(client::KuduSchemaFromSchema(schema));
+ unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
+ ASSERT_OK(creator->table_name("test")
+ .schema(&client_schema)
+ .set_range_partition_columns({"key"})
+ .num_replicas(1)
+ .Create());
+
+ // Figure out the tablet id of the created tablet.
+ const MonoDelta timeout = MonoDelta::FromSeconds(30);
+ vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+ ASSERT_OK(WaitForNumTabletsOnTS(ts_map_.begin()->second, 1, timeout, &tablets));
+ const string& tablet_id = tablets[0].tablet_status().tablet_id();
+
+ // Delete the master's metadata and start it back up. The tablet created
+ // above is now unknown, but should not be deleted!
+ cluster_->master()->Shutdown();
+ ASSERT_OK(env_->DeleteRecursively(cluster_->master()->data_dir()));
+ ASSERT_OK(cluster_->master()->Restart());
+
+ // Give the master a chance to finish writing the new master tablet to disk
+ // so that it can be found after the subsequent restart below.
+ ASSERT_OK(cluster_->master()->WaitForCatalogManager());
+
+ // Wait until the restarted master has handled at least one tserver
+ // heartbeat, then verify the tserver has received no DeleteTablet RPCs.
+ int64_t num_delete_attempts;
+ AssertEventually([&]() {
+ int64_t num_heartbeats;
+ ASSERT_OK(cluster_->master()->GetInt64Metric(
+ &METRIC_ENTITY_server, "kudu.master",
+ &METRIC_handler_latency_kudu_master_MasterService_TSHeartbeat, "total_count",
+ &num_heartbeats));
+ ASSERT_GE(num_heartbeats, 1);
+ });
+ ASSERT_OK(cluster_->tablet_server(0)->GetInt64Metric(
+ &METRIC_ENTITY_server, "kudu.tabletserver",
+ &METRIC_handler_latency_kudu_tserver_TabletServerAdminService_DeleteTablet,
+ "total_count", &num_delete_attempts));
+ ASSERT_EQ(0, num_delete_attempts);
+
+ // Now restart the master with orphan deletion enabled. The tablet should get
+ // deleted.
+ // We also need to restart the tablet server, or else the old tablet server
+ // won't be able to authenticate to the new master, due to it having a new
+ // CA certificate which the old tserver doesn't trust.
+ //
+ // TODO(KUDU-65): perhaps this is actually a feature? should we have tablet servers
+ // remember the CA cert persistently so that it's impossible to connect an
+ // old tserver to a new cluster?
+ cluster_->Shutdown();
+ cluster_->master()->mutable_flags()->push_back(
+ "--catalog_manager_delete_orphaned_tablets");
+ ASSERT_OK(cluster_->Restart());
+ AssertEventually([&]() {
+ ASSERT_OK(cluster_->tablet_server(0)->GetInt64Metric(
+ &METRIC_ENTITY_server, "kudu.tabletserver",
+ &METRIC_handler_latency_kudu_tserver_TabletServerAdminService_DeleteTablet,
+ "total_count", &num_delete_attempts));
+ // Sometimes the tablet server has time to report the orphaned tablet multiple times
+ // before the delete succeeds. This is ok because tablet deletion is idempotent.
+ ASSERT_GE(num_delete_attempts, 1);
+ ASSERT_OK(CheckTabletDeletedOnTS(0, tablet_id, SUPERBLOCK_NOT_EXPECTED));
+ });
+
+}
+
+// Parameterized fixture for TABLET_DATA_DELETED deletions. The test parameter
+// is the name of a fault-injection flag.
+class DeleteTableDeletedParamTest
+    : public DeleteTableITest,
+      public ::testing::WithParamInterface<const char*> {};
+
+ // Test that if a server crashes mid-delete that the delete will be rolled
+ // forward on startup. Parameterized by different fault flags that cause a
+ // crash at various points.
+ //
+ // Flow: arm the fault flag on every tserver, create a table, delete it, wait
+ // for every tserver to crash, confirm data is still on disk, then restart the
+ // tservers and wait for all tablet data to disappear.
+ TEST_P(DeleteTableDeletedParamTest, TestRollForwardDelete) {
+ NO_FATALS(StartCluster());
+ const string fault_flag = GetParam();
+ LOG(INFO) << "Running with fault flag: " << fault_flag;
+
+ // Dynamically set the fault flag so they crash when DeleteTablet() is called
+ // by the Master.
+ for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+ ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(i), fault_flag, "1.0"));
+ }
+
+ // Create a table on the cluster. We're just using TestWorkload
+ // as a convenient way to create it.
+ TestWorkload(cluster_.get()).Setup();
+
+ // The table should have replicas on all three tservers.
+ ASSERT_OK(inspect_->WaitForReplicaCount(3));
+
+ // Delete it and wait for the tablet servers to crash.
+ NO_FATALS(DeleteTable(TestWorkload::kDefaultTableName));
+ NO_FATALS(WaitForAllTSToCrash());
+
+ // There should still be data left on disk.
+ Status s = inspect_->CheckNoData();
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+
+ // Now restart the tablet servers. They should roll forward their deletes.
+ // We don't have to reset the fault flag here because it was set dynamically.
+ for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+ cluster_->tablet_server(i)->Shutdown();
+ ASSERT_OK(cluster_->tablet_server(i)->Restart());
+ }
+ ASSERT_OK(inspect_->WaitForNoData());
+}
+
+// Faults appropriate for the TABLET_DATA_DELETED case. Each entry is the name
+// of a fault-injection flag passed to DeleteTableDeletedParamTest.
+// The array is const (which also gives it internal linkage), so it cannot be
+// modified at runtime or clash with a same-named global in another file.
+const char* const deleted_faults[] = {"fault_crash_after_blocks_deleted",
+                                      "fault_crash_after_wal_deleted",
+                                      "fault_crash_after_cmeta_deleted"};
+
+INSTANTIATE_TEST_CASE_P(FaultFlags, DeleteTableDeletedParamTest,
+                        ::testing::ValuesIn(deleted_faults));
+
+// Parameterized fixture for TABLET_DATA_TOMBSTONED deletions. The test
+// parameter is the name of a fault-injection flag.
+class DeleteTableTombstonedParamTest
+    : public DeleteTableITest,
+      public ::testing::WithParamInterface<const char*> {};
+
+// Regression test for tablet tombstoning. Tests:
+// 1. basic creation & tombstoning of a tablet.
+// 2. roll-forward (crash recovery) of a partially-completed tombstoning of a tablet.
+// 3. permanent deletion of a TOMBSTONED tablet
+//    (transition from TABLET_DATA_TOMBSTONED to TABLET_DATA_DELETED).
+//
+// The test parameter is the name of a fault-injection flag that is enabled on
+// tablet server 0 just before the second tablet is tombstoned.
+TEST_P(DeleteTableTombstonedParamTest, TestTabletTombstone) {
+  vector<string> flags;
+  // We want fast log rolls and deterministic preallocation, since we wait for
+  // a certain number of logs at the beginning of the test.
+  flags.push_back("--log_segment_size_mb=1");
+  flags.push_back("--log_async_preallocate_segments=false");
+  flags.push_back("--log_min_segments_to_retain=3");
+  flags.push_back("--log_compression_codec=NO_COMPRESSION");
+  NO_FATALS(StartCluster(flags));
+  const string fault_flag = GetParam();
+  LOG(INFO) << "Running with fault flag: " << fault_flag;
+
+  MonoDelta timeout = MonoDelta::FromSeconds(30);
+
+  // Create a table with 2 tablets. We delete the first tablet without
+  // injecting any faults, then we delete the second tablet while exercising
+  // several fault injection points.
+  const int kNumTablets = 2;
+  vector<const KuduPartialRow*> split_rows;
+  Schema schema(GetSimpleTestSchema());
+  client::KuduSchema client_schema(client::KuduSchemaFromSchema(schema));
+  KuduPartialRow* split_row = client_schema.NewRow();
+  ASSERT_OK(split_row->SetInt32(0, numeric_limits<int32_t>::max() / kNumTablets));
+  split_rows.push_back(split_row);
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  ASSERT_OK(table_creator->table_name(TestWorkload::kDefaultTableName)
+            .split_rows(split_rows)
+            .schema(&client_schema)
+            .set_range_partition_columns({ "key" })
+            .num_replicas(3)
+            .Create());
+
+  // Start a workload on the cluster, and run it until we find WALs on disk.
+  TestWorkload workload(cluster_.get());
+  workload.set_payload_bytes(32 * 1024);  // Write ops of size 32KB to quickly fill the logs.
+  workload.set_write_batch_size(1);
+  workload.Setup();
+
+  // The table should have 2 tablets (1 split) on all 3 tservers (for a total of 6).
+  ASSERT_OK(inspect_->WaitForReplicaCount(6));
+
+  // Set up the proxies so we can easily send DeleteTablet() RPCs.
+  TServerDetails* ts = ts_map_[cluster_->tablet_server(0)->uuid()];
+
+  // Ensure the tablet server is reporting 2 tablets.
+  vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+  ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 2, timeout, &tablets));
+
+  // Capture both tablet ids now, by value. 'tablets' is refreshed further down,
+  // and the test must not assume ListTablets() reports the tablets in the same
+  // order every time it is called.
+  const string first_tablet_id = tablets[0].tablet_status().tablet_id();
+  const string second_tablet_id = tablets[1].tablet_status().tablet_id();
+
+  LOG(INFO) << "Starting workload...";
+
+  // Run the workload against whoever the leader is until WALs appear on TS 0
+  // for the tablets we created.
+  const int kTsIndex = 0;  // Index of the tablet server we'll use for the test.
+  workload.Start();
+  while (workload.rows_inserted() < 100) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  LOG(INFO) << "Waiting for 3 wal files for tablet " << first_tablet_id << "...";
+  ASSERT_OK(inspect_->WaitForMinFilesInTabletWalDirOnTS(kTsIndex, first_tablet_id, 3));
+
+  LOG(INFO) << "Waiting for 3 wal files for tablet " << second_tablet_id << "...";
+  ASSERT_OK(inspect_->WaitForMinFilesInTabletWalDirOnTS(kTsIndex, second_tablet_id, 3));
+
+  LOG(INFO) << "Stopping workload...";
+  workload.StopAndJoin();
+
+  // Shut down the master and the other tablet servers so they don't interfere
+  // by attempting to create or copy tablets while we delete tablets.
+  cluster_->master()->Shutdown();
+  cluster_->tablet_server(1)->Shutdown();
+  cluster_->tablet_server(2)->Shutdown();
+
+  // Tombstone the first tablet (no fault injected yet).
+  LOG(INFO) << "Tombstoning first tablet " << first_tablet_id << "...";
+  ASSERT_TRUE(inspect_->DoesConsensusMetaExistForTabletOnTS(kTsIndex, first_tablet_id))
+      << first_tablet_id;
+  ASSERT_OK(itest::DeleteTablet(ts, first_tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
+                                timeout));
+  LOG(INFO) << "Waiting for first tablet to be tombstoned...";
+  NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, first_tablet_id, CMETA_EXPECTED));
+
+  ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 2, timeout, &tablets));
+  for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
+    if (t.tablet_status().tablet_id() == first_tablet_id) {
+      ASSERT_EQ(tablet::SHUTDOWN, t.tablet_status().state());
+      ASSERT_EQ(TABLET_DATA_TOMBSTONED, t.tablet_status().tablet_data_state())
+          << t.tablet_status().tablet_id() << " not tombstoned";
+    }
+  }
+
+  // Now tombstone the 2nd tablet, causing a fault.
+  ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(kTsIndex), fault_flag, "1.0"));
+  LOG(INFO) << "Tombstoning second tablet " << second_tablet_id << "...";
+  // The RPC result is ignored: the armed fault flag is expected to crash the
+  // server while it handles this request (checked right below).
+  ignore_result(itest::DeleteTablet(ts, second_tablet_id, TABLET_DATA_TOMBSTONED,
+                                    boost::none, timeout));
+  NO_FATALS(WaitForTSToCrash(kTsIndex));
+
+  // Restart the tablet server and wait for the WALs to be deleted and for the
+  // superblock to show that it is tombstoned.
+  cluster_->tablet_server(kTsIndex)->Shutdown();
+  ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
+  LOG(INFO) << "Waiting for second tablet to be tombstoned...";
+  NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, second_tablet_id, CMETA_EXPECTED));
+  // The tombstoned tablets will still show up in ListTablets(),
+  // just with their data state set as TOMBSTONED. They should also be listed
+  // as NOT_STARTED because we restarted the server.
+  ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 2, timeout, &tablets));
+  for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
+    ASSERT_EQ(tablet::NOT_STARTED, t.tablet_status().state());
+    ASSERT_EQ(TABLET_DATA_TOMBSTONED, t.tablet_status().tablet_data_state())
+        << t.tablet_status().tablet_id() << " not tombstoned";
+  }
+
+  // Check that, upon restart of the tablet server with a tombstoned tablet,
+  // we don't unnecessarily "roll forward" and rewrite the tablet metadata file
+  // when it is already fully deleted.
+  int64_t orig_mtime = inspect_->GetTabletSuperBlockMTimeOrDie(kTsIndex, second_tablet_id);
+  cluster_->tablet_server(kTsIndex)->Shutdown();
+  ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
+  int64_t new_mtime = inspect_->GetTabletSuperBlockMTimeOrDie(kTsIndex, second_tablet_id);
+  ASSERT_EQ(orig_mtime, new_mtime)
+      << "Tablet superblock should not have been re-flushed unnecessarily";
+
+  // Finally, delete all tablets on the TS, and wait for all data to be gone.
+  LOG(INFO) << "Deleting all tablets...";
+  for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
+    // We need retries here, since some of the tablets may still be
+    // bootstrapping after being restarted above.
+    NO_FATALS(itest::DeleteTabletWithRetries(ts, t.tablet_status().tablet_id(),
+                                             TABLET_DATA_DELETED, boost::none, timeout));
+  }
+  ASSERT_OK(inspect_->WaitForNoDataOnTS(kTsIndex));
+}
+
+// Faults appropriate for the TABLET_DATA_TOMBSTONED case. Each entry is the
+// name of a fault-injection flag passed to DeleteTableTombstonedParamTest.
+// Tombstoning a tablet does not delete the consensus metadata.
+// The array is const (which also gives it internal linkage), so it cannot be
+// modified at runtime or clash with a same-named global in another file.
+const char* const tombstoned_faults[] = {"fault_crash_after_blocks_deleted",
+                                         "fault_crash_after_wal_deleted"};
+INSTANTIATE_TEST_CASE_P(FaultFlags, DeleteTableTombstonedParamTest,
+                        ::testing::ValuesIn(tombstoned_faults));
+
+
+// Parameterized fixture for deleting a table while a scan is in progress.
+// The test parameter is a (scanner read mode, replica selection) pair.
+class DeleteTableWhileScanInProgressParamTest
+    : public DeleteTableITest,
+      public ::testing::WithParamInterface<
+          std::tuple<KuduScanner::ReadMode, KuduClient::ReplicaSelection>> {};
+
+// Make sure the tablet server keeps the necessary data to serve a scan request
+// that is in progress when the tablet is marked for deletion.
+//
+// Parameterized by (scanner read mode, client replica selection). The scanner
+// fetches one batch, the table is deleted, and the scanner must then be able
+// to read the remaining rows.
+TEST_P(DeleteTableWhileScanInProgressParamTest, Test) {
+  const auto read_mode_to_string = [](KuduScanner::ReadMode mode) {
+    switch (mode) {
+      case KuduScanner::READ_LATEST:
+        return "READ_LATEST";
+      case KuduScanner::READ_AT_SNAPSHOT:
+        return "READ_AT_SNAPSHOT";
+      default:
+        return "UNKNOWN";
+    }
+  };
+  const auto replica_sel_to_string = [](KuduClient::ReplicaSelection sel) {
+    switch (sel) {
+      case KuduClient::LEADER_ONLY:
+        return "LEADER_ONLY";
+      case KuduClient::CLOSEST_REPLICA:
+        return "CLOSEST_REPLICA";
+      case KuduClient::FIRST_REPLICA:
+        return "FIRST_REPLICA";
+      default:
+        return "UNKNOWN";
+    }
+  };
+
+  const vector<string> extra_ts_flags = {
+    // Set the flush threshold low so that we have a mix of flushed and
+    // unflushed operations in the WAL, when we bootstrap.
+    "--flush_threshold_mb=1",
+
+    // Set the compaction budget to be low so that we get multiple passes
+    // of compaction instead of selecting all of the rowsets in a single
+    // compaction of the whole tablet.
+    "--tablet_compaction_budget_mb=1",
+
+    // Set the major delta compaction ratio low enough that we trigger
+    // a lot of them.
+    "--tablet_delta_store_major_compact_min_ratio=0.001",
+  };
+
+  // Approximate number of rows to insert. This is not an exact number due to
+  // the way the test controls the progress of the test workload. Kept signed,
+  // like the int64_t row counts it is compared with below.
+#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
+  // Test is too slow in ASAN/TSAN.
+  const int64_t rows_to_insert = 10000;
+#else
+  const int64_t rows_to_insert = AllowSlowTests() ? 100000 : 10000;
+#endif
+
+  const auto& param = GetParam();
+  const auto mode = std::get<0>(param);
+  const auto sel = std::get<1>(param);
+
+  // In case of failure, print out string representation of the parameterized
+  // case for ease of troubleshooting.
+  SCOPED_TRACE(Substitute("read mode $0; replica selection mode $1",
+                          read_mode_to_string(mode),
+                          replica_sel_to_string(sel)));
+  NO_FATALS(StartCluster(extra_ts_flags));
+
+  TestWorkload w(cluster_.get());
+  w.set_write_pattern(TestWorkload::INSERT_RANDOM_ROWS);
+  w.Setup();
+
+  // Start the workload, and wait to see some rows actually inserted.
+  w.Start();
+  while (w.rows_inserted() < rows_to_insert) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  w.StopAndJoin();
+  const int64_t ref_row_count = w.rows_inserted();
+
+  using kudu::client::sp::shared_ptr;
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(w.table_name(), &table));
+  KuduScanner scanner(table.get());
+  ASSERT_OK(scanner.SetReadMode(mode));
+  ASSERT_OK(scanner.SetSelection(sel));
+  // Setup batch size to be small enough to guarantee the scan
+  // will not fetch all the data at once.
+  ASSERT_OK(scanner.SetBatchSizeBytes(1));
+  ASSERT_OK(scanner.Open());
+  ASSERT_TRUE(scanner.HasMoreRows());
+  KuduScanBatch batch;
+  ASSERT_OK(scanner.NextBatch(&batch));
+  int64_t row_count = batch.NumRows();
+
+  // Once the first batch of data has been fetched and there is some more
+  // to fetch, delete the table.
+  NO_FATALS(DeleteTable(w.table_name(), ON_ERROR_DO_NOT_DUMP_STACKS));
+
+  // Wait until the table is no longer advertised on the cluster. This ensures
+  // the table deletion request has been processed by the tablet servers.
+  // Bounded by AssertEventually's timeout, so a stuck deletion fails the test
+  // instead of hanging it forever.
+  NO_FATALS(AssertEventually([&]() {
+    ASSERT_TRUE(inspect_->ListTablets().empty());
+  }));
+
+  // Make sure the scanner can continue and fetch the rest of rows.
+  ASSERT_TRUE(scanner.HasMoreRows());
+  while (scanner.HasMoreRows()) {
+    ASSERT_OK(scanner.NextBatch(&batch));
+    row_count += batch.NumRows();
+  }
+
+  // Verify the total row count. The exact count must be there in case of
+  // READ_AT_SNAPSHOT mode regardless of replica selection or if reading
+  // from a leader tablet in any scan mode. In the case of the READ_LATEST
+  // mode the data might be fetched from a lagging replica and the scan
+  // row count might be less than the inserted row count.
+  if (mode == KuduScanner::READ_AT_SNAPSHOT ||
+      sel == KuduClient::LEADER_ONLY) {
+    EXPECT_EQ(ref_row_count, row_count);
+  }
+
+  // Close the scanner to make sure it does not hold any references on the
+  // data about to be deleted by the hosting tablet server.
+  scanner.Close();
+
+  // Make sure the table has been deleted.
+  EXPECT_OK(inspect_->WaitForNoData());
+  NO_FATALS(cluster_->AssertNoCrashes());
+  NO_FATALS(StopCluster());
+}
+
+// DeleteTableWhileScanInProgressParamTest runs once for every combination
+// (cartesian product) of these scanner read modes and replica selections.
+const KuduScanner::ReadMode read_modes[] = {
+  KuduScanner::READ_LATEST,
+  KuduScanner::READ_AT_SNAPSHOT
+};
+const KuduClient::ReplicaSelection replica_selectors[] = {
+  KuduClient::LEADER_ONLY,
+  KuduClient::CLOSEST_REPLICA,
+  KuduClient::FIRST_REPLICA
+};
+INSTANTIATE_TEST_CASE_P(Params, DeleteTableWhileScanInProgressParamTest,
+                        ::testing::Combine(::testing::ValuesIn(read_modes),
+                                           ::testing::ValuesIn(replica_selectors)));
+
+} // namespace kudu
[2/3] kudu git commit: itests: Rename delete_table-test to
delete_table-itest
Posted by mp...@apache.org.
http://git-wip-us.apache.org/repos/asf/kudu/blob/889d1524/src/kudu/integration-tests/delete_table-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/delete_table-test.cc b/src/kudu/integration-tests/delete_table-test.cc
deleted file mode 100644
index 8f8af2d..0000000
--- a/src/kudu/integration-tests/delete_table-test.cc
+++ /dev/null
@@ -1,1472 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <memory>
-#include <string>
-#include <tuple>
-#include <unordered_map>
-
-#include <boost/optional.hpp>
-#include <glog/stl_logging.h>
-#include <gtest/gtest.h>
-#include <rapidjson/document.h>
-
-#include "kudu/client/client-test-util.h"
-#include "kudu/client/shared_ptr.h"
-#include "kudu/common/wire_protocol-test-util.h"
-#include "kudu/gutil/stl_util.h"
-#include "kudu/gutil/strings/split.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/integration-tests/cluster_itest_util.h"
-#include "kudu/integration-tests/cluster_verifier.h"
-#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
-#include "kudu/integration-tests/test_workload.h"
-#include "kudu/rpc/rpc_controller.h"
-#include "kudu/tablet/tablet.pb.h"
-#include "kudu/tserver/tserver.pb.h"
-#include "kudu/tserver/tserver_admin.pb.h"
-#include "kudu/util/curl_util.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/pb_util.h"
-#include "kudu/util/pstack_watcher.h"
-#include "kudu/util/subprocess.h"
-
-using kudu::client::KuduClient;
-using kudu::client::KuduScanner;
-using kudu::client::KuduScanBatch;
-using kudu::client::KuduSchema;
-using kudu::client::KuduSchemaFromSchema;
-using kudu::client::KuduTable;
-using kudu::client::KuduTableCreator;
-using kudu::consensus::CONSENSUS_CONFIG_COMMITTED;
-using kudu::consensus::ConsensusMetadataPB;
-using kudu::consensus::ConsensusStatePB;
-using kudu::consensus::RaftPeerPB;
-using kudu::itest::TServerDetails;
-using kudu::tablet::TABLET_DATA_COPYING;
-using kudu::tablet::TABLET_DATA_DELETED;
-using kudu::tablet::TABLET_DATA_READY;
-using kudu::tablet::TABLET_DATA_TOMBSTONED;
-using kudu::tablet::TabletDataState;
-using kudu::tablet::TabletSuperBlockPB;
-using kudu::tserver::ListTabletsResponsePB;
-using kudu::tserver::TabletServerErrorPB;
-using std::numeric_limits;
-using std::string;
-using std::unique_ptr;
-using std::unordered_map;
-using std::vector;
-using strings::Substitute;
-
-METRIC_DECLARE_entity(server);
-METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_TSHeartbeat);
-METRIC_DECLARE_histogram(handler_latency_kudu_tserver_TabletServerAdminService_DeleteTablet);
-
-namespace kudu {
-
- // Test fixture for table/tablet deletion tests that run against an
- // ExternalMiniCluster. Provides helpers to inspect on-disk replica state,
- // wait for tombstoning/deletion or injected crashes, and delete tables.
- class DeleteTableTest : public ExternalMiniClusterITestBase {
- protected:
- // Whether consensus metadata must be present for a tombstoned replica.
- enum IsCMetaExpected {
- CMETA_NOT_EXPECTED = 0,
- CMETA_EXPECTED = 1
- };
-
- // Whether a tablet superblock is expected to remain on disk.
- enum IsSuperBlockExpected {
- SUPERBLOCK_NOT_EXPECTED = 0,
- SUPERBLOCK_EXPECTED = 1
- };
-
- // Controls whether DeleteTable() dumps the master's stacks on timeout.
- enum ErrorDumpStackSelector {
- ON_ERROR_DO_NOT_DUMP_STACKS = 0,
- ON_ERROR_DUMP_STACKS = 1,
- };
-
- // Get the UUID of the leader of the specified tablet, as seen by the TS with
- // the given 'ts_uuid'.
- string GetLeaderUUID(const string& ts_uuid, const string& tablet_id);
-
- // Checks the on-disk state of 'tablet_id' on tablet server 'index': no WAL
- // files, cmeta present if 'is_cmeta_expected', and either a superblock in
- // 'data_state' or no superblock, per 'is_superblock_expected'.
- Status CheckTabletTombstonedOrDeletedOnTS(
- int index,
- const string& tablet_id,
- TabletDataState data_state,
- IsCMetaExpected is_cmeta_expected,
- IsSuperBlockExpected is_superblock_expected);
-
- // CheckTabletTombstonedOrDeletedOnTS() for TABLET_DATA_TOMBSTONED, with a
- // superblock expected.
- Status CheckTabletTombstonedOnTS(int index,
- const string& tablet_id,
- IsCMetaExpected is_cmeta_expected);
-
- // CheckTabletTombstonedOrDeletedOnTS() for TABLET_DATA_DELETED, without
- // requiring cmeta.
- Status CheckTabletDeletedOnTS(int index,
- const string& tablet_id,
- IsSuperBlockExpected is_superblock_expected);
-
- // Polling wrappers around the Check*() helpers above; they ASSERT-fail if
- // the expected state is not reached in time.
- void WaitForTabletTombstonedOnTS(int index,
- const string& tablet_id,
- IsCMetaExpected is_cmeta_expected);
-
- void WaitForTabletDeletedOnTS(int index,
- const string& tablet_id,
- IsSuperBlockExpected is_superblock_expected);
-
- // Wait for an injected crash of one / every tablet server.
- void WaitForTSToCrash(int index);
- void WaitForAllTSToCrash();
- void WaitUntilTabletRunning(int index, const std::string& tablet_id);
-
- // Delete the given table. If the operation times out, optionally dump
- // the master stacks to help debug master-side deadlocks.
- void DeleteTable(const string& table_name,
- ErrorDumpStackSelector selector = ON_ERROR_DUMP_STACKS);
-};
-
- // Returns the leader UUID from the committed consensus state of 'tablet_id'
- // as reported by tablet server 'ts_uuid'. CHECK-fails (aborting the test
- // process) if the GetConsensusState RPC fails within 10 seconds.
- string DeleteTableTest::GetLeaderUUID(const string& ts_uuid, const string& tablet_id) {
- ConsensusStatePB cstate;
- CHECK_OK(itest::GetConsensusState(ts_map_[ts_uuid], tablet_id, CONSENSUS_CONFIG_COMMITTED,
- MonoDelta::FromSeconds(10), &cstate));
- return cstate.leader_uuid();
-}
-
- // Verifies the on-disk state of 'tablet_id' on tablet server 'index' after a
- // tombstone or delete. 'data_state' must be TABLET_DATA_TOMBSTONED or
- // TABLET_DATA_DELETED (CHECK-enforced). Returns IllegalState describing the
- // first mismatch found, or the error from the data-state check.
- Status DeleteTableTest::CheckTabletTombstonedOrDeletedOnTS(
- int index,
- const string& tablet_id,
- TabletDataState data_state,
- IsCMetaExpected is_cmeta_expected,
- IsSuperBlockExpected is_superblock_expected) {
- CHECK(data_state == TABLET_DATA_TOMBSTONED || data_state == TABLET_DATA_DELETED) << data_state;
- // There should be no WALs. Consensus metadata is only checked for presence
- // when CMETA_EXPECTED; its absence is not verified here.
- if (inspect_->CountFilesInWALDirForTS(index, tablet_id) > 0) {
- return Status::IllegalState("WAL segments exist for tablet", tablet_id);
- }
- if (is_cmeta_expected == CMETA_EXPECTED &&
- !inspect_->DoesConsensusMetaExistForTabletOnTS(index, tablet_id)) {
- return Status::IllegalState("Expected cmeta for tablet " + tablet_id + " but it doesn't exist");
- }
- if (is_superblock_expected == SUPERBLOCK_EXPECTED) {
- RETURN_NOT_OK(inspect_->CheckTabletDataStateOnTS(index, tablet_id, { data_state }));
- } else {
- TabletSuperBlockPB superblock_pb;
- Status s = inspect_->ReadTabletSuperBlockOnTS(index, tablet_id, &superblock_pb);
- // NOTE(review): any result other than NotFound -- including read errors,
- // not only a successful read -- is reported as an unexpected superblock.
- if (!s.IsNotFound()) {
- return Status::IllegalState("Found unexpected superblock for tablet " + tablet_id);
- }
- }
- return Status::OK();
-}
-
- // Checks that 'tablet_id' on tablet server 'index' is tombstoned: no WALs,
- // cmeta per 'is_cmeta_expected', and a superblock in TABLET_DATA_TOMBSTONED.
- Status DeleteTableTest::CheckTabletTombstonedOnTS(int index,
- const string& tablet_id,
- IsCMetaExpected is_cmeta_expected) {
- return CheckTabletTombstonedOrDeletedOnTS(index, tablet_id, TABLET_DATA_TOMBSTONED,
- is_cmeta_expected, SUPERBLOCK_EXPECTED);
-}
-
- // Checks that 'tablet_id' on tablet server 'index' is deleted: no WALs, and a
- // superblock in TABLET_DATA_DELETED or no superblock at all, per
- // 'is_superblock_expected'. Presence of cmeta is not required.
- Status DeleteTableTest::CheckTabletDeletedOnTS(int index,
- const string& tablet_id,
- IsSuperBlockExpected is_superblock_expected) {
- return CheckTabletTombstonedOrDeletedOnTS(index, tablet_id, TABLET_DATA_DELETED,
- CMETA_NOT_EXPECTED, is_superblock_expected);
-}
-
- // Polls CheckTabletTombstonedOnTS() up to 6000 times, sleeping 10 ms between
- // attempts (roughly 60 s plus check time). ASSERT-fails with the last status
- // if the tablet never reaches the tombstoned state.
- void DeleteTableTest::WaitForTabletTombstonedOnTS(int index,
- const string& tablet_id,
- IsCMetaExpected is_cmeta_expected) {
- Status s;
- for (int i = 0; i < 6000; i++) {
- s = CheckTabletTombstonedOnTS(index, tablet_id, is_cmeta_expected);
- if (s.ok()) return;
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
- ASSERT_OK(s);
-}
-
- // Polls CheckTabletDeletedOnTS() up to 6000 times, sleeping 10 ms between
- // attempts (roughly 60 s plus check time). ASSERT-fails with the last status
- // if the tablet never reaches the deleted state.
- void DeleteTableTest::WaitForTabletDeletedOnTS(int index,
- const string& tablet_id,
- IsSuperBlockExpected is_superblock_expected) {
- Status s;
- for (int i = 0; i < 6000; i++) {
- s = CheckTabletDeletedOnTS(index, tablet_id, is_superblock_expected);
- if (s.ok()) return;
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
- ASSERT_OK(s);
-}
-
- // Waits up to 60 s for tablet server 'index' to exit due to an injected
- // crash; the server's UUID is attached to any failure via SCOPED_TRACE.
- void DeleteTableTest::WaitForTSToCrash(int index) {
- auto ts = cluster_->tablet_server(index);
- SCOPED_TRACE(ts->instance_id().permanent_uuid());
- ASSERT_OK(ts->WaitForInjectedCrash(MonoDelta::FromSeconds(60)));
-}
-
- // Calls WaitForTSToCrash() for every tablet server in order, stopping at the
- // first fatal failure.
- void DeleteTableTest::WaitForAllTSToCrash() {
- for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
- NO_FATALS(WaitForTSToCrash(i));
- }
-}
-
- // Waits up to 60 s for 'tablet_id' to be running on tablet server 'index'.
- void DeleteTableTest::WaitUntilTabletRunning(int index, const std::string& tablet_id) {
- ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(index)->uuid()],
- tablet_id, MonoDelta::FromSeconds(60)));
-}
-
- // Deletes 'table_name' through the client and ASSERTs that it succeeded. If
- // the call timed out and 'selector' is ON_ERROR_DUMP_STACKS, the master's
- // stacks are dumped first (a failure to dump is only logged as a warning).
- void DeleteTableTest::DeleteTable(const string& table_name,
- ErrorDumpStackSelector selector) {
- Status s = client_->DeleteTable(table_name);
- if (s.IsTimedOut() && (ON_ERROR_DUMP_STACKS == selector)) {
- WARN_NOT_OK(PstackWatcher::DumpPidStacks(cluster_->master()->pid()),
- "Couldn't dump stacks");
- }
- ASSERT_OK(s);
-}
-
- // Test deleting an empty table, and ensure that the tablets get removed,
- // and the master no longer shows the table as existing.
- TEST_F(DeleteTableTest, TestDeleteEmptyTable) {
- NO_FATALS(StartCluster());
- // Create a table on the cluster. We're just using TestWorkload
- // as a convenient way to create it.
- TestWorkload(cluster_.get()).Setup();
-
- // The table should have replicas on all three tservers.
- ASSERT_OK(inspect_->WaitForReplicaCount(3));
-
- // Grab the tablet ID (used later). 'tablet_id' refers into 'tablets', which
- // is not modified for the rest of the test.
- vector<string> tablets = inspect_->ListTabletsOnTS(1);
- ASSERT_EQ(1, tablets.size());
- const string& tablet_id = tablets[0];
-
- // Delete it and wait for the replicas to get deleted.
- // We should have no tablets at the filesystem layer after deleting the table.
- NO_FATALS(DeleteTable(TestWorkload::kDefaultTableName));
- ASSERT_OK(inspect_->WaitForNoData());
-
- // Check that the master no longer exposes the table in any way:
-
- // 1) Should not list it in ListTables.
- vector<string> table_names;
- ASSERT_OK(client_->ListTables(&table_names));
- ASSERT_TRUE(table_names.empty()) << "table still exposed in ListTables";
-
- // 2) Should respond to GetTableSchema with a NotFound error.
- KuduSchema schema;
- Status s = client_->GetTableSchema(TestWorkload::kDefaultTableName, &schema);
- ASSERT_TRUE(s.IsNotFound()) << s.ToString();
-
- // 3) Should return an error for GetTabletLocations RPCs.
- // The RPC itself succeeds; the per-tablet error is carried in 'resp'.
- {
- rpc::RpcController rpc;
- master::GetTabletLocationsRequestPB req;
- master::GetTabletLocationsResponsePB resp;
- rpc.set_timeout(MonoDelta::FromSeconds(10));
- req.add_tablet_ids()->assign(tablet_id);
- ASSERT_OK(cluster_->master_proxy()->GetTabletLocations(req, &resp, &rpc));
- SCOPED_TRACE(SecureDebugString(resp));
- ASSERT_EQ(1, resp.errors_size());
- ASSERT_STR_CONTAINS(SecureShortDebugString(resp.errors(0)),
- "code: NOT_FOUND message: \"Tablet deleted: Table deleted");
- }
-
- // 4) The master 'dump-entities' page should not list the deleted table or tablets.
- EasyCurl c;
- faststring entities_buf;
- ASSERT_OK(c.FetchURL(Substitute("http://$0/dump-entities",
- cluster_->master()->bound_http_hostport().ToString()),
- &entities_buf));
- // NOTE(review): the parse result is not checked; a malformed response
- // would surface only through the member lookups below.
- rapidjson::Document doc;
- doc.Parse<0>(entities_buf.ToString().c_str());
- ASSERT_EQ(0, doc["tables"].Size());
- ASSERT_EQ(0, doc["tablets"].Size());
-}
-
-// Test that a DeleteTablet RPC is rejected without a matching destination UUID.
-TEST_F(DeleteTableTest, TestDeleteTabletDestUuidValidation) {
- NO_FATALS(StartCluster());
- // Create a table on the cluster; TestWorkload is used here only because it
- // is a convenient way to create one.
- TestWorkload(cluster_.get()).Setup();
- ASSERT_OK(inspect_->WaitForReplicaCount(3));
-
- const vector<string> tablet_ids = inspect_->ListTabletsOnTS(1);
- ASSERT_EQ(1, tablet_ids.size());
-
- // Send the request to TS 0's admin proxy, but claim it is addressed to a
- // different server.
- tserver::DeleteTabletRequestPB request;
- request.set_dest_uuid("fake-uuid");
- request.set_tablet_id(tablet_ids.front());
- request.set_delete_type(TABLET_DATA_TOMBSTONED);
-
- tserver::DeleteTabletResponsePB response;
- rpc::RpcController controller;
- controller.set_timeout(MonoDelta::FromSeconds(20));
- TServerDetails* target_ts = ts_map_[cluster_->tablet_server(0)->uuid()];
- ASSERT_OK(target_ts->tserver_admin_proxy->DeleteTablet(request, &response, &controller));
-
- // The RPC call itself succeeds; the rejection is reported in the response.
- ASSERT_TRUE(response.has_error());
- ASSERT_EQ(tserver::TabletServerErrorPB::WRONG_SERVER_UUID, response.error().code())
- << SecureShortDebugString(response);
- ASSERT_STR_CONTAINS(StatusFromPB(response.error().status()).ToString(),
- "Wrong destination UUID");
-}
-
-// Test the atomic CAS argument to DeleteTablet(): a delete that supplies an
-// opid_index different from the committed config's must fail with CAS_FAILED,
-// a matching one must succeed, and once the replica is tombstoned the
-// supplied opid_index is ignored.
-TEST_F(DeleteTableTest, TestAtomicDeleteTablet) {
- MonoDelta timeout = MonoDelta::FromSeconds(30);
- NO_FATALS(StartCluster());
- // Create a table on the cluster. We're just using TestWorkload
- // as a convenient way to create it.
- TestWorkload(cluster_.get()).Setup();
-
- // The table should have replicas on all three tservers.
- ASSERT_OK(inspect_->WaitForReplicaCount(3));
-
- // Grab the tablet ID (used later).
- vector<string> tablets = inspect_->ListTabletsOnTS(1);
- ASSERT_EQ(1, tablets.size());
- const string& tablet_id = tablets[0];
-
- const int kTsIndex = 0;
- TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
-
- // The committed config starts off with an opid_index of -1, so choose something lower.
- boost::optional<int64_t> opid_index(-2);
- // 'error_code' is an out-parameter of itest::DeleteTablet(). Give it a known
- // value (and reset it before each attempt below) so the assertions never
- // read an indeterminate or stale code if a call leaves it untouched.
- tserver::TabletServerErrorPB::Code error_code = TabletServerErrorPB::UNKNOWN_ERROR;
- ASSERT_OK(itest::WaitUntilTabletRunning(ts, tablet_id, timeout));
-
- Status s;
- for (int i = 0; i < 100; i++) {
- error_code = TabletServerErrorPB::UNKNOWN_ERROR;
- s = itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, opid_index, timeout,
- &error_code);
- if (error_code == TabletServerErrorPB::CAS_FAILED) break;
- // If we didn't get the expected CAS_FAILED error, it's OK to get 'TABLET_NOT_RUNNING'
- // because the "creating" maintenance state persists just slightly after it starts to
- // expose 'RUNNING' state in ListTablets()
- ASSERT_EQ(TabletServerErrorPB::TABLET_NOT_RUNNING, error_code)
- << "unexpected error: " << s.ToString();
- SleepFor(MonoDelta::FromMilliseconds(100));
- }
-
- ASSERT_EQ(TabletServerErrorPB::CAS_FAILED, error_code) << "unexpected error: " << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(), "of -2 but the committed config has opid_index of -1");
-
- // Now use the "latest", which is -1.
- opid_index = -1;
- ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, opid_index, timeout,
- &error_code));
- NO_FATALS(inspect_->CheckTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_TOMBSTONED }));
-
- // Now that the tablet is already tombstoned, our opid_index should be
- // ignored (because it's impossible to check it).
- ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, -9999, timeout,
- &error_code));
- NO_FATALS(inspect_->CheckTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_TOMBSTONED }));
-
- // Same with TOMBSTONED -> DELETED.
- ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_DELETED, -9999, timeout,
- &error_code));
- NO_FATALS(inspect_->CheckTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_DELETED }));
-}
-
-// Test that a table can be deleted while a TestWorkload is actively writing to
-// it: all replica data must disappear and the cluster must report no crashes.
-// Runs 20 iterations (a fresh table each time) when slow tests are allowed,
-// otherwise a single iteration.
-TEST_F(DeleteTableTest, TestDeleteTableWithConcurrentWrites) {
- NO_FATALS(StartCluster());
- int n_iters = AllowSlowTests() ? 20 : 1;
- for (int i = 0; i < n_iters; i++) {
- LOG(INFO) << "Running iteration " << i;
- TestWorkload workload(cluster_.get());
- workload.set_table_name(Substitute("table-$0", i));
-
- // We'll delete the table underneath the writers, so we expect
- // a NotFound error during the writes.
- workload.set_not_found_allowed(true);
- workload.Setup();
-
- // Start the workload, and wait to see some rows actually inserted
- // NOTE(review): this wait has no deadline of its own; if the workload never
- // reaches 100 rows it relies on the overall test timeout.
- workload.Start();
- while (workload.rows_inserted() < 100) {
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
-
- // Delete it and wait for the replicas to get deleted.
- NO_FATALS(DeleteTable(workload.table_name()));
- ASSERT_OK(inspect_->WaitForNoData());
-
- // Sleep just a little longer to make sure client threads send
- // requests to the missing tablets.
- SleepFor(MonoDelta::FromMilliseconds(50));
-
- workload.StopAndJoin();
- NO_FATALS(cluster_->AssertNoCrashes());
- }
-}
-
-// Test that a tablet replica is automatically tombstoned on startup if a local
-// crash occurs in the middle of Tablet Copy. Additionally acts as a regression
-// test for KUDU-1605.
-TEST_F(DeleteTableTest, TestAutoTombstoneAfterCrashDuringTabletCopy) {
- // Set up flags to flush frequently, so that we get data on disk.
- vector<string> ts_flags = {
- "--flush_threshold_mb=0",
- "--maintenance_manager_polling_interval_ms=100"
- };
- // Needed because the tables below are created with 1 and 2 replicas.
- vector<string> master_flags = {
- "--allow_unsafe_replication_factor=true"
- };
- NO_FATALS(StartCluster(ts_flags, master_flags));
- const MonoDelta timeout = MonoDelta::FromSeconds(10);
- const int kTsIndex = 0; // We'll test with the first TS.
-
- // Shut down TS 1, 2, write some data to TS 0 alone.
- // The master is bounced so that it only waits for (and places replicas on)
- // the single live tablet server.
- cluster_->tablet_server(1)->Shutdown();
- cluster_->tablet_server(2)->Shutdown();
- cluster_->master()->Shutdown();
- ASSERT_OK(cluster_->master()->Restart());
- ASSERT_OK(cluster_->WaitForTabletServerCount(1, MonoDelta::FromSeconds(30)));
-
- // Set up a table which has a tablet only on TS 0. This will be used to test for
- // "collateral damage" bugs where incorrect handling of the main test tablet
- // accidentally removes blocks from another tablet.
- // We use a sequential workload so that we just flush and don't compact.
- // Thus we use a contiguous set of block IDs starting with 1, with no "holes"
- // from deleted blocks.
- string unaffected_tablet_id;
- TestWorkload unrepl_workload(cluster_.get());
- {
- unrepl_workload.set_num_replicas(1);
- unrepl_workload.set_table_name("other-table");
- unrepl_workload.set_num_write_threads(1);
- unrepl_workload.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
- unrepl_workload.Setup();
-
- // Figure out the tablet ID.
- vector<string> tablets = inspect_->ListTabletsOnTS(kTsIndex);
- ASSERT_EQ(1, tablets.size());
- unaffected_tablet_id = tablets[0];
-
- // Write rows until it has flushed a few times.
- // (Polls the on-disk superblock until it lists at least 3 rowsets.)
- unrepl_workload.Start();
- TabletSuperBlockPB sb;
- while (sb.rowsets().size() < 3) {
- SleepFor(MonoDelta::FromMilliseconds(10));
- ASSERT_OK(inspect_->ReadTabletSuperBlockOnTS(kTsIndex, unaffected_tablet_id, &sb));
- }
- unrepl_workload.StopAndJoin();
- }
-
- // Restart the other two servers.
- ASSERT_OK(cluster_->tablet_server(1)->Restart());
- ASSERT_OK(cluster_->tablet_server(2)->Restart());
- cluster_->tablet_server(kTsIndex)->Shutdown();
-
- // Restart the master to be sure that it only sees the live servers.
- // Otherwise it may try to create a tablet with a replica on the down server.
- // The table creation would eventually succeed after picking a different set of
- // replicas, but not before causing a timeout.
- cluster_->master()->Shutdown();
- ASSERT_OK(cluster_->master()->Restart());
- ASSERT_OK(cluster_->WaitForTabletServerCount(2, MonoDelta::FromSeconds(30)));
-
- // Create a new table with a single tablet replicated on the other two servers.
- // We use the same sequential workload. This produces block ID sequences
- // that look like:
- // TS 0: |---- blocks from 'other-table' ---]
- // TS 1: |---- blocks from default workload table ---]
- // TS 2: |---- blocks from default workload table ---]
- // So, if we were to accidentally reuse or delete block IDs from TS 1 or TS 2 when
- // copying a tablet to TS 0, we'd surely notice it!
- string replicated_tablet_id;
-
- TestWorkload repl_workload(cluster_.get());
- {
- repl_workload.set_num_replicas(2);
- repl_workload.set_num_write_threads(1);
- repl_workload.set_write_pattern(TestWorkload::INSERT_SEQUENTIAL_ROWS);
- repl_workload.Setup();
- ASSERT_OK(inspect_->WaitForReplicaCount(3)); // 1 original plus 2 new replicas.
-
- // Figure out the tablet id of the new table.
- vector<string> tablets = inspect_->ListTabletsOnTS(1);
- ASSERT_EQ(1, tablets.size());
- replicated_tablet_id = tablets[0];
-
- repl_workload.Start();
- TabletSuperBlockPB sb;
- while (sb.rowsets().size() < 3) {
- SleepFor(MonoDelta::FromMilliseconds(10));
- ASSERT_OK(inspect_->ReadTabletSuperBlockOnTS(1, replicated_tablet_id, &sb));
- }
- repl_workload.StopAndJoin();
- }
-
- // Enable a fault crash when Tablet Copy occurs on TS 0.
- // The flag is appended to TS 0's command-line flags, so it stays in effect
- // across restarts until it is popped off again below.
- cluster_->tablet_server(kTsIndex)->mutable_flags()->push_back(
- "--fault_crash_after_tc_files_fetched=1.0");
-
- // Restart TS 0 and add it to the config. It will crash when tablet copy starts.
- ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
- string leader_uuid = GetLeaderUUID(cluster_->tablet_server(1)->uuid(), replicated_tablet_id);
- TServerDetails* leader = DCHECK_NOTNULL(ts_map_[leader_uuid]);
- TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
- ASSERT_OK(itest::AddServer(leader, replicated_tablet_id, ts, RaftPeerPB::VOTER,
- boost::none, timeout));
- NO_FATALS(WaitForTSToCrash(kTsIndex));
- cluster_->tablet_server(kTsIndex)->Shutdown();
-
- LOG(INFO) << "Test progress: crashed on first attempt to copy";
-
- // The superblock should be in TABLET_DATA_COPYING state on disk.
- NO_FATALS(inspect_->CheckTabletDataStateOnTS(kTsIndex, replicated_tablet_id,
- { TABLET_DATA_COPYING }));
-
- // Restart and let it crash one more time.
- // NOTE(review): the server was already shut down a few lines above; this
- // second Shutdown() call is redundant.
- cluster_->tablet_server(kTsIndex)->Shutdown();
- ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
- NO_FATALS(WaitForTSToCrash(kTsIndex));
- LOG(INFO) << "Test progress: crashed on second attempt to copy";
-
- // Remove the fault flag and let it successfully copy the tablet.
- cluster_->tablet_server(kTsIndex)->mutable_flags()->pop_back();
- cluster_->tablet_server(kTsIndex)->Shutdown();
- ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
-
- // Everything should be consistent now.
- ClusterVerifier v(cluster_.get());
- NO_FATALS(v.CheckCluster());
- NO_FATALS(v.CheckRowCount(repl_workload.table_name(), ClusterVerifier::AT_LEAST,
- repl_workload.rows_inserted()));
- NO_FATALS(v.CheckRowCount(unrepl_workload.table_name(), ClusterVerifier::AT_LEAST,
- unrepl_workload.rows_inserted()));
-
- // Now test the case where the server crashes while copying over a previously-tombstoned
- // tablet. So, we first remove the server, causing it to get tombstoned.
- ASSERT_OK(itest::RemoveServer(leader, replicated_tablet_id, ts, boost::none, timeout));
- NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, replicated_tablet_id, CMETA_EXPECTED));
-
- // ... and then add it back, with the fault runtime-enabled.
- // The fault flag is no longer part of TS 0's command-line flags (it was
- // popped above), so the restart further below runs without it.
- ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(kTsIndex),
- "fault_crash_after_tc_files_fetched", "1.0"));
- ASSERT_OK(itest::AddServer(leader, replicated_tablet_id, ts, RaftPeerPB::VOTER,
- boost::none, timeout));
- NO_FATALS(WaitForTSToCrash(kTsIndex));
- LOG(INFO) << "Test progress: crashed on attempt to copy over tombstoned";
-
- // Finally, restarting with no fault flags should fully recover again.
- cluster_->tablet_server(kTsIndex)->Shutdown();
- ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
- NO_FATALS(v.CheckCluster());
- NO_FATALS(v.CheckRowCount(repl_workload.table_name(), ClusterVerifier::AT_LEAST,
- repl_workload.rows_inserted()));
- NO_FATALS(v.CheckRowCount(unrepl_workload.table_name(), ClusterVerifier::AT_LEAST,
- unrepl_workload.rows_inserted()));
-}
-
-// Test that a tablet replica automatically tombstones itself if the remote
-// server fails in the middle of the Tablet Copy process.
-// Also test that we can Copy Tablet over a tombstoned tablet.
-TEST_F(DeleteTableTest, TestAutoTombstoneAfterTabletCopyRemoteFails) {
- vector<string> ts_flags = {
- "--enable_leader_failure_detection=false", // Make test deterministic.
- "--log_segment_size_mb=1", // Faster log rolls.
- "--log_compression_codec=NO_COMPRESSION" // Faster log rolls.
- };
- vector<string> master_flags = {
- "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
- "--allow_unsafe_replication_factor=true"
- };
- NO_FATALS(StartCluster(ts_flags, master_flags));
- const MonoDelta kTimeout = MonoDelta::FromSeconds(20);
- const int kTsIndex = 0; // We'll test with the first TS.
-
- // We'll do a config change to Tablet Copy a replica here later. For
- // now, shut down TS-0.
- LOG(INFO) << "Shutting down TS " << cluster_->tablet_server(kTsIndex)->uuid();
- cluster_->tablet_server(kTsIndex)->Shutdown();
-
- // Bounce the Master so it gets new tablet reports and doesn't try to assign
- // a replica to the dead TS. Check the wait's Status: if the master never sees
- // both live servers, the table creation below would fail in a confusing way.
- cluster_->master()->Shutdown();
- ASSERT_OK(cluster_->master()->Restart());
- ASSERT_OK(cluster_->WaitForTabletServerCount(2, kTimeout));
-
- // Start a workload on the cluster, and run it for a little while.
- TestWorkload workload(cluster_.get());
- workload.set_num_replicas(2);
- workload.Setup();
- ASSERT_OK(inspect_->WaitForReplicaCount(2));
-
- // Figure out the tablet id.
- vector<string> tablets = inspect_->ListTabletsOnTS(1);
- ASSERT_EQ(1, tablets.size());
- const string& tablet_id = tablets[0];
-
- for (int i = 1; i <= 2; i++) {
- NO_FATALS(WaitUntilTabletRunning(i, tablet_id));
- }
-
- // Elect a leader and run some data through the cluster.
- const int kLeaderIndex = 1;
- string kLeaderUuid = cluster_->tablet_server(kLeaderIndex)->uuid();
- ASSERT_OK(itest::StartElection(ts_map_[kLeaderUuid], tablet_id, kTimeout));
- workload.Start();
- while (workload.rows_inserted() < 100) {
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
-
- // Tablet Copy doesn't see the active WAL segment, and we need to
- // download a file to trigger the fault in this test. Due to the log index
- // chunks, that means 3 files minimum: One in-flight WAL segment, one index
- // chunk file (these files grow much more slowly than the WAL segments), and
- // one completed WAL segment.
- ASSERT_OK(inspect_->WaitForMinFilesInTabletWalDirOnTS(kLeaderIndex, tablet_id, 3));
- workload.StopAndJoin();
-
- // Cause the leader to crash when a follower tries to initiate Tablet Copy from it.
- const string fault_flag = "fault_crash_on_handle_tc_fetch_data";
- ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(kLeaderIndex), fault_flag, "1.0"));
-
- // Add TS-0 as a new member to the config and wait for the leader to crash.
- ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
- TServerDetails* leader = ts_map_[kLeaderUuid];
- TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
- // The server may crash before responding to our RPC.
- Status s = itest::AddServer(leader, tablet_id, ts, RaftPeerPB::VOTER, boost::none, kTimeout);
- ASSERT_TRUE(s.ok() || s.IsNetworkError()) << s.ToString();
- NO_FATALS(WaitForTSToCrash(kLeaderIndex));
-
- // The tablet server will detect that the leader failed, and automatically
- // tombstone its replica. Shut down the crashed leader's handle and the other
- // non-leader replica to avoid interference while we wait for this to happen.
- cluster_->tablet_server(1)->Shutdown();
- cluster_->tablet_server(2)->Shutdown();
- NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_NOT_EXPECTED));
-
- // Check the textual status message for the failed copy.
- {
- vector<ListTabletsResponsePB::StatusAndSchemaPB> status_pbs;
- ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, kTimeout, &status_pbs));
- ASSERT_STR_MATCHES(status_pbs[0].tablet_status().last_status(),
- "Tablet Copy: Tombstoned tablet .*: Tablet copy aborted");
- }
-
- // Now bring the other replicas back, re-elect the previous leader (TS-1),
- // and wait for the leader to Tablet Copy the tombstoned replica. This
- // will have replaced a tablet with no consensus metadata.
- ASSERT_OK(cluster_->tablet_server(1)->Restart());
- ASSERT_OK(cluster_->tablet_server(2)->Restart());
- for (int i = 1; i <= 2; i++) {
- NO_FATALS(WaitUntilTabletRunning(i, tablet_id));
- }
- ASSERT_OK(itest::StartElection(ts_map_[kLeaderUuid], tablet_id, kTimeout));
- ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_READY }));
-
- ClusterVerifier v(cluster_.get());
- NO_FATALS(v.CheckCluster());
- NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
- workload.rows_inserted()));
-
- // Now pause the other replicas and tombstone our replica again.
- ASSERT_OK(cluster_->tablet_server(1)->Pause());
- ASSERT_OK(cluster_->tablet_server(2)->Pause());
- ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, kTimeout));
- NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_NOT_EXPECTED));
-
- // Bring them back again, let them yet again Copy a new replica on top of our tombstoned replica.
- // This time, the leader will have replaced a tablet with consensus metadata.
- ASSERT_OK(cluster_->tablet_server(1)->Resume());
- ASSERT_OK(cluster_->tablet_server(2)->Resume());
- ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_READY }));
-
- NO_FATALS(v.CheckCluster());
- NO_FATALS(v.CheckRowCount(workload.table_name(), ClusterVerifier::AT_LEAST,
- workload.rows_inserted()));
-}
-
-// Test for correct Tablet Copy merge of consensus metadata.
-// A tombstoned replica that is re-copied must keep its own vote record when
-// the source is still in the same term (term 2 below). Once the source has
-// moved to a newer term (term 3), the copied metadata must not carry the old
-// vote.
-TEST_F(DeleteTableTest, TestMergeConsensusMetadata) {
- // Enable manual leader selection.
- vector<string> ts_flags, master_flags;
- ts_flags.push_back("--enable_leader_failure_detection=false");
- master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
- NO_FATALS(StartCluster(ts_flags, master_flags));
- const MonoDelta timeout = MonoDelta::FromSeconds(10);
- const int kTsIndex = 0;
-
- TestWorkload workload(cluster_.get());
- workload.Setup();
- ASSERT_OK(inspect_->WaitForReplicaCount(3));
-
- // Figure out the tablet id to Tablet Copy.
- vector<string> tablets = inspect_->ListTabletsOnTS(1);
- ASSERT_EQ(1, tablets.size());
- const string& tablet_id = tablets[0];
-
- for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
- NO_FATALS(WaitUntilTabletRunning(i, tablet_id));
- }
-
- // Elect a leader and run some data through the cluster.
- int leader_index = 1;
- string leader_uuid = cluster_->tablet_server(leader_index)->uuid();
- ASSERT_OK(itest::StartElection(ts_map_[leader_uuid], tablet_id, timeout));
- workload.Start();
- while (workload.rows_inserted() < 100) {
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
- workload.StopAndJoin();
- ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, workload.batches_completed()));
-
- // Verify that TS 0 voted for the chosen leader.
- ConsensusMetadataPB cmeta_pb;
- ASSERT_OK(inspect_->ReadConsensusMetadataOnTS(kTsIndex, tablet_id, &cmeta_pb));
- ASSERT_EQ(1, cmeta_pb.current_term());
- ASSERT_EQ(leader_uuid, cmeta_pb.voted_for());
-
- // Shut down all but TS 0 and try to elect TS 0. The election will fail but
- // the TS will record a vote for itself as well as a new term (term 2).
- cluster_->tablet_server(1)->Shutdown();
- cluster_->tablet_server(2)->Shutdown();
- NO_FATALS(WaitUntilTabletRunning(kTsIndex, tablet_id));
- TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
- ASSERT_OK(itest::StartElection(ts, tablet_id, timeout));
- // Poll the on-disk consensus metadata (up to 6000 x 10ms, i.e. ~60s) until
- // it shows term 2 with a self-vote; the ASSERTs below report a timeout.
- for (int i = 0; i < 6000; i++) {
- Status s = inspect_->ReadConsensusMetadataOnTS(kTsIndex, tablet_id, &cmeta_pb);
- if (s.ok() &&
- cmeta_pb.current_term() == 2 &&
- cmeta_pb.voted_for() == ts->uuid()) {
- break;
- }
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
- ASSERT_EQ(2, cmeta_pb.current_term());
- ASSERT_EQ(ts->uuid(), cmeta_pb.voted_for());
-
- // Tombstone our special little guy, then shut him down.
- ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, timeout));
- NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_EXPECTED));
- cluster_->tablet_server(kTsIndex)->Shutdown();
-
- // Restart the other dudes and re-elect the same leader.
- ASSERT_OK(cluster_->tablet_server(1)->Restart());
- ASSERT_OK(cluster_->tablet_server(2)->Restart());
- TServerDetails* leader = ts_map_[leader_uuid];
- NO_FATALS(WaitUntilTabletRunning(1, tablet_id));
- NO_FATALS(WaitUntilTabletRunning(2, tablet_id));
- ASSERT_OK(itest::StartElection(leader, tablet_id, timeout));
- ASSERT_OK(itest::WaitUntilLeader(leader, tablet_id, timeout));
-
- // Bring our special little guy back up.
- // Wait until he gets Tablet Copied.
- LOG(INFO) << "Bringing TS " << cluster_->tablet_server(kTsIndex)->uuid()
- << " back up...";
- ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
- ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_READY }));
-
- // Assert that the election history is retained (voted for self).
- ASSERT_OK(inspect_->ReadConsensusMetadataOnTS(kTsIndex, tablet_id, &cmeta_pb));
- ASSERT_EQ(2, cmeta_pb.current_term());
- ASSERT_EQ(ts->uuid(), cmeta_pb.voted_for());
-
- // Now do the same thing as above, where we tombstone TS 0 then trigger a new
- // term (term 3) on the other machines. TS 0 will get copied
- // again, but this time the vote record on TS 0 for term 2 should not be
- // retained after Tablet Copy occurs.
- cluster_->tablet_server(1)->Shutdown();
- cluster_->tablet_server(2)->Shutdown();
-
- // Delete with retries because the tablet might still be bootstrapping.
- NO_FATALS(itest::DeleteTabletWithRetries(ts, tablet_id,
- TABLET_DATA_TOMBSTONED, boost::none, timeout));
- NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_EXPECTED));
-
- ASSERT_OK(cluster_->tablet_server(1)->Restart());
- ASSERT_OK(cluster_->tablet_server(2)->Restart());
- NO_FATALS(WaitUntilTabletRunning(1, tablet_id));
- NO_FATALS(WaitUntilTabletRunning(2, tablet_id));
- ASSERT_OK(itest::StartElection(leader, tablet_id, timeout));
- ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_READY }));
-
- // The election history should have been wiped out.
- ASSERT_OK(inspect_->ReadConsensusMetadataOnTS(kTsIndex, tablet_id, &cmeta_pb));
- ASSERT_EQ(3, cmeta_pb.current_term());
- ASSERT_TRUE(!cmeta_pb.has_voted_for()) << SecureShortDebugString(cmeta_pb);
-}
-
-// Regression test for KUDU-987, a bug where followers with transactions in
-// REPLICATING state, which means they have not yet been committed to a
-// majority, cannot shut down during a DeleteTablet() call.
-TEST_F(DeleteTableTest, TestDeleteFollowerWithReplicatingTransaction) {
- if (!AllowSlowTests()) {
- // We will typically wait at least 5 seconds for timeouts to occur.
- LOG(INFO) << "Skipping test in fast-test mode.";
- return;
- }
-
- const MonoDelta timeout = MonoDelta::FromSeconds(10);
-
- const int kNumTabletServers = 5;
- vector<string> ts_flags, master_flags;
- ts_flags.push_back("--enable_leader_failure_detection=false");
- ts_flags.push_back("--flush_threshold_mb=0"); // Always be flushing.
- ts_flags.push_back("--maintenance_manager_polling_interval_ms=100");
- master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
- NO_FATALS(StartCluster(ts_flags, master_flags, kNumTabletServers));
-
- const int kTsIndex = 0; // We'll test with the first TS.
- TServerDetails* ts = ts_map_[cluster_->tablet_server(kTsIndex)->uuid()];
-
- // Create the table.
- TestWorkload workload(cluster_.get());
- workload.set_num_replicas(kNumTabletServers);
- workload.Setup();
-
- // Figure out the tablet ids of the created tablets.
- vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
- ASSERT_OK(WaitForNumTabletsOnTS(ts, 1, timeout, &tablets));
- const string& tablet_id = tablets[0].tablet_status().tablet_id();
-
- // Wait until all replicas are up and running.
- for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
- ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
- tablet_id, timeout));
- }
-
- // Elect TS 1 as leader.
- const int kLeaderIndex = 1;
- const string kLeaderUuid = cluster_->tablet_server(kLeaderIndex)->uuid();
- TServerDetails* leader = ts_map_[kLeaderUuid];
- ASSERT_OK(itest::StartElection(leader, tablet_id, timeout));
- ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, 1));
-
- // Kill a majority, but leave the leader and a single follower.
- LOG(INFO) << "Killing majority";
- for (int i = 2; i < kNumTabletServers; i++) {
- cluster_->tablet_server(i)->Shutdown();
- }
-
- // Now write a single row to the leader.
- // We give 5 seconds for the timeout to pretty much guarantee that a flush
- // will occur due to the low flush threshold we set.
- // With a majority down the write cannot commit, so it is expected to time
- // out; print the actual status if it does not.
- LOG(INFO) << "Writing a row";
- Status s = WriteSimpleTestRow(leader, tablet_id, RowOperationsPB::INSERT,
- 1, 1, "hola, world", MonoDelta::FromSeconds(5));
- ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
- ASSERT_STR_CONTAINS(s.ToString(), "timed out");
-
- LOG(INFO) << "Killing the leader...";
- cluster_->tablet_server(kLeaderIndex)->Shutdown();
-
- // Now tombstone the follower tablet. This should succeed even though there
- // are uncommitted operations on the replica.
- LOG(INFO) << "Tombstoning tablet " << tablet_id << " on TS " << ts->uuid();
- ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, timeout));
-}
-
-// Test that orphaned blocks are cleared from the superblock when a tablet is
-// tombstoned.
-// Flow: elect TS 1 leader, write until follower TS 0 has flushed at least one
-// rowset, stop the leader, tombstone TS 0's replica, then read TS 0's
-// superblock back from disk and expect zero rowsets and zero orphaned blocks.
-TEST_F(DeleteTableTest, TestOrphanedBlocksClearedOnDelete) {
- const MonoDelta timeout = MonoDelta::FromSeconds(30);
- vector<string> ts_flags, master_flags;
- ts_flags.push_back("--enable_leader_failure_detection=false");
- ts_flags.push_back("--flush_threshold_mb=0"); // Flush quickly since we wait for a flush to occur.
- ts_flags.push_back("--maintenance_manager_polling_interval_ms=100");
- master_flags.push_back("--catalog_manager_wait_for_new_tablets_to_elect_leader=false");
- NO_FATALS(StartCluster(ts_flags, master_flags));
-
- const int kFollowerIndex = 0;
- TServerDetails* follower_ts = ts_map_[cluster_->tablet_server(kFollowerIndex)->uuid()];
-
- // Create the table.
- TestWorkload workload(cluster_.get());
- workload.Setup();
-
- // Figure out the tablet id of the created tablet.
- vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
- ASSERT_OK(WaitForNumTabletsOnTS(follower_ts, 1, timeout, &tablets));
- const string& tablet_id = tablets[0].tablet_status().tablet_id();
-
- // Wait until all replicas are up and running.
- for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
- ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
- tablet_id, timeout));
- }
-
- // Elect TS 1 as leader.
- const int kLeaderIndex = 1;
- const string kLeaderUuid = cluster_->tablet_server(kLeaderIndex)->uuid();
- TServerDetails* leader_ts = ts_map_[kLeaderUuid];
- ASSERT_OK(itest::StartElection(leader_ts, tablet_id, timeout));
- ASSERT_OK(WaitForServersToAgree(timeout, ts_map_, tablet_id, 1));
-
- // Run a write workload and wait until we see some rowsets flush on the follower.
- // (Polls up to 3000 x 10ms, i.e. ~30s; the ASSERT_GT below reports a timeout.)
- workload.Start();
- TabletSuperBlockPB superblock_pb;
- for (int i = 0; i < 3000; i++) {
- ASSERT_OK(inspect_->ReadTabletSuperBlockOnTS(kFollowerIndex, tablet_id, &superblock_pb));
- if (!superblock_pb.rowsets().empty()) break;
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
- ASSERT_GT(superblock_pb.rowsets_size(), 0)
- << "Timed out waiting for rowset flush on TS " << follower_ts->uuid() << ": "
- << "Superblock:\n" << SecureDebugString(superblock_pb);
-
- // Shut down the leader so it doesn't try to copy a new replica to our follower later.
- workload.StopAndJoin();
- cluster_->tablet_server(kLeaderIndex)->Shutdown();
-
- // Tombstone the follower and check that there are no rowsets or orphaned
- // blocks retained in the superblock.
- ASSERT_OK(itest::DeleteTablet(follower_ts, tablet_id, TABLET_DATA_TOMBSTONED,
- boost::none, timeout));
- NO_FATALS(WaitForTabletTombstonedOnTS(kFollowerIndex, tablet_id, CMETA_EXPECTED));
- ASSERT_OK(inspect_->ReadTabletSuperBlockOnTS(kFollowerIndex, tablet_id, &superblock_pb));
- ASSERT_EQ(0, superblock_pb.rowsets_size()) << SecureDebugString(superblock_pb);
- ASSERT_EQ(0, superblock_pb.orphaned_blocks_size()) << SecureDebugString(superblock_pb);
-}
-
-// Returns pointers to every element of 'haystack' that contains 'needle' as a
-// substring, preserving the original order. The pointers refer into
-// 'haystack', so they stay valid only while 'haystack' is alive and unmodified.
-vector<const string*> Grep(const string& needle, const vector<string>& haystack) {
- vector<const string*> matches;
- for (size_t idx = 0; idx < haystack.size(); ++idx) {
- const string& candidate = haystack[idx];
- if (candidate.find(needle) == string::npos) {
- continue;
- }
- matches.push_back(&candidate);
- }
- return matches;
-}
-
-// Runs `lsof -n -p <pid>` through bash and returns its stdout split on '\n'.
-// /usr/bin and /usr/sbin are appended to PATH before invoking lsof, presumably
-// so it is found even when the inherited PATH lacks those directories.
-// CHECK-fails (aborting the test process) if the command does not succeed.
-vector<string> ListOpenFiles(pid_t pid) {
- const string lsof_cmd =
- strings::Substitute("export PATH=$$PATH:/usr/bin:/usr/sbin; lsof -n -p $0", pid);
- const vector<string> shell_argv = { "bash", "-c", lsof_cmd };
- string lsof_output;
- CHECK_OK(Subprocess::Call(shell_argv, "", &lsof_output));
- vector<string> output_lines = strings::Split(lsof_output, "\n");
- return output_lines;
-}
-
-// Logs every lsof output line for process 'pid' that contains 'tablet_id'
-// and returns how many such lines there were.
-// NOTE(review): the log text calls these "WAL files", but the match is any
-// lsof line mentioning the tablet ID, not only WAL segments.
-int PrintOpenTabletFiles(pid_t pid, const string& tablet_id) {
- const vector<string> open_files = ListOpenFiles(pid);
- const vector<const string*> matches = Grep(tablet_id, open_files);
- LOG(INFO) << "There are " << matches.size() << " open WAL files for pid " << pid << ":";
- for (const string* match : matches) {
- LOG(INFO) << *match;
- }
- return static_cast<int>(matches.size());
-}
-
-// Regression test for tablet deletion FD leak. See KUDU-1288.
-// After tombstoning the only replica on a single-server cluster, lsof for the
-// tablet server process must list no lines mentioning the tablet ID, both
-// immediately and after a restart.
-TEST_F(DeleteTableTest, TestFDsNotLeakedOnTabletTombstone) {
- const MonoDelta timeout = MonoDelta::FromSeconds(30);
-
- NO_FATALS(StartCluster({}, {}, 1));
-
- // Create the table.
- TestWorkload workload(cluster_.get());
- workload.set_num_replicas(1);
- workload.Setup();
- workload.Start();
- while (workload.rows_inserted() < 1000) {
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
- workload.StopAndJoin();
-
- // Figure out the tablet id of the created tablet.
- vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
- ASSERT_OK(WaitForNumTabletsOnTS(ts_map_.begin()->second, 1, timeout, &tablets));
- const string& tablet_id = tablets[0].tablet_status().tablet_id();
-
- // Tombstone the tablet and then ensure that lsof does not list any
- // tablet-related paths.
- ExternalTabletServer* ets = cluster_->tablet_server(0);
- ASSERT_OK(itest::DeleteTablet(ts_map_[ets->uuid()],
- tablet_id, TABLET_DATA_TOMBSTONED, boost::none, timeout));
- ASSERT_EQ(0, PrintOpenTabletFiles(ets->pid(), tablet_id));
-
- // Restart the TS after deletion and then do the same lsof check again.
- ets->Shutdown();
- ASSERT_OK(ets->Restart());
- ASSERT_EQ(0, PrintOpenTabletFiles(ets->pid(), tablet_id));
-}
-
-// Regression test for KUDU-1545: crash when visiting the tablet page for a
-// tombstoned tablet.
-TEST_F(DeleteTableTest, TestWebPageForTombstonedTablet) {
- const MonoDelta timeout = MonoDelta::FromSeconds(30);
-
- NO_FATALS(StartCluster({}, {}, 1));
-
- // Create the table.
- TestWorkload workload(cluster_.get());
- workload.set_num_replicas(1);
- workload.Setup();
-
- // Figure out the tablet id of the created tablet.
- vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
- ASSERT_OK(WaitForNumTabletsOnTS(ts_map_.begin()->second, 1, timeout, &tablets));
- const string& tablet_id = tablets[0].tablet_status().tablet_id();
-
- // Tombstone the tablet.
- // The DeleteTablet call is wrapped in AssertEventually so it is retried
- // until it succeeds.
- ExternalTabletServer* ets = cluster_->tablet_server(0);
- AssertEventually([&]() {
- ASSERT_OK(itest::DeleteTablet(ts_map_[ets->uuid()],
- tablet_id, TABLET_DATA_TOMBSTONED, boost::none, timeout));
- });
-
- // Check the various web pages associated with the tablet, ensuring
- // they don't crash and at least have the tablet ID within them.
- // ('pages' binds to a braced initializer list; the const reference keeps
- // that temporary alive for the loop.)
- EasyCurl c;
- const auto& pages = { "tablet",
- "tablet-rowsetlayout-svg",
- "tablet-consensus-status",
- "log-anchors" };
- for (const auto& page : pages) {
- faststring buf;
- ASSERT_OK(c.FetchURL(Substitute(
- "http://$0/$1?id=$2",
- cluster_->tablet_server(0)->bound_http_hostport().ToString(),
- page,
- tablet_id), &buf));
- ASSERT_STR_CONTAINS(buf.ToString(), tablet_id);
- }
-}
-
-
-TEST_F(DeleteTableTest, TestUnknownTabletsAreNotDeleted) {
- // Speed up heartbeating so that the unknown tablet is detected faster.
- vector<string> extra_ts_flags = { "--heartbeat_interval_ms=10" };
-
- NO_FATALS(StartCluster(extra_ts_flags, {}, 1));
-
- Schema schema(GetSimpleTestSchema());
- client::KuduSchema client_schema(client::KuduSchemaFromSchema(schema));
- unique_ptr<KuduTableCreator> creator(client_->NewTableCreator());
- ASSERT_OK(creator->table_name("test")
- .schema(&client_schema)
- .set_range_partition_columns({"key"})
- .num_replicas(1)
- .Create());
-
- // Figure out the tablet id of the created tablet.
- const MonoDelta timeout = MonoDelta::FromSeconds(30);
- vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
- ASSERT_OK(WaitForNumTabletsOnTS(ts_map_.begin()->second, 1, timeout, &tablets));
- const string& tablet_id = tablets[0].tablet_status().tablet_id();
-
- // Delete the master's metadata and start it back up. The tablet created
- // above is now unknown, but should not be deleted!
- cluster_->master()->Shutdown();
- ASSERT_OK(env_->DeleteRecursively(cluster_->master()->data_dir()));
- ASSERT_OK(cluster_->master()->Restart());
-
- // Give the master a chance to finish writing the new master tablet to disk
- // so that it can be found after the subsequent restart below.
- ASSERT_OK(cluster_->master()->WaitForCatalogManager());
-
- int64_t num_delete_attempts;
- AssertEventually([&]() {
- int64_t num_heartbeats;
- ASSERT_OK(cluster_->master()->GetInt64Metric(
- &METRIC_ENTITY_server, "kudu.master",
- &METRIC_handler_latency_kudu_master_MasterService_TSHeartbeat, "total_count",
- &num_heartbeats));
- ASSERT_GE(num_heartbeats, 1);
- });
- ASSERT_OK(cluster_->tablet_server(0)->GetInt64Metric(
- &METRIC_ENTITY_server, "kudu.tabletserver",
- &METRIC_handler_latency_kudu_tserver_TabletServerAdminService_DeleteTablet,
- "total_count", &num_delete_attempts));
- ASSERT_EQ(0, num_delete_attempts);
-
- // Now restart the master with orphan deletion enabled. The tablet should get
- // deleted.
- // We also need to restart the tablet server, or else the old tablet server
- // won't be able to authenticate to the new master, due to it having a new
- // CA certificate which the old tserver doesn't trust.
- //
- // TODO(KUDU-65): perhaps this is actually a feature? should we have tablet servers
- // remember the CA cert persistently so that it's impossible to connect an
- // old tserver to a new cluster?
- cluster_->Shutdown();
- cluster_->master()->mutable_flags()->push_back(
- "--catalog_manager_delete_orphaned_tablets");
- ASSERT_OK(cluster_->Restart());
- AssertEventually([&]() {
- ASSERT_OK(cluster_->tablet_server(0)->GetInt64Metric(
- &METRIC_ENTITY_server, "kudu.tabletserver",
- &METRIC_handler_latency_kudu_tserver_TabletServerAdminService_DeleteTablet,
- "total_count", &num_delete_attempts));
- // Sometimes the tablet server has time to report the orphaned tablet multiple times
- // before the delete succeeds. This is ok because tablet deletion is idempotent.
- ASSERT_GE(num_delete_attempts, 1);
- ASSERT_OK(CheckTabletDeletedOnTS(0, tablet_id, SUPERBLOCK_NOT_EXPECTED));
- });
-
-}
-
-// Parameterized test case for TABLET_DATA_DELETED deletions.
-class DeleteTableDeletedParamTest : public DeleteTableTest,
- public ::testing::WithParamInterface<const char*> {
-};
-
-// Test that if a server crashes mid-delete that the delete will be rolled
-// forward on startup. Parameterized by different fault flags that cause a
-// crash at various points.
-TEST_P(DeleteTableDeletedParamTest, TestRollForwardDelete) {
- NO_FATALS(StartCluster());
- const string fault_flag = GetParam();
- LOG(INFO) << "Running with fault flag: " << fault_flag;
-
- // Dynamically set the fault flag so they crash when DeleteTablet() is called
- // by the Master.
- for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
- ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(i), fault_flag, "1.0"));
- }
-
- // Create a table on the cluster. We're just using TestWorkload
- // as a convenient way to create it.
- TestWorkload(cluster_.get()).Setup();
-
- // The table should have replicas on all three tservers.
- ASSERT_OK(inspect_->WaitForReplicaCount(3));
-
- // Delete it and wait for the tablet servers to crash.
- NO_FATALS(DeleteTable(TestWorkload::kDefaultTableName));
- NO_FATALS(WaitForAllTSToCrash());
-
- // There should still be data left on disk.
- Status s = inspect_->CheckNoData();
- ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
-
- // Now restart the tablet servers. They should roll forward their deletes.
- // We don't have to reset the fault flag here because it was set dynamically.
- for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
- cluster_->tablet_server(i)->Shutdown();
- ASSERT_OK(cluster_->tablet_server(i)->Restart());
- }
- ASSERT_OK(inspect_->WaitForNoData());
-}
-
-// Faults appropriate for the TABLET_DATA_DELETED case.
-const char* deleted_faults[] = {"fault_crash_after_blocks_deleted",
- "fault_crash_after_wal_deleted",
- "fault_crash_after_cmeta_deleted"};
-
-INSTANTIATE_TEST_CASE_P(FaultFlags, DeleteTableDeletedParamTest,
- ::testing::ValuesIn(deleted_faults));
-
-// Parameterized test case for TABLET_DATA_TOMBSTONED deletions.
-class DeleteTableTombstonedParamTest : public DeleteTableTest,
- public ::testing::WithParamInterface<const char*> {
-};
-
-// Regression test for tablet tombstoning. Tests:
-// 1. basic creation & tombstoning of a tablet.
-// 2. roll-forward (crash recovery) of a partially-completed tombstoning of a tablet.
-// 3. permanent deletion of a TOMBSTONED tablet
-// (transition from TABLET_DATA_TOMBSTONED to TABLET_DATA_DELETED).
-TEST_P(DeleteTableTombstonedParamTest, TestTabletTombstone) {
- vector<string> flags;
- // We want fast log rolls and deterministic preallocation, since we wait for
- // a certain number of logs at the beginning of the test.
- flags.push_back("--log_segment_size_mb=1");
- flags.push_back("--log_async_preallocate_segments=false");
- flags.push_back("--log_min_segments_to_retain=3");
- flags.push_back("--log_compression_codec=NO_COMPRESSION");
- NO_FATALS(StartCluster(flags));
- const string fault_flag = GetParam();
- LOG(INFO) << "Running with fault flag: " << fault_flag;
-
- MonoDelta timeout = MonoDelta::FromSeconds(30);
-
- // Create a table with 2 tablets. We delete the first tablet without
- // injecting any faults, then we delete the second tablet while exercising
- // several fault injection points.
- const int kNumTablets = 2;
- vector<const KuduPartialRow*> split_rows;
- Schema schema(GetSimpleTestSchema());
- client::KuduSchema client_schema(client::KuduSchemaFromSchema(schema));
- KuduPartialRow* split_row = client_schema.NewRow();
- ASSERT_OK(split_row->SetInt32(0, numeric_limits<int32_t>::max() / kNumTablets));
- split_rows.push_back(split_row);
- gscoped_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
- ASSERT_OK(table_creator->table_name(TestWorkload::kDefaultTableName)
- .split_rows(split_rows)
- .schema(&client_schema)
- .set_range_partition_columns({ "key" })
- .num_replicas(3)
- .Create());
-
- // Start a workload on the cluster, and run it until we find WALs on disk.
- TestWorkload workload(cluster_.get());
- workload.set_payload_bytes(32 * 1024); // Write ops of size 32KB to quickly fill the logs.
- workload.set_write_batch_size(1);
- workload.Setup();
-
- // The table should have 2 tablets (1 split) on all 3 tservers (for a total of 6).
- ASSERT_OK(inspect_->WaitForReplicaCount(6));
-
- // Set up the proxies so we can easily send DeleteTablet() RPCs.
- TServerDetails* ts = ts_map_[cluster_->tablet_server(0)->uuid()];
-
- // Ensure the tablet server is reporting 2 tablets.
- vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
- ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 2, timeout, &tablets));
-
- LOG(INFO) << "Starting workload...";
-
- // Run the workload against whoever the leader is until WALs appear on TS 0
- // for the tablets we created.
- const int kTsIndex = 0; // Index of the tablet server we'll use for the test.
- workload.Start();
- while (workload.rows_inserted() < 100) {
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
-
- LOG(INFO) << "Waiting for 3 wal files for tablet "
- << tablets[0].tablet_status().tablet_id() << "...";
- ASSERT_OK(inspect_->WaitForMinFilesInTabletWalDirOnTS(kTsIndex,
- tablets[0].tablet_status().tablet_id(), 3));
-
- LOG(INFO) << "Waiting for 3 wal files for tablet "
- << tablets[1].tablet_status().tablet_id() << "...";
- ASSERT_OK(inspect_->WaitForMinFilesInTabletWalDirOnTS(kTsIndex,
- tablets[1].tablet_status().tablet_id(), 3));
-
- LOG(INFO) << "Stopping workload...";
- workload.StopAndJoin();
-
- // Shut down the master and the other tablet servers so they don't interfere
- // by attempting to create or copy tablets while we delete tablets.
- cluster_->master()->Shutdown();
- cluster_->tablet_server(1)->Shutdown();
- cluster_->tablet_server(2)->Shutdown();
-
- // Tombstone the first tablet.
- string tablet_id = tablets[0].tablet_status().tablet_id();
- LOG(INFO) << "Tombstoning first tablet " << tablet_id << "...";
- ASSERT_TRUE(inspect_->DoesConsensusMetaExistForTabletOnTS(kTsIndex, tablet_id)) << tablet_id;
- ASSERT_OK(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, timeout));
- LOG(INFO) << "Waiting for first tablet to be tombstoned...";
- NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_EXPECTED));
-
- ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 2, timeout, &tablets));
- for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
- if (t.tablet_status().tablet_id() == tablet_id) {
- ASSERT_EQ(tablet::SHUTDOWN, t.tablet_status().state());
- ASSERT_EQ(TABLET_DATA_TOMBSTONED, t.tablet_status().tablet_data_state())
- << t.tablet_status().tablet_id() << " not tombstoned";
- }
- }
-
- // Now tombstone the 2nd tablet, causing a fault.
- ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server(kTsIndex), fault_flag, "1.0"));
- tablet_id = tablets[1].tablet_status().tablet_id();
- LOG(INFO) << "Tombstoning second tablet " << tablet_id << "...";
- ignore_result(itest::DeleteTablet(ts, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, timeout));
- NO_FATALS(WaitForTSToCrash(kTsIndex));
-
- // Restart the tablet server and wait for the WALs to be deleted and for the
- // superblock to show that it is tombstoned.
- cluster_->tablet_server(kTsIndex)->Shutdown();
- ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
- LOG(INFO) << "Waiting for second tablet to be tombstoned...";
- NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_EXPECTED));
- // The tombstoned tablets will still show up in ListTablets(),
- // just with their data state set as TOMBSTONED. They should also be listed
- // as NOT_STARTED because we restarted the server.
- ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 2, timeout, &tablets));
- for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
- ASSERT_EQ(tablet::NOT_STARTED, t.tablet_status().state());
- ASSERT_EQ(TABLET_DATA_TOMBSTONED, t.tablet_status().tablet_data_state())
- << t.tablet_status().tablet_id() << " not tombstoned";
- }
-
- // Check that, upon restart of the tablet server with a tombstoned tablet,
- // we don't unnecessary "roll forward" and rewrite the tablet metadata file
- // when it is already fully deleted.
- int64_t orig_mtime = inspect_->GetTabletSuperBlockMTimeOrDie(kTsIndex, tablet_id);
- cluster_->tablet_server(kTsIndex)->Shutdown();
- ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
- int64_t new_mtime = inspect_->GetTabletSuperBlockMTimeOrDie(kTsIndex, tablet_id);
- ASSERT_EQ(orig_mtime, new_mtime)
- << "Tablet superblock should not have been re-flushed unnecessarily";
-
- // Finally, delete all tablets on the TS, and wait for all data to be gone.
- LOG(INFO) << "Deleting all tablets...";
- for (const ListTabletsResponsePB::StatusAndSchemaPB& tablet : tablets) {
- string tablet_id = tablet.tablet_status().tablet_id();
- // We need retries here, since some of the tablets may still be
- // bootstrapping after being restarted above.
- NO_FATALS(itest::DeleteTabletWithRetries(ts, tablet_id,
- TABLET_DATA_DELETED, boost::none, timeout));
- }
- ASSERT_OK(inspect_->WaitForNoDataOnTS(kTsIndex));
-}
-
-// Faults appropriate for the TABLET_DATA_TOMBSTONED case.
-// Tombstoning a tablet does not delete the consensus metadata.
-const char* tombstoned_faults[] = {"fault_crash_after_blocks_deleted",
- "fault_crash_after_wal_deleted"};
-INSTANTIATE_TEST_CASE_P(FaultFlags, DeleteTableTombstonedParamTest,
- ::testing::ValuesIn(tombstoned_faults));
-
-
-class DeleteTableWhileScanInProgressParamTest :
- public DeleteTableTest,
- public ::testing::WithParamInterface<
- std::tuple<KuduScanner::ReadMode, KuduClient::ReplicaSelection>> {
-};
-
-// Make sure the tablet server keeps the necessary data to serve scan request in
-// progress if tablet is marked for deletion.
-TEST_P(DeleteTableWhileScanInProgressParamTest, Test) {
- const auto read_mode_to_string = [](KuduScanner::ReadMode mode) {
- switch (mode) {
- case KuduScanner::READ_LATEST:
- return "READ_LATEST";
- case KuduScanner::READ_AT_SNAPSHOT:
- return "READ_AT_SNAPSHOT";
- default:
- return "UNKNOWN";
- }
- };
- const auto replica_sel_to_string = [](KuduClient::ReplicaSelection sel) {
- switch (sel) {
- case KuduClient::LEADER_ONLY:
- return "LEADER_ONLY";
- case KuduClient::CLOSEST_REPLICA:
- return "CLOSEST_REPLICA";
- case KuduClient::FIRST_REPLICA:
- return "FIRST_REPLICA";
- default:
- return "UNKNOWN";
- }
- };
-
- const std::vector<std::string> extra_ts_flags = {
- // Set the flush threshold low so that we have a mix of flushed and
- // unflushed operations in the WAL, when we bootstrap.
- "--flush_threshold_mb=1",
-
- // Set the compaction budget to be low so that we get multiple passes
- // of compaction instead of selecting all of the rowsets in a single
- // compaction of the whole tablet.
- "--tablet_compaction_budget_mb=1",
-
- // Set the major delta compaction ratio low enough that we trigger
- // a lot of them.
- "--tablet_delta_store_major_compact_min_ratio=0.001",
- };
-
- // Approximate number of rows to insert. This is not exact number due to the
- // way the test controls the progress of the test workload.
-#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
- // Test is too slow in ASAN/TSAN.
- const size_t rows_to_insert = 10000;
-#else
- const size_t rows_to_insert = AllowSlowTests() ? 100000 : 10000;
-#endif
-
- const auto& param = GetParam();
- const auto mode = std::get<0>(param);
- const auto sel = std::get<1>(param);
-
- // In case of failure, print out string representation of the parameterized
- // case for ease of troubleshooting.
- SCOPED_TRACE(Substitute("read mode $0; replica selection mode $1",
- read_mode_to_string(mode),
- replica_sel_to_string(sel)));
- NO_FATALS(StartCluster(extra_ts_flags));
-
- TestWorkload w(cluster_.get());
- w.set_write_pattern(TestWorkload::INSERT_RANDOM_ROWS);
- w.Setup();
-
- // Start the workload, and wait to see some rows actually inserted.
- w.Start();
- while (w.rows_inserted() < rows_to_insert) {
- SleepFor(MonoDelta::FromMilliseconds(50));
- }
- w.StopAndJoin();
- const int64_t ref_row_count = w.rows_inserted();
-
- using kudu::client::sp::shared_ptr;
- shared_ptr<KuduTable> table;
- ASSERT_OK(client_->OpenTable(w.table_name(), &table));
- KuduScanner scanner(table.get());
- ASSERT_OK(scanner.SetReadMode(mode));
- ASSERT_OK(scanner.SetSelection(sel));
- // Setup batch size to be small enough to guarantee the scan
- // will not fetch all the data at once.
- ASSERT_OK(scanner.SetBatchSizeBytes(1));
- ASSERT_OK(scanner.Open());
- ASSERT_TRUE(scanner.HasMoreRows());
- KuduScanBatch batch;
- ASSERT_OK(scanner.NextBatch(&batch));
- size_t row_count = batch.NumRows();
-
- // Once the first batch of data has been fetched and there is some more
- // to fetch, delete the table.
- NO_FATALS(DeleteTable(w.table_name(), ON_ERROR_DO_NOT_DUMP_STACKS));
-
- // Wait while the table is no longer advertised on the cluster.
- // This ensures the table deletion request has been processed by tablet
- // servers.
- vector<string> tablets;
- do {
- SleepFor(MonoDelta::FromMilliseconds(250));
- tablets = inspect_->ListTablets();
- } while (!tablets.empty());
-
- // Make sure the scanner can continue and fetch the rest of rows.
- ASSERT_TRUE(scanner.HasMoreRows());
- while (scanner.HasMoreRows()) {
- KuduScanBatch batch;
- const Status s = scanner.NextBatch(&batch);
- ASSERT_TRUE(s.ok()) << s.ToString();
- row_count += batch.NumRows();
- }
-
- // Verify the total row count. The exact count must be there in case of
- // READ_AT_SNAPSHOT mode regardless of replica selection or if reading
- // from a leader tablet in any scan mode. In the case of the READ_LATEST
- // mode the data might be fetched from a lagging replica and the scan
- // row count might be less than the inserted row count.
- if (mode == KuduScanner::READ_AT_SNAPSHOT ||
- sel == KuduClient::LEADER_ONLY) {
- EXPECT_EQ(ref_row_count, row_count);
- }
-
- // Close the scanner to make sure it does not hold any references on the
- // data about to be deleted by the hosting tablet server.
- scanner.Close();
-
- // Make sure the table has been deleted.
- EXPECT_OK(inspect_->WaitForNoData());
- NO_FATALS(cluster_->AssertNoCrashes());
- NO_FATALS(StopCluster());
-}
-
-const KuduScanner::ReadMode read_modes[] = {
- KuduScanner::READ_LATEST,
- KuduScanner::READ_AT_SNAPSHOT,
-};
-const KuduClient::ReplicaSelection replica_selectors[] = {
- KuduClient::LEADER_ONLY,
- KuduClient::CLOSEST_REPLICA,
- KuduClient::FIRST_REPLICA,
-};
-INSTANTIATE_TEST_CASE_P(
- Params, DeleteTableWhileScanInProgressParamTest,
- ::testing::Combine(::testing::ValuesIn(read_modes),
- ::testing::ValuesIn(replica_selectors)));
-
-} // namespace kudu