You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/01/22 22:25:53 UTC
[kudu] 01/02: file cache: support alternate open modes
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 21073b06ce30171b10ed09bee133fcafe650f022
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Mon Jan 20 21:18:08 2020 -0800
file cache: support alternate open modes
Previously, the file cache only supported opening existing files; any new
file creation happened out-of-band, after which the file was reopened via the cache.
If we're going to use the file cache for log index chunks, however, we need
to support CREATE_OR_OPEN-style usage, and doing that in the log index itself
is somewhat hairy.
This patch modifies the file cache to support most of the modes defined in
Env::OpenMode. I tried to ensure that cache operations look and feel like a
standard POSIX filesystem, but it's tough to get this right, and I'm sure I
missed some corner cases. I feel pretty good about our use cases (block
managers, log segments, and log index chunks) though.
Change-Id: Ie167302ef85b8e1a40fbb89a7742e2cbb43bcec3
Reviewed-on: http://gerrit.cloudera.org:8080/15081
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/fs/file_block_manager.cc | 3 +-
src/kudu/fs/log_block_manager.cc | 73 ++++++++--------
src/kudu/util/file_cache-stress-test.cc | 4 +-
src/kudu/util/file_cache-test.cc | 90 ++++++++++++++++----
src/kudu/util/file_cache.cc | 142 ++++++++++++++++++++++++--------
src/kudu/util/file_cache.h | 49 ++++++++---
6 files changed, 259 insertions(+), 102 deletions(-)
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index 0c5796c..09b2a99 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -829,7 +829,8 @@ Status FileBlockManager::OpenBlock(const BlockId& block_id,
shared_ptr<RandomAccessFile> reader;
if (PREDICT_TRUE(file_cache_)) {
- RETURN_NOT_OK_FBM_DISK_FAILURE(file_cache_->OpenExistingFile(path, &reader));
+ RETURN_NOT_OK_FBM_DISK_FAILURE(file_cache_->OpenFile<Env::MUST_EXIST>(
+ path, &reader));
} else {
unique_ptr<RandomAccessFile> r;
RETURN_NOT_OK_FBM_DISK_FAILURE(env_->NewRandomAccessFile(path, &r));
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index b09f319..cb40778 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -764,42 +764,47 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
//
// When looping, we delete any created-and-orphaned files.
do {
- unique_ptr<RWFile> rwf;
-
- if (metadata_writer) {
- block_manager->env()->DeleteFile(metadata_path);
- }
common_path = JoinPathSegments(dir->dir(),
block_manager->oid_generator()->Next());
metadata_path = StrCat(common_path, LogBlockManager::kContainerMetadataFileSuffix);
- RWFileOptions wr_opts;
- wr_opts.mode = Env::MUST_CREATE;
- metadata_status = block_manager->env()->NewRWFile(wr_opts,
- metadata_path,
- &rwf);
- metadata_writer.reset(rwf.release());
-
- if (data_file) {
- block_manager->env()->DeleteFile(data_path);
- }
data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix);
- RWFileOptions rw_opts;
- rw_opts.mode = Env::MUST_CREATE;
- data_status = block_manager->env()->NewRWFile(rw_opts,
- data_path,
- &rwf);
- data_file.reset(rwf.release());
+
+ if (PREDICT_TRUE(block_manager->file_cache_)) {
+ if (metadata_writer) {
+ WARN_NOT_OK(block_manager->file_cache_->DeleteFile(metadata_path),
+ "could not delete orphaned metadata file thru file cache");
+ }
+ if (data_file) {
+ WARN_NOT_OK(block_manager->file_cache_->DeleteFile(data_path),
+ "could not delete orphaned data file thru file cache");
+ }
+ metadata_status = block_manager->file_cache_->OpenFile<Env::MUST_CREATE>(
+ metadata_path, &metadata_writer);
+ data_status = block_manager->file_cache_->OpenFile<Env::MUST_CREATE>(
+ data_path, &data_file);
+ } else {
+ if (metadata_writer) {
+ WARN_NOT_OK(block_manager->env()->DeleteFile(metadata_path),
+ "could not delete orphaned metadata file");
+ }
+ if (data_file) {
+ WARN_NOT_OK(block_manager->env()->DeleteFile(data_path),
+ "could not delete orphaned data file");
+ }
+ unique_ptr<RWFile> rwf;
+ RWFileOptions rw_opts;
+
+ rw_opts.mode = Env::MUST_CREATE;
+ metadata_status = block_manager->env()->NewRWFile(
+ rw_opts, metadata_path, &rwf);
+ metadata_writer.reset(rwf.release());
+ data_status = block_manager->env()->NewRWFile(
+ rw_opts, data_path, &rwf);
+ data_file.reset(rwf.release());
+ }
} while (PREDICT_FALSE(metadata_status.IsAlreadyPresent() ||
data_status.IsAlreadyPresent()));
if (metadata_status.ok() && data_status.ok()) {
- if (PREDICT_TRUE(block_manager->file_cache_)) {
- metadata_writer.reset();
- RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile(
- metadata_path, &metadata_writer));
- data_file.reset();
- RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile(
- data_path, &data_file));
- }
unique_ptr<WritablePBContainerFile> metadata_file(new WritablePBContainerFile(
std::move(metadata_writer)));
RETURN_NOT_OK_CONTAINER_DISK_FAILURE(metadata_file->CreateNew(BlockRecordPB()));
@@ -834,10 +839,10 @@ Status LogBlockContainer::Open(LogBlockManager* block_manager,
shared_ptr<RWFile> metadata_file;
shared_ptr<RWFile> data_file;
if (PREDICT_TRUE(block_manager->file_cache_)) {
- RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile(
- metadata_path, &metadata_file));
- RETURN_NOT_OK_CONTAINER_DISK_FAILURE(block_manager->file_cache_->OpenExistingFile(
- data_path, &data_file));
+ RETURN_NOT_OK_CONTAINER_DISK_FAILURE(
+ block_manager->file_cache_->OpenFile<Env::MUST_EXIST>(metadata_path, &metadata_file));
+ RETURN_NOT_OK_CONTAINER_DISK_FAILURE(
+ block_manager->file_cache_->OpenFile<Env::MUST_EXIST>(data_path, &data_file));
} else {
RWFileOptions opts;
opts.mode = Env::MUST_EXIST;
@@ -1224,7 +1229,7 @@ Status LogBlockContainer::SyncMetadata() {
Status LogBlockContainer::ReopenMetadataWriter() {
shared_ptr<RWFile> f;
if (PREDICT_TRUE(block_manager_->file_cache_)) {
- RETURN_NOT_OK_HANDLE_ERROR(block_manager_->file_cache_->OpenExistingFile(
+ RETURN_NOT_OK_HANDLE_ERROR(block_manager_->file_cache_->OpenFile<Env::MUST_EXIST>(
metadata_file_->filename(), &f));
} else {
unique_ptr<RWFile> f_uniq;
diff --git a/src/kudu/util/file_cache-stress-test.cc b/src/kudu/util/file_cache-stress-test.cc
index 8fa1b74..bd3fd22 100644
--- a/src/kudu/util/file_cache-stress-test.cc
+++ b/src/kudu/util/file_cache-stress-test.cc
@@ -172,13 +172,13 @@ class FileCacheStressTest : public KuduTest {
}
if (HasPrefixString(BaseName(to_open), kRWFPrefix)) {
shared_ptr<RWFile> rwf;
- TEST_CHECK_OK(cache_->OpenExistingFile(to_open, &rwf));
+ TEST_CHECK_OK(cache_->OpenFile<Env::MUST_EXIST>(to_open, &rwf));
rwfs.emplace_back(std::move(rwf));
} else {
CHECK(HasPrefixString(BaseName(to_open), kRAFPrefix));
shared_ptr<RandomAccessFile> raf;
- TEST_CHECK_OK(cache_->OpenExistingFile(to_open, &raf));
+ TEST_CHECK_OK(cache_->OpenFile<Env::MUST_EXIST>(to_open, &raf));
rafs.emplace_back(std::move(raf));
}
FinishedOpen(to_open);
diff --git a/src/kudu/util/file_cache-test.cc b/src/kudu/util/file_cache-test.cc
index 1fbfc38..01ecfd5 100644
--- a/src/kudu/util/file_cache-test.cc
+++ b/src/kudu/util/file_cache-test.cc
@@ -121,7 +121,7 @@ TYPED_TEST(FileCacheTest, TestBasicOperations) {
// Open a non-existent file.
{
shared_ptr<TypeParam> f;
- ASSERT_TRUE(this->cache_->OpenExistingFile(
+ ASSERT_TRUE(this->cache_->template OpenFile<Env::MUST_EXIST>(
"/does/not/exist", &f).IsNotFound());
NO_FATALS(this->AssertFdsAndDescriptors(0, 0));
}
@@ -139,7 +139,7 @@ TYPED_TEST(FileCacheTest, TestBasicOperations) {
{
// Open a test file. It should open an fd and create a descriptor.
shared_ptr<TypeParam> f1;
- ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f1));
+ ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile1, &f1));
NO_FATALS(this->AssertFdsAndDescriptors(1, 1));
// Spot check the test data by comparing sizes.
@@ -153,7 +153,7 @@ TYPED_TEST(FileCacheTest, TestBasicOperations) {
// Open the same file a second time. It should reuse the existing
// descriptor and not open a second fd.
shared_ptr<TypeParam> f2;
- ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f2));
+ ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile1, &f2));
NO_FATALS(this->AssertFdsAndDescriptors(1, 1));
{
auto uh(this->cache_->cache_->Lookup(kFile1, Cache::EXPECT_IN_CACHE));
@@ -163,7 +163,7 @@ TYPED_TEST(FileCacheTest, TestBasicOperations) {
// Open a second file. This will create a new descriptor, but evict the fd
// opened for the first file, so the fd count should remain constant.
shared_ptr<TypeParam> f3;
- ASSERT_OK(this->cache_->OpenExistingFile(kFile2, &f3));
+ ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile2, &f3));
NO_FATALS(this->AssertFdsAndDescriptors(1, 2));
{
auto uh(this->cache_->cache_->Lookup(kFile1, Cache::EXPECT_IN_CACHE));
@@ -207,12 +207,12 @@ TYPED_TEST(FileCacheTest, TestDeletion) {
ASSERT_TRUE(this->env_->FileExists(kFile2));
{
shared_ptr<TypeParam> f1;
- ASSERT_OK(this->cache_->OpenExistingFile(kFile2, &f1));
+ ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile2, &f1));
ASSERT_EQ(this->initial_open_fds_ + 1, this->CountOpenFds());
ASSERT_OK(this->cache_->DeleteFile(kFile2));
{
shared_ptr<TypeParam> f2;
- ASSERT_TRUE(this->cache_->OpenExistingFile(kFile2, &f2).IsNotFound());
+ ASSERT_TRUE(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile2, &f2).IsNotFound());
}
ASSERT_TRUE(this->cache_->DeleteFile(kFile2).IsNotFound());
ASSERT_TRUE(this->env_->FileExists(kFile2));
@@ -229,7 +229,7 @@ TYPED_TEST(FileCacheTest, TestDeletion) {
ASSERT_OK(this->WriteTestFile(kFile3, kData3));
{
shared_ptr<TypeParam> f3;
- ASSERT_OK(this->cache_->OpenExistingFile(kFile3, &f3));
+ ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile3, &f3));
}
ASSERT_TRUE(this->env_->FileExists(kFile3));
ASSERT_EQ(this->initial_open_fds_, this->CountOpenFds());
@@ -245,7 +245,7 @@ TYPED_TEST(FileCacheTest, TestInvalidation) {
// Open the file.
shared_ptr<TypeParam> f;
- ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f));
+ ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile1, &f));
// Write a new file and rename it in place on top of file1.
const string kFile2 = this->GetTestPath("foo2");
@@ -266,7 +266,7 @@ TYPED_TEST(FileCacheTest, TestInvalidation) {
// But if we re-open the path again, the new descriptor should read the
// new data.
shared_ptr<TypeParam> f2;
- ASSERT_OK(this->cache_->OpenExistingFile(kFile1, &f2));
+ ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile1, &f2));
ASSERT_OK(f2->Size(&size));
ASSERT_EQ(kData2.size(), size);
}
@@ -291,7 +291,7 @@ TYPED_TEST(FileCacheTest, TestHeavyReads) {
string filename = this->GetTestPath(Substitute("$0", i));
ASSERT_OK(this->WriteTestFile(filename, data));
shared_ptr<TypeParam> f;
- ASSERT_OK(this->cache_->OpenExistingFile(filename, &f));
+ ASSERT_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(filename, &f));
opened_files.push_back(f);
}
@@ -328,7 +328,7 @@ TYPED_TEST(FileCacheTest, TestNoRecursiveDeadlock) {
threads.emplace_back([&]() {
for (int i = 0; i < 10000; i++) {
shared_ptr<TypeParam> f;
- CHECK_OK(this->cache_->OpenExistingFile(kFile, &f));
+ CHECK_OK(this->cache_->template OpenFile<Env::MUST_EXIST>(kFile, &f));
}
});
}
@@ -346,13 +346,67 @@ TEST_F(RandomAccessFileCacheTest, TestMemoryFootprintDoesNotCrash) {
ASSERT_OK(this->WriteTestFile(kFile, "test data"));
shared_ptr<RandomAccessFile> f;
- ASSERT_OK(cache_->OpenExistingFile(kFile, &f));
+ ASSERT_OK(cache_->OpenFile<Env::MUST_EXIST>(kFile, &f));
// This used to crash due to a kudu_malloc_usable_size() call on a memory
// address that wasn't the start of an actual heap allocation.
LOG(INFO) << f->memory_footprint();
}
+class RWFileCacheTest : public FileCacheTest<RWFile> {
+};
+
+TEST_F(RWFileCacheTest, TestOpenMustCreate) {
+ const string kFile1 = this->GetTestPath("foo");
+ const string kFile2 = this->GetTestPath("bar");
+
+ {
+ shared_ptr<RWFile> rwf1;
+ ASSERT_OK(cache_->OpenFile<Env::MUST_CREATE>(kFile1, &rwf1));
+ NO_FATALS(AssertFdsAndDescriptors(1, 1));
+
+ // If there's already a descriptor, a second open will fail in the file cache.
+ shared_ptr<RWFile> rwf2;
+ ASSERT_TRUE(cache_->OpenFile<Env::MUST_CREATE>(kFile1, &rwf2).IsAlreadyPresent());
+
+ // Now let's evict kFile1.
+ shared_ptr<RWFile> rwf3;
+ ASSERT_OK(cache_->OpenFile<Env::MUST_CREATE>(kFile2, &rwf3));
+ NO_FATALS(AssertFdsAndDescriptors(1, 2));
+
+ // The reopen of kFile1 shouldn't be with MUST_CREATE; otherwise this would fail.
+ ASSERT_OK(rwf1->Sync());
+ }
+ {
+ // Without any existing descriptors, open will fail in the filesystem.
+ NO_FATALS(AssertFdsAndDescriptors(0, 0));
+ shared_ptr<RWFile> rwf;
+ ASSERT_TRUE(cache_->OpenFile<Env::MUST_CREATE>(kFile1, &rwf).IsAlreadyPresent());
+ }
+}
+
+TEST_F(RWFileCacheTest, TestOpenCreateOrOpen) {
+ const string kFile1 = this->GetTestPath("foo");
+ const string kFile2 = this->GetTestPath("bar");
+
+ shared_ptr<RWFile> rwf1;
+ ASSERT_OK(cache_->OpenFile<Env::CREATE_OR_OPEN>(kFile1, &rwf1));
+
+ // If there's already a descriptor, a second open will also succeed.
+ shared_ptr<RWFile> rwf2;
+ ASSERT_OK(cache_->OpenFile<Env::CREATE_OR_OPEN>(kFile1, &rwf2));
+
+ // Now let's evict kFile1.
+ shared_ptr<RWFile> rwf3;
+ ASSERT_OK(cache_->OpenFile<Env::CREATE_OR_OPEN>(kFile2, &rwf3));
+ NO_FATALS(AssertFdsAndDescriptors(1, 2));
+
+ // The reopen of kFile1 should use MUST_EXIST. If we delete the file out
+ // from under the cache, we can see this in action as the reopen fails.
+ ASSERT_OK(env_->DeleteFile(kFile1));
+ ASSERT_TRUE(rwf1->Sync().IsNotFound());
+}
+
class MixedFileCacheTest : public KuduTest {
};
@@ -376,9 +430,9 @@ TEST_F(MixedFileCacheTest, TestBothFileTypes) {
// Open the test files, each as a different file type.
shared_ptr<RWFile> rwf;
- ASSERT_OK(cache.OpenExistingFile(kFile1, &rwf));
+ ASSERT_OK(cache.OpenFile<Env::MUST_EXIST>(kFile1, &rwf));
shared_ptr<RandomAccessFile> raf;
- ASSERT_OK(cache.OpenExistingFile(kFile2, &raf));
+ ASSERT_OK(cache.OpenFile<Env::MUST_EXIST>(kFile2, &raf));
// Verify the correct file contents for each test file.
uint64_t size;
@@ -397,13 +451,13 @@ TEST_F(MixedFileCacheTest, TestBothFileTypes) {
//
// These checks are expensive so they're only done in DEBUG mode.
shared_ptr<RWFile> rwf2;
- ASSERT_OK(cache.OpenExistingFile(kFile1, &rwf2));
+ ASSERT_OK(cache.OpenFile<Env::MUST_EXIST>(kFile1, &rwf2));
shared_ptr<RandomAccessFile> raf2;
- ASSERT_OK(cache.OpenExistingFile(kFile2, &raf2));
+ ASSERT_OK(cache.OpenFile<Env::MUST_EXIST>(kFile2, &raf2));
#ifndef NDEBUG
- ASSERT_DEATH({ cache.OpenExistingFile(kFile1, &raf); },
+ ASSERT_DEATH({ cache.OpenFile<Env::MUST_EXIST>(kFile1, &raf); },
"!FindDescriptorUnlocked");
- ASSERT_DEATH({ cache.OpenExistingFile(kFile2, &rwf); },
+ ASSERT_DEATH({ cache.OpenFile<Env::MUST_EXIST>(kFile2, &rwf); },
"!FindDescriptorUnlocked");
#endif
}
diff --git a/src/kudu/util/file_cache.cc b/src/kudu/util/file_cache.cc
index 8fb0adf..0bbbfe7 100644
--- a/src/kudu/util/file_cache.cc
+++ b/src/kudu/util/file_cache.cc
@@ -240,55 +240,55 @@ class Descriptor<RWFile> : public RWFile {
Status Read(uint64_t offset, Slice result) const override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
- RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+ RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
return opened.file()->Read(offset, result);
}
Status ReadV(uint64_t offset, ArrayView<Slice> results) const override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
- RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+ RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
return opened.file()->ReadV(offset, results);
}
Status Write(uint64_t offset, const Slice& data) override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
- RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+ RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
return opened.file()->Write(offset, data);
}
Status WriteV(uint64_t offset, ArrayView<const Slice> data) override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
- RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+ RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
return opened.file()->WriteV(offset, data);
}
Status PreAllocate(uint64_t offset, size_t length, PreAllocateMode mode) override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
- RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+ RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
return opened.file()->PreAllocate(offset, length, mode);
}
Status Truncate(uint64_t length) override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
- RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+ RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
return opened.file()->Truncate(length);
}
Status PunchHole(uint64_t offset, size_t length) override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
- RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+ RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
return opened.file()->PunchHole(offset, length);
}
Status Flush(FlushMode mode, uint64_t offset, size_t length) override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
- RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+ RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
return opened.file()->Flush(mode, offset, length);
}
Status Sync() override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
- RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+ RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
return opened.file()->Sync();
}
@@ -299,13 +299,13 @@ class Descriptor<RWFile> : public RWFile {
Status Size(uint64_t* size) const override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
- RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+ RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
return opened.file()->Size(size);
}
Status GetExtentMap(ExtentMap* out) const override {
ScopedOpenedDescriptor<RWFile> opened(&base_);
- RETURN_NOT_OK(ReopenFileIfNecessary(&opened));
+ RETURN_NOT_OK(ReopenFileIfNecessary<Env::MUST_EXIST>(&opened));
return opened.file()->GetExtentMap(out);
}
@@ -316,14 +316,17 @@ class Descriptor<RWFile> : public RWFile {
private:
friend class ::kudu::FileCache;
+ template <Env::OpenMode Mode>
Status Init() {
- return once_.Init(&Descriptor<RWFile>::InitOnce, this);
+ return once_.Init(&Descriptor<RWFile>::InitOnce<Mode>, this);
}
+ template <Env::OpenMode Mode>
Status InitOnce() {
- return ReopenFileIfNecessary(nullptr);
+ return ReopenFileIfNecessary<Mode>(nullptr);
}
+ template <Env::OpenMode Mode>
Status ReopenFileIfNecessary(ScopedOpenedDescriptor<RWFile>* out) const {
ScopedOpenedDescriptor<RWFile> found(base_.LookupFromCache());
CHECK(!base_.invalidated());
@@ -337,7 +340,7 @@ class Descriptor<RWFile> : public RWFile {
// The file was evicted, reopen it.
RWFileOptions opts;
- opts.mode = Env::MUST_EXIST;
+ opts.mode = Mode;
unique_ptr<RWFile> f;
RETURN_NOT_OK(base_.env()->NewRWFile(opts, base_.filename(), &f));
@@ -488,50 +491,112 @@ Status FileCache::Init() {
}
template <>
-Status FileCache::OpenExistingFile(const string& file_name,
- shared_ptr<RWFile>* file) {
+Status FileCache::DoOpenFile(const string& file_name,
+ shared_ptr<internal::Descriptor<RWFile>>* file,
+ bool* created_desc) {
shared_ptr<internal::Descriptor<RWFile>> d;
+ bool cd;
{
std::lock_guard<simple_spinlock> l(lock_);
- d = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST, &rwf_descs_);
+ d = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST,
+ &rwf_descs_, &cd);
DCHECK(d);
+#ifndef NDEBUG
// Enforce the invariant that a particular file name may only be used by one
// descriptor at a time. This is expensive so it's only done in DEBUG mode.
- DCHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &raf_descs_));
+ bool ignored;
+ CHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE,
+ &raf_descs_, &ignored));
+#endif
}
if (d->base_.deleted()) {
return Status::NotFound(kAlreadyDeleted, file_name);
}
-
- // Check that the underlying file can be opened (no-op for found descriptors).
- //
- // Done outside the lock.
- RETURN_NOT_OK(d->Init());
*file = std::move(d);
+ *created_desc = cd;
return Status::OK();
}
template <>
-Status FileCache::OpenExistingFile(const string& file_name,
- shared_ptr<RandomAccessFile>* file) {
+Status FileCache::DoOpenFile(const string& file_name,
+ shared_ptr<internal::Descriptor<RandomAccessFile>>* file,
+ bool* created_desc) {
shared_ptr<internal::Descriptor<RandomAccessFile>> d;
+ bool cd;
{
std::lock_guard<simple_spinlock> l(lock_);
- d = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST, &raf_descs_);
+ d = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST,
+ &raf_descs_, &cd);
DCHECK(d);
+#ifndef NDEBUG
// Enforce the invariant that a particular file name may only be used by one
// descriptor at a time. This is expensive so it's only done in DEBUG mode.
- DCHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &rwf_descs_));
+ bool ignored;
+ CHECK(!FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE,
+ &rwf_descs_, &ignored));
+#endif
}
if (d->base_.deleted()) {
return Status::NotFound(kAlreadyDeleted, file_name);
}
+ *file = std::move(d);
+ *created_desc = cd;
+ return Status::OK();
+}
+
+template <>
+Status FileCache::OpenFile<Env::CREATE_OR_OPEN>(const string& file_name,
+ shared_ptr<RWFile>* file) {
+ shared_ptr<internal::Descriptor<RWFile>> d;
+ bool ignored;
+ RETURN_NOT_OK(DoOpenFile(file_name, &d, &ignored));
+
+ // Check that the underlying file can be opened (no-op for found descriptors).
+ RETURN_NOT_OK(d->Init<Env::CREATE_OR_OPEN>());
+ *file = std::move(d);
+ return Status::OK();
+}
+
+template <>
+Status FileCache::OpenFile<Env::MUST_CREATE>(const string& file_name,
+ shared_ptr<RWFile>* file) {
+ shared_ptr<internal::Descriptor<RWFile>> d;
+ bool created_desc;
+ RETURN_NOT_OK(DoOpenFile(file_name, &d, &created_desc));
+
+ if (!created_desc) {
+ return Status::AlreadyPresent("file already exists", file_name);
+ }
+
+ // Create the underlying file; this fails if it already exists on disk.
+ RETURN_NOT_OK(d->Init<Env::MUST_CREATE>());
+ *file = std::move(d);
+ return Status::OK();
+}
+
+template <>
+Status FileCache::OpenFile<Env::MUST_EXIST>(const string& file_name,
+ shared_ptr<RWFile>* file) {
+ shared_ptr<internal::Descriptor<RWFile>> d;
+ bool ignored;
+ RETURN_NOT_OK(DoOpenFile(file_name, &d, &ignored));
+
+ // Check that the underlying file can be opened (no-op for found descriptors).
+ RETURN_NOT_OK(d->Init<Env::MUST_EXIST>());
+ *file = std::move(d);
+ return Status::OK();
+}
+
+template <>
+Status FileCache::OpenFile<Env::MUST_EXIST>(const string& file_name,
+ shared_ptr<RandomAccessFile>* file) {
+ shared_ptr<internal::Descriptor<RandomAccessFile>> d;
+ bool ignored;
+ RETURN_NOT_OK(DoOpenFile(file_name, &d, &ignored));
// Check that the underlying file can be opened (no-op for found descriptors).
- //
- // Done outside the lock.
RETURN_NOT_OK(d->Init());
*file = std::move(d);
return Status::OK();
@@ -543,8 +608,10 @@ Status FileCache::DeleteFile(const string& file_name) {
// descriptor in the first map.
{
std::lock_guard<simple_spinlock> l(lock_);
+ bool ignored;
{
- auto d = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &rwf_descs_);
+ auto d = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE,
+ &rwf_descs_, &ignored);
if (d) {
if (d->base_.deleted()) {
return Status::NotFound(kAlreadyDeleted, file_name);
@@ -554,7 +621,8 @@ Status FileCache::DeleteFile(const string& file_name) {
}
}
{
- auto d = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE, &raf_descs_);
+ auto d = FindDescriptorUnlocked(file_name, FindMode::DONT_CREATE,
+ &raf_descs_, &ignored);
if (d) {
if (d->base_.deleted()) {
return Status::NotFound(kAlreadyDeleted, file_name);
@@ -585,13 +653,14 @@ void FileCache::Invalidate(const string& file_name) {
shared_ptr<internal::Descriptor<RandomAccessFile>> raf_desc;
{
std::lock_guard<simple_spinlock> l(lock_);
+ bool ignored;
rwf_desc = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST,
- &rwf_descs_);
+ &rwf_descs_, &ignored);
DCHECK(rwf_desc);
rwf_desc->base_.MarkInvalidated();
raf_desc = FindDescriptorUnlocked(file_name, FindMode::CREATE_IF_NOT_EXIST,
- &raf_descs_);
+ &raf_descs_, &ignored);
DCHECK(raf_desc);
raf_desc->base_.MarkInvalidated();
}
@@ -669,7 +738,8 @@ template <class FileType>
shared_ptr<internal::Descriptor<FileType>> FileCache::FindDescriptorUnlocked(
const string& file_name,
FindMode mode,
- DescriptorMap<FileType>* descs) {
+ DescriptorMap<FileType>* descs,
+ bool* created_desc) {
DCHECK(lock_.is_locked());
shared_ptr<internal::Descriptor<FileType>> d;
@@ -682,6 +752,7 @@ shared_ptr<internal::Descriptor<FileType>> FileCache::FindDescriptorUnlocked(
// Descriptor is still valid, return it.
VLOG(2) << "Found existing descriptor: " << file_name;
+ *created_desc = false;
return d;
}
// Descriptor has expired; erase it and pretend we found nothing.
@@ -692,6 +763,9 @@ shared_ptr<internal::Descriptor<FileType>> FileCache::FindDescriptorUnlocked(
d = std::make_shared<internal::Descriptor<FileType>>(this, file_name);
EmplaceOrDie(descs, file_name, d);
VLOG(2) << "Created new descriptor: " << file_name;
+ *created_desc = true;
+ } else {
+ *created_desc = false;
}
return d;
}
diff --git a/src/kudu/util/file_cache.h b/src/kudu/util/file_cache.h
index ed3663c..c22ee91 100644
--- a/src/kudu/util/file_cache.h
+++ b/src/kudu/util/file_cache.h
@@ -28,15 +28,12 @@
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/cache.h"
#include "kudu/util/countdown_latch.h"
+#include "kudu/util/env.h"
#include "kudu/util/locks.h"
#include "kudu/util/status.h"
namespace kudu {
-class Env;
-class RWFile;
-class RandomAccessFile;
-
namespace internal {
template <class FileType>
@@ -63,7 +60,7 @@ class Thread;
// The core of the client-facing API is the cache descriptor. A descriptor
// uniquely identifies an opened file. To a client, a descriptor is just an
// open file interface of the variety defined in util/env.h. Clients open
-// descriptors via the OpenExistingFile() cache methods.
+// descriptors via the OpenFile() cache methods.
//
// Descriptors are shared objects; an existing descriptor is handed back to a
// client if a file with the same name is already opened. To facilitate
@@ -118,12 +115,27 @@ class FileCache {
// to a file-like interface but interfaces with the cache under the hood to
// reopen a file as needed during file operations.
//
- // The underlying file is opened immediately to verify that it indeed exists,
- // but may be closed later if the cache reaches its upper bound on the number
- // of open files. It is also closed when the descriptor's last reference is dropped.
- template <class FileType>
- Status OpenExistingFile(const std::string& file_name,
- std::shared_ptr<FileType>* file);
+ // The underlying file is opened immediately to respect 'Mode', but may be
+ // closed later if the cache reaches its upper bound on the number of open
+ // files. It is also closed when the descriptor's last reference is dropped.
+ //
+ // All file types honor a 'Mode' of MUST_EXIST. Some may honor other modes as
+ // well, although transparently reopening evicted files will always use
+ // MUST_EXIST. Different combinations of modes and file types are expressed as
+ // template specializations; if a file type doesn't support a particular mode,
+ // there will be a linker error.
+ //
+ // TODO(adar): The file cache tries to behave as if users were accessing the
+ // underlying POSIX filesystem directly, but its semantics aren't 100% correct
+ // when using modes other than MUST_EXIST. For example, the behavior of
+ // MUST_CREATE and CREATE_OR_OPEN isn't quite right for open files marked for
+ // deletion. In theory we should "unmark" such a file to indicate that it was
+ // recreated, and truncate it so it's empty for the second client, but the
+ // truncation would corrupt the file for the first client. In short, take
+ // great care when using any mode apart from MUST_EXIST.
+ template <Env::OpenMode Mode, class FileType>
+ Status OpenFile(const std::string& file_name,
+ std::shared_ptr<FileType>* file);
// Deletes a file by name through the cache.
//
@@ -183,7 +195,10 @@ class FileCache {
template <class FileType>
static void ExpireDescriptorsFromMap(DescriptorMap<FileType>* descs);
- // Looks up a descriptor by file name.
+ // Looks up a descriptor by file name or creates a new one (if requested).
+ //
+ // The value of 'created_desc' will be set in accordance with whether a new
+ // descriptor was created.
//
// Must be called with 'lock_' held.
enum class FindMode {
@@ -197,11 +212,19 @@ class FileCache {
std::shared_ptr<internal::Descriptor<FileType>> FindDescriptorUnlocked(
const std::string& file_name,
FindMode mode,
- DescriptorMap<FileType>* descs);
+ DescriptorMap<FileType>* descs,
+ bool* created_desc);
// Periodically removes expired descriptors from the descriptor maps.
void RunDescriptorExpiry();
+ // Actually opens the file as per OpenFile. Used to encapsulate the bulk of
+ // OpenFile because C++ prohibits partial specialization of template functions.
+ template <class FileType>
+ Status DoOpenFile(const std::string& file_name,
+ std::shared_ptr<FileType>* file,
+ bool* created_desc);
+
// Status message prefix for files that have already been marked as deleted.
static const char* const kAlreadyDeleted;