You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/12/04 19:23:37 UTC

[kudu] branch master updated (4061b80 -> 5cc9114)

This is an automated email from the ASF dual-hosted git repository.

adar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 4061b80  Update to LLVM 9.0.0
     new ae53824  env: add CREATE_OR_OPEN and rename existing open modes
     new 5cc9114  log_index: use RWFiles for IO

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/consensus/log-test.cc                 |   2 +-
 src/kudu/consensus/log.cc                      |   2 +-
 src/kudu/consensus/log_index-test.cc           |   2 +-
 src/kudu/consensus/log_index.cc                | 129 ++++++++++---------------
 src/kudu/consensus/log_index.h                 |  33 ++++---
 src/kudu/fs/file_block_manager.cc              |   2 +-
 src/kudu/fs/fs_manager-test.cc                 |   2 +-
 src/kudu/fs/log_block_manager-test-util.cc     |  12 +--
 src/kudu/fs/log_block_manager-test.cc          |   6 +-
 src/kudu/fs/log_block_manager.cc               |   6 +-
 src/kudu/integration-tests/dense_node-itest.cc |   2 +-
 src/kudu/tools/tool_action_local_replica.cc    |   2 +-
 src/kudu/util/env-test.cc                      |  23 ++++-
 src/kudu/util/env.h                            |  28 +++---
 src/kudu/util/env_posix.cc                     |  13 ++-
 src/kudu/util/file_cache.cc                    |   2 +-
 src/kudu/util/pb_util-test.cc                  |   4 +-
 src/kudu/util/rolling_log.cc                   |   2 +-
 18 files changed, 134 insertions(+), 138 deletions(-)


[kudu] 01/02: env: add CREATE_OR_OPEN and rename existing open modes

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit ae53824dc2c270fdcece6aeadc908dbfa437531e
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Tue Dec 3 15:55:50 2019 -0800

    env: add CREATE_OR_OPEN and rename existing open modes
    
    An upcoming patch will use CREATE_OR_OPEN in the LogIndex.
    
    Change-Id: Ie5fbd4f57070a2f9f2dd0546b1246e9e5d9aaec1
    Reviewed-on: http://gerrit.cloudera.org:8080/14821
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/fs/file_block_manager.cc              |  2 +-
 src/kudu/fs/fs_manager-test.cc                 |  2 +-
 src/kudu/fs/log_block_manager-test-util.cc     | 12 +++++------
 src/kudu/fs/log_block_manager-test.cc          |  6 +++---
 src/kudu/fs/log_block_manager.cc               |  6 +++---
 src/kudu/integration-tests/dense_node-itest.cc |  2 +-
 src/kudu/tools/tool_action_local_replica.cc    |  2 +-
 src/kudu/util/env-test.cc                      | 23 +++++++++++++++++----
 src/kudu/util/env.h                            | 28 ++++++++++++++------------
 src/kudu/util/env_posix.cc                     | 13 +++++++-----
 src/kudu/util/file_cache.cc                    |  2 +-
 src/kudu/util/pb_util-test.cc                  |  4 ++--
 src/kudu/util/rolling_log.cc                   |  2 +-
 13 files changed, 62 insertions(+), 42 deletions(-)

diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index 6686606..fcf1352 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -787,7 +787,7 @@ Status FileBlockManager::CreateBlock(const CreateBlockOptions& opts,
     RETURN_NOT_OK_HANDLE_DISK_FAILURE(s,
         error_manager_->RunErrorNotificationCb(ErrorHandlerType::DISK_ERROR, dir));
     WritableFileOptions wr_opts;
-    wr_opts.mode = Env::CREATE_NON_EXISTING;
+    wr_opts.mode = Env::MUST_CREATE;
     s = env_util::OpenFileForWrite(wr_opts, env_, path, &writer);
   } while (PREDICT_FALSE(s.IsAlreadyPresent()));
   if (s.ok()) {
diff --git a/src/kudu/fs/fs_manager-test.cc b/src/kudu/fs/fs_manager-test.cc
index 7432ade..59beb4c 100644
--- a/src/kudu/fs/fs_manager-test.cc
+++ b/src/kudu/fs/fs_manager-test.cc
@@ -992,7 +992,7 @@ TEST_F(FsManagerTestBase, TestAddRemoveDataDirsFuzz) {
         string new_instance = fs_manager()->GetInstanceMetadataPath(fs_root);
         if (!env_->FileExists(new_instance)) {
           WritableFileOptions wr_opts;
-          wr_opts.mode = Env::CREATE_NON_EXISTING;
+          wr_opts.mode = Env::MUST_CREATE;
           ASSERT_OK(env_util::CopyFile(env_, source_instance, new_instance, wr_opts));
           ReinitFsManagerWithOpts(fs_opts);
           open_status = fs_manager()->Open();
diff --git a/src/kudu/fs/log_block_manager-test-util.cc b/src/kudu/fs/log_block_manager-test-util.cc
index ba679ee..b308a46 100644
--- a/src/kudu/fs/log_block_manager-test-util.cc
+++ b/src/kudu/fs/log_block_manager-test-util.cc
@@ -123,7 +123,7 @@ Status LBMCorruptor::PreallocateFullContainer() {
 
   unique_ptr<RWFile> data_file;
   RWFileOptions opts;
-  opts.mode = Env::OPEN_EXISTING;
+  opts.mode = Env::MUST_EXIST;
   RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
   int64_t initial_size;
   RETURN_NOT_OK(PreallocateForBlock(data_file.get(), mode,
@@ -153,7 +153,7 @@ Status LBMCorruptor::AddUnpunchedBlockToFullContainer() {
   // Must be non-zero length, otherwise preallocation will fail.
   unique_ptr<RWFile> data_file;
   RWFileOptions opts;
-  opts.mode = Env::OPEN_EXISTING;
+  opts.mode = Env::MUST_EXIST;
   RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
   int64_t block_length = (rand_.Uniform(16) + 1) * fs_block_size;
   int64_t initial_data_size;
@@ -227,7 +227,7 @@ Status LBMCorruptor::AddMalformedRecordToContainer() {
   {
     unique_ptr<RWFile> data_file;
     RWFileOptions opts;
-    opts.mode = Env::OPEN_EXISTING;
+    opts.mode = Env::MUST_EXIST;
     RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
     RETURN_NOT_OK(PreallocateForBlock(data_file.get(), RWFile::CHANGE_FILE_SIZE,
                                       kBlockSize, &initial_data_size));
@@ -292,7 +292,7 @@ Status LBMCorruptor::AddMisalignedBlockToContainer() {
 
   unique_ptr<RWFile> data_file;
   RWFileOptions opts;
-  opts.mode = Env::OPEN_EXISTING;
+  opts.mode = Env::MUST_EXIST;
   RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
   uint64_t initial_data_size;
   RETURN_NOT_OK(data_file->Size(&initial_data_size));
@@ -360,7 +360,7 @@ Status LBMCorruptor::AddPartialRecordToContainer() {
   // Corrupt the record by truncating one byte off the end of it.
   {
     RWFileOptions opts;
-    opts.mode = Env::OPEN_EXISTING;
+    opts.mode = Env::MUST_EXIST;
     unique_ptr<RWFile> metadata_file;
     RETURN_NOT_OK(env_->NewRWFile(opts, c->metadata_filename, &metadata_file));
     uint64_t initial_metadata_size;
@@ -422,7 +422,7 @@ Status LBMCorruptor::OpenMetadataWriter(
     const Container& container,
     unique_ptr<WritablePBContainerFile>* writer) {
   RWFileOptions opts;
-  opts.mode = Env::OPEN_EXISTING;
+  opts.mode = Env::MUST_EXIST;
   unique_ptr<RWFile> metadata_file;
   RETURN_NOT_OK(env_->NewRWFile(opts,
                                 container.metadata_filename,
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index 2804d82..01952c8 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -671,7 +671,7 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
   for (const auto num_bytes : {1, 8, 128}) {
     {
       RWFileOptions opts;
-      opts.mode = Env::OPEN_EXISTING;
+      opts.mode = Env::MUST_EXIST;
       unique_ptr<RWFile> file;
       ASSERT_OK(env_->NewRWFile(opts, metadata_path, &file));
       ASSERT_OK(file->Truncate(good_meta_size + num_bytes));
@@ -729,7 +729,7 @@ TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
   // data loss, however it will look like a failed partial write.
   {
     RWFileOptions opts;
-    opts.mode = Env::OPEN_EXISTING;
+    opts.mode = Env::MUST_EXIST;
     unique_ptr<RWFile> file;
     ASSERT_OK(env_->NewRWFile(opts, metadata_path, &file));
     ASSERT_OK(file->Truncate(good_meta_size - 1));
@@ -867,7 +867,7 @@ TEST_F(LogBlockManagerTest, TestPreallocationAndTruncation) {
     LOG(INFO) << "Pass " << mode;
     unique_ptr<RWFile> data_file;
     RWFileOptions opts;
-    opts.mode = Env::OPEN_EXISTING;
+    opts.mode = Env::MUST_EXIST;
     ASSERT_OK(env_->NewRWFile(opts, fname, &data_file));
     ASSERT_OK(data_file->PreAllocate(size_after_close, size_after_close, mode));
     uint64_t size_after_preallocate;
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 585efe6..dff3870 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -749,7 +749,7 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
   unique_ptr<RWFile> metadata_writer;
   unique_ptr<RWFile> data_file;
   RWFileOptions wr_opts;
-  wr_opts.mode = Env::CREATE_NON_EXISTING;
+  wr_opts.mode = Env::MUST_CREATE;
 
   // Repeat in the event of a container id collision (unlikely).
   //
@@ -769,7 +769,7 @@ Status LogBlockContainer::Create(LogBlockManager* block_manager,
     }
     data_path = StrCat(common_path, LogBlockManager::kContainerDataFileSuffix);
     RWFileOptions rw_opts;
-    rw_opts.mode = Env::CREATE_NON_EXISTING;
+    rw_opts.mode = Env::MUST_CREATE;
     data_status = block_manager->env()->NewRWFile(rw_opts,
                                                   data_path,
                                                   &data_file);
@@ -2729,7 +2729,7 @@ Status LogBlockManager::Repair(
     for (auto& pr : report->partial_record_check->entries) {
       unique_ptr<RWFile> file;
       RWFileOptions opts;
-      opts.mode = Env::OPEN_EXISTING;
+      opts.mode = Env::MUST_EXIST;
       LogBlockContainerRefPtr container = FindPtrOrNull(containers_by_name, pr.container);
       if (!container) {
         // The container was deleted outright.
diff --git a/src/kudu/integration-tests/dense_node-itest.cc b/src/kudu/integration-tests/dense_node-itest.cc
index e177d8d..44210d8 100644
--- a/src/kudu/integration-tests/dense_node-itest.cc
+++ b/src/kudu/integration-tests/dense_node-itest.cc
@@ -203,7 +203,7 @@ TEST_F(DenseNodeTest, RunTest) {
     LOG_TIMING(INFO, "dropping kernel caches") {
       unique_ptr<WritableFile> f;
       WritableFileOptions opts;
-      opts.mode = Env::OPEN_EXISTING;
+      opts.mode = Env::MUST_EXIST;
       ASSERT_OK(env_->NewWritableFile(opts, "/proc/sys/vm/drop_caches", &f));
       ASSERT_OK(f->Append("3\n"));
       ASSERT_OK(f->Close());
diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc
index 97d9312..6f3a760 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -251,7 +251,7 @@ Status BackupConsensusMetadata(FsManager* fs_manager,
   string cmeta_filename = fs_manager->GetConsensusMetadataPath(tablet_id);
   string backup_filename = Substitute("$0.pre_rewrite.$1", cmeta_filename, env->NowMicros());
   WritableFileOptions opts;
-  opts.mode = Env::CREATE_NON_EXISTING;
+  opts.mode = Env::MUST_CREATE;
   opts.sync_on_close = true;
   RETURN_NOT_OK(env_util::CopyFile(env, cmeta_filename, backup_filename, opts));
   LOG(INFO) << "Backed up old consensus metadata to " << backup_filename;
diff --git a/src/kudu/util/env-test.cc b/src/kudu/util/env-test.cc
index e668461..49556ef 100644
--- a/src/kudu/util/env-test.cc
+++ b/src/kudu/util/env-test.cc
@@ -612,7 +612,7 @@ TEST_F(TestEnv, TestOverwrite) {
 
   // File exists, try to overwrite (and fail).
   WritableFileOptions opts;
-  opts.mode = Env::CREATE_NON_EXISTING;
+  opts.mode = Env::MUST_CREATE;
   Status s = env_util::OpenFileForWrite(opts,
                                         env_, test_path, &writer);
   ASSERT_TRUE(s.IsAlreadyPresent());
@@ -634,7 +634,7 @@ TEST_F(TestEnv, TestReopen) {
 
   // Reopen it and append to it.
   WritableFileOptions reopen_opts;
-  reopen_opts.mode = Env::OPEN_EXISTING;
+  reopen_opts.mode = Env::MUST_EXIST;
   ASSERT_OK(env_util::OpenFileForWrite(reopen_opts,
                                        env_, test_path, &writer));
   ASSERT_EQ(first.length(), writer->Size());
@@ -931,16 +931,31 @@ TEST_F(TestEnv, TestRWFile) {
 
   // Make sure we can't overwrite it.
   RWFileOptions opts;
-  opts.mode = Env::CREATE_NON_EXISTING;
+  opts.mode = Env::MUST_CREATE;
   ASSERT_TRUE(env_->NewRWFile(opts, GetTestPath("foo"), &file).IsAlreadyPresent());
 
   // Reopen it without truncating the existing data.
-  opts.mode = Env::OPEN_EXISTING;
+  opts.mode = Env::MUST_EXIST;
   ASSERT_OK(env_->NewRWFile(opts, GetTestPath("foo"), &file));
   uint8_t scratch4[kNewTestData.length()];
   Slice result4(scratch4, kNewTestData.length());
   ASSERT_OK(file->Read(0, result4));
   ASSERT_EQ(result4, kNewTestData);
+
+  // Test CREATE_OR_OPEN semantics on a new file.
+  const string bar_path = GetTestPath("bar");
+  unique_ptr<RWFile> file_two;
+  opts.mode = Env::CREATE_OR_OPEN;
+  ASSERT_FALSE(env_->FileExists(bar_path));
+  ASSERT_OK(env_->NewRWFile(opts, bar_path, &file_two));
+  ASSERT_TRUE(env_->FileExists(bar_path));
+  ASSERT_OK(file_two->Write(0, kTestData));
+  ASSERT_OK(file_two->Size(&sz));
+  ASSERT_EQ(kTestData.length(), sz);
+
+  ASSERT_OK(env_->NewRWFile(opts, bar_path, &file_two));
+  ASSERT_OK(file_two->Size(&sz));
+  ASSERT_EQ(kTestData.length(), sz);
 }
 
 TEST_F(TestEnv, TestCanonicalize) {
diff --git a/src/kudu/util/env.h b/src/kudu/util/env.h
index 88c1c0d..38d0356 100644
--- a/src/kudu/util/env.h
+++ b/src/kudu/util/env.h
@@ -52,15 +52,17 @@ class Env {
  public:
   // Governs if/how the file is created.
   //
-  // enum value                      | file exists       | file does not exist
-  // --------------------------------+-------------------+--------------------
-  // CREATE_IF_NON_EXISTING_TRUNCATE | opens + truncates | creates
-  // CREATE_NON_EXISTING             | fails             | creates
-  // OPEN_EXISTING                   | opens             | fails
-  enum CreateMode {
-    CREATE_IF_NON_EXISTING_TRUNCATE,
-    CREATE_NON_EXISTING,
-    OPEN_EXISTING
+  // enum value                   | file exists       | file does not exist
+  // -----------------------------+-------------------+--------------------
+  // CREATE_OR_OPEN_WITH_TRUNCATE | opens + truncates | creates
+  // CREATE_OR_OPEN               | opens             | creates
+  // MUST_CREATE                  | fails             | creates
+  // MUST_EXIST                   | opens             | fails
+  enum OpenMode {
+    CREATE_OR_OPEN_WITH_TRUNCATE,
+    CREATE_OR_OPEN,
+    MUST_CREATE,
+    MUST_EXIST
   };
 
   Env() { }
@@ -452,11 +454,11 @@ struct WritableFileOptions {
   bool sync_on_close;
 
   // See CreateMode for details.
-  Env::CreateMode mode;
+  Env::OpenMode mode;
 
   WritableFileOptions()
     : sync_on_close(false),
-      mode(Env::CREATE_IF_NON_EXISTING_TRUNCATE) { }
+      mode(Env::CREATE_OR_OPEN_WITH_TRUNCATE) { }
 };
 
 // Options specified when a file is opened for random access.
@@ -525,11 +527,11 @@ struct RWFileOptions {
   bool sync_on_close;
 
   // See CreateMode for details.
-  Env::CreateMode mode;
+  Env::OpenMode mode;
 
   RWFileOptions()
     : sync_on_close(false),
-      mode(Env::CREATE_IF_NON_EXISTING_TRUNCATE) { }
+      mode(Env::CREATE_OR_OPEN_WITH_TRUNCATE) { }
 };
 
 // A file abstraction for both reading and writing. No notion of a built-in
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc
index 050dcc5..05a6ed3 100644
--- a/src/kudu/util/env_posix.cc
+++ b/src/kudu/util/env_posix.cc
@@ -340,18 +340,21 @@ Status DoSync(int fd, const string& filename) {
   return Status::OK();
 }
 
-Status DoOpen(const string& filename, Env::CreateMode mode, int* fd) {
+Status DoOpen(const string& filename, Env::OpenMode mode, int* fd) {
   MAYBE_RETURN_EIO(filename, IOError(Env::kInjectedFailureStatusMsg, EIO));
   ThreadRestrictions::AssertIOAllowed();
   int flags = O_RDWR;
   switch (mode) {
-    case Env::CREATE_IF_NON_EXISTING_TRUNCATE:
+    case Env::CREATE_OR_OPEN_WITH_TRUNCATE:
       flags |= O_CREAT | O_TRUNC;
       break;
-    case Env::CREATE_NON_EXISTING:
+    case Env::CREATE_OR_OPEN:
+      flags |= O_CREAT;
+      break;
+    case Env::MUST_CREATE:
       flags |= O_CREAT | O_EXCL;
       break;
-    case Env::OPEN_EXISTING:
+    case Env::MUST_EXIST:
       break;
     default:
       return Status::NotSupported(Substitute("Unknown create mode $0", mode));
@@ -1787,7 +1790,7 @@ class PosixEnv : public Env {
                                     const WritableFileOptions& opts,
                                     unique_ptr<WritableFile>* result) {
     uint64_t file_size = 0;
-    if (opts.mode == OPEN_EXISTING) {
+    if (opts.mode == MUST_EXIST) {
       RETURN_NOT_OK(GetFileSize(fname, &file_size));
     }
     result->reset(new PosixWritableFile(fname, fd, file_size, opts.sync_on_close));
diff --git a/src/kudu/util/file_cache.cc b/src/kudu/util/file_cache.cc
index ddb9993..c5ed6f4 100644
--- a/src/kudu/util/file_cache.cc
+++ b/src/kudu/util/file_cache.cc
@@ -330,7 +330,7 @@ class Descriptor<RWFile> : public RWFile {
 
     // The file was evicted, reopen it.
     RWFileOptions opts;
-    opts.mode = Env::OPEN_EXISTING;
+    opts.mode = Env::MUST_EXIST;
     unique_ptr<RWFile> f;
     RETURN_NOT_OK(base_.env()->NewRWFile(opts, base_.filename(), &f));
 
diff --git a/src/kudu/util/pb_util-test.cc b/src/kudu/util/pb_util-test.cc
index a942e9a..ec873bf 100644
--- a/src/kudu/util/pb_util-test.cc
+++ b/src/kudu/util/pb_util-test.cc
@@ -177,7 +177,7 @@ Status TestPBUtil::BitFlipFileByteRange(const string& path, uint64_t offset, uin
 Status TestPBUtil::TruncateFile(const string& path, uint64_t size) {
   unique_ptr<RWFile> file;
   RWFileOptions opts;
-  opts.mode = Env::OPEN_EXISTING;
+  opts.mode = Env::MUST_EXIST;
   RETURN_NOT_OK(env_->NewRWFile(opts, path, &file));
   RETURN_NOT_OK(file->Truncate(size));
   return Status::OK();
@@ -406,7 +406,7 @@ TEST_P(TestPBContainerVersions, TestAppendAfterPartialWrite) {
 
   unique_ptr<WritablePBContainerFile> writer;
   RWFileOptions opts;
-  opts.mode = Env::OPEN_EXISTING;
+  opts.mode = Env::MUST_EXIST;
   ASSERT_OK(NewPBCWriter(version_, opts, &writer));
   ASSERT_OK(writer->OpenExisting());
 
diff --git a/src/kudu/util/rolling_log.cc b/src/kudu/util/rolling_log.cc
index 50f9fbd..610aa9e 100644
--- a/src/kudu/util/rolling_log.cc
+++ b/src/kudu/util/rolling_log.cc
@@ -157,7 +157,7 @@ Status RollingLog::Open() {
     WritableFileOptions opts;
     // Logs aren't worth the performance cost of durability.
     opts.sync_on_close = false;
-    opts.mode = Env::CREATE_NON_EXISTING;
+    opts.mode = Env::MUST_CREATE;
 
     RETURN_NOT_OK(env_->NewWritableFile(opts, path, &file_));
 


[kudu] 02/02: log_index: use RWFiles for IO

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 5cc9114cc757e84f51fa5ad4e263fbb7e8f9fe18
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Tue Dec 3 15:57:00 2019 -0800

    log_index: use RWFiles for IO
    
    To use LogIndex in FileCache, we need to do one of two things:
    1. Add an mmap-based file abstraction to Env, to be used by LogIndex.
    2. Rework LogIndex to use RWFile instead of memory mappings.
    
    This patch implements option #2. Why? Although memory mappings can be used
    for zero-copy IO, the LogIndex wasn't taking advantage of that. More
    importantly, failures during memory-mapped IO are communicated via UNIX
    signals, making it practically impossible for an application as complex as
    Kudu to recover from a WAL disk failure surfaced during log index IO.
    Recovering from such failures is being actively worked on in KUDU-2975.
    
    For all other intents and purposes, IO through mmap is equivalent to IO
    through RWFile (i.e. pwrite/pread):
    - Both can use ftruncate to grow the file's size while keeping it sparse.
    - Both maintain holes in file sections that aren't written.
    - Both go through the page cache for reads and writes.
    - Both allow pages to remain dirty in the page cache until they are written
      back asynchronously.
    
    Change-Id: I75c0476bbd9be55657291c85488b9121e04a91de
    Reviewed-on: http://gerrit.cloudera.org:8080/14822
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/consensus/log-test.cc       |   2 +-
 src/kudu/consensus/log.cc            |   2 +-
 src/kudu/consensus/log_index-test.cc |   2 +-
 src/kudu/consensus/log_index.cc      | 129 ++++++++++++++---------------------
 src/kudu/consensus/log_index.h       |  33 +++++----
 5 files changed, 72 insertions(+), 96 deletions(-)

diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index 8ba36f1..67c07de 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -347,7 +347,7 @@ void LogTest::DoCorruptionTest(CorruptionType type, CorruptionPosition place,
   // because it has a cached header.
   shared_ptr<LogReader> reader;
   ASSERT_OK(LogReader::Open(fs_manager_.get(),
-                            make_scoped_refptr(new LogIndex(log_->log_dir_)),
+                            make_scoped_refptr(new LogIndex(env_, log_->log_dir_)),
                             kTestTablet, nullptr, &reader));
   ASSERT_EQ(1, reader->num_segments());
 
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 6665b38..49cea4e 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -783,7 +783,7 @@ Status Log::Init() {
   CHECK_EQ(kLogInitialized, log_state_);
 
   // Init the index
-  log_index_.reset(new LogIndex(ctx_.log_dir));
+  log_index_.reset(new LogIndex(ctx_.fs_manager->env(), ctx_.log_dir));
 
   // Reader for previous segments.
   RETURN_NOT_OK(LogReader::Open(ctx_.fs_manager,
diff --git a/src/kudu/consensus/log_index-test.cc b/src/kudu/consensus/log_index-test.cc
index 310e329..b9eda61 100644
--- a/src/kudu/consensus/log_index-test.cc
+++ b/src/kudu/consensus/log_index-test.cc
@@ -39,7 +39,7 @@ class LogIndexTest : public KuduTest {
  public:
   virtual void SetUp() OVERRIDE {
     KuduTest::SetUp();
-    index_ = new LogIndex(test_dir_);
+    index_ = new LogIndex(env_, test_dir_);
   }
 
  protected:
diff --git a/src/kudu/consensus/log_index.cc b/src/kudu/consensus/log_index.cc
index bfeea72..743a297 100644
--- a/src/kudu/consensus/log_index.cc
+++ b/src/kudu/consensus/log_index.cc
@@ -17,25 +17,20 @@
 
 // The implementation of the Log Index.
 //
-// The log index is implemented by a set of on-disk files, each containing a fixed number
-// (kEntriesPerIndexChunk) of fixed size entries. Each index chunk is numbered such that,
-// for a given log index, we can determine which chunk contains its index entry by a
-// simple division operation. Because the entries are fixed size, we can compute the
-// index offset by a modulo.
+// The log index is implemented by a set of on-disk files, each containing a
+// fixed number (kEntriesPerIndexChunk) of fixed size entries. Each index chunk
+// is numbered such that, for a given log index, we can determine which chunk
+// contains its index entry by a simple division operation. Because the entries
+// are fixed size, we can compute the index offset by a modulo.
 //
-// When the log is GCed, we remove any index chunks which are no longer needed, and
-// unmap them.
+// When the log is GCed, we remove any index chunks which are no longer needed,
+// and close them.
 
 #include "kudu/consensus/log_index.h"
 
-#include <fcntl.h>
-#include <sys/mman.h>
-#include <unistd.h>
-
-#include <cerrno>
 #include <cinttypes>
 #include <cstdint>
-#include <cstring>
+#include <memory>
 #include <mutex>
 #include <ostream>
 #include <string>
@@ -49,9 +44,11 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/errno.h"
+#include "kudu/util/env.h"
+#include "kudu/util/slice.h"
 
 using std::string;
+using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
@@ -59,8 +56,10 @@ namespace kudu {
 namespace log {
 
 // The actual physical entry in the file.
-// This mirrors LogIndexEntry but uses simple primitives only so we can
-// read/write it via mmap.
+//
+// This is semantically equivalent to LogIndexEntry but uses simple primitives
+// only so we can ensure it has a constant size.
+//
 // See LogIndexEntry for docs.
 struct PhysicalEntry {
   int64_t term;
@@ -76,87 +75,59 @@ static const int64_t kChunkFileSize = kEntriesPerIndexChunk * sizeof(PhysicalEnt
 ////////////////////////////////////////////////////////////
 
 // A single chunk of the index, representing a fixed number of entries.
-// This class maintains the open file descriptor and mapped memory.
+//
+// This class maintains the open file handle.
 class LogIndex::IndexChunk : public RefCountedThreadSafe<LogIndex::IndexChunk> {
  public:
-  explicit IndexChunk(string path);
-  ~IndexChunk();
+  IndexChunk(Env* env, string path);
+  ~IndexChunk() = default;
 
-  // Open and map the memory.
   Status Open();
-  void GetEntry(int entry_index, PhysicalEntry* ret);
-  void SetEntry(int entry_index, const PhysicalEntry& entry);
+  Status GetEntry(int entry_index, PhysicalEntry* ret) const;
+  Status SetEntry(int entry_index, const PhysicalEntry& entry);
 
  private:
+  Env* env_;
   const string path_;
-  int fd_;
-  uint8_t* mapping_;
+  unique_ptr<RWFile> file_;
 };
 
-namespace  {
-Status CheckError(int rc, const char* operation) {
-  if (PREDICT_FALSE(rc < 0)) {
-    int err = errno;
-    return Status::IOError(operation, ErrnoToString(err), err);
-  }
-  return Status::OK();
-}
-} // anonymous namespace
-
-LogIndex::IndexChunk::IndexChunk(std::string path)
-    : path_(std::move(path)), fd_(-1), mapping_(nullptr) {}
-
-LogIndex::IndexChunk::~IndexChunk() {
-  if (mapping_ != nullptr) {
-    munmap(mapping_, kChunkFileSize);
-  }
-
-  if (fd_ >= 0) {
-    int ret;
-    RETRY_ON_EINTR(ret, close(fd_));
-    if (PREDICT_FALSE(ret != 0)) {
-      PLOG(WARNING) << "Failed to close fd " << fd_;
-    }
-  }
+LogIndex::IndexChunk::IndexChunk(Env* env, string path)
+    : env_(env),
+      path_(std::move(path)) {
 }
 
 Status LogIndex::IndexChunk::Open() {
-  RETRY_ON_EINTR(fd_, open(path_.c_str(), O_CLOEXEC | O_CREAT | O_RDWR, 0666));
-  RETURN_NOT_OK(CheckError(fd_, "open"));
-
-  int err;
-  RETRY_ON_EINTR(err, ftruncate(fd_, kChunkFileSize));
-  RETURN_NOT_OK(CheckError(err, "truncate"));
-
-  mapping_ = static_cast<uint8_t*>(mmap(nullptr, kChunkFileSize, PROT_READ | PROT_WRITE,
-                                        MAP_SHARED, fd_, 0));
-  if (mapping_ == nullptr) {
-    err = errno;
-    return Status::IOError("Unable to mmap()", ErrnoToString(err), err);
-  }
-
-  return Status::OK();
+  RWFileOptions opts;
+  opts.mode = Env::CREATE_OR_OPEN;
+  RETURN_NOT_OK(env_->NewRWFile(opts, path_, &file_));
+  return file_->Truncate(kChunkFileSize);
 }
 
-void LogIndex::IndexChunk::GetEntry(int entry_index, PhysicalEntry* ret) {
-  DCHECK_GE(fd_, 0) << "Must Open() first";
+Status LogIndex::IndexChunk::GetEntry(int entry_index, PhysicalEntry* ret) const {
+  DCHECK(file_) << "Must Open() first";
   DCHECK_LT(entry_index, kEntriesPerIndexChunk);
 
-  memcpy(ret, mapping_ + sizeof(PhysicalEntry) * entry_index, sizeof(PhysicalEntry));
+  Slice s(reinterpret_cast<const uint8_t*>(ret), sizeof(PhysicalEntry));
+  return file_->Read(sizeof(PhysicalEntry) * entry_index, s);
 }
 
-void LogIndex::IndexChunk::SetEntry(int entry_index, const PhysicalEntry& entry) {
-  DCHECK_GE(fd_, 0) << "Must Open() first";
+Status LogIndex::IndexChunk::SetEntry(int entry_index, const PhysicalEntry& entry) {
+  DCHECK(file_) << "Must Open() first";
   DCHECK_LT(entry_index, kEntriesPerIndexChunk);
 
-  memcpy(mapping_ + sizeof(PhysicalEntry) * entry_index, &entry, sizeof(PhysicalEntry));
+  Slice s(reinterpret_cast<const uint8_t*>(&entry), sizeof(PhysicalEntry));
+  return file_->Write(sizeof(PhysicalEntry) * entry_index, s);
 }
 
 ////////////////////////////////////////////////////////////
 // LogIndex
 ////////////////////////////////////////////////////////////
 
-LogIndex::LogIndex(std::string base_dir) : base_dir_(std::move(base_dir)) {}
+LogIndex::LogIndex(Env* env, string base_dir)
+    : env_(env),
+      base_dir_(std::move(base_dir)) {
+}
 
 LogIndex::~LogIndex() {
 }
@@ -168,9 +139,9 @@ string LogIndex::GetChunkPath(int64_t chunk_idx) {
 Status LogIndex::OpenChunk(int64_t chunk_idx, scoped_refptr<IndexChunk>* chunk) {
   string path = GetChunkPath(chunk_idx);
 
-  scoped_refptr<IndexChunk> new_chunk(new IndexChunk(path));
+  scoped_refptr<IndexChunk> new_chunk(new IndexChunk(env_, path));
   RETURN_NOT_OK(new_chunk->Open());
-  chunk->swap(new_chunk);
+  *chunk = std::move(new_chunk);
   return Status::OK();
 }
 
@@ -219,7 +190,7 @@ Status LogIndex::AddEntry(const LogIndexEntry& entry) {
   phys.segment_sequence_number = entry.segment_sequence_number;
   phys.offset_in_segment = entry.offset_in_segment;
 
-  chunk->SetEntry(index_in_chunk, phys);
+  RETURN_NOT_OK(chunk->SetEntry(index_in_chunk, phys));
   VLOG(3) << "Added log index entry " << entry.ToString();
 
   return Status::OK();
@@ -230,7 +201,7 @@ Status LogIndex::GetEntry(int64_t index, LogIndexEntry* entry) {
   RETURN_NOT_OK(GetChunkForIndex(index, false /* do not create */, &chunk));
   int index_in_chunk = index % kEntriesPerIndexChunk;
   PhysicalEntry phys;
-  chunk->GetEntry(index_in_chunk, &phys);
+  RETURN_NOT_OK(chunk->GetEntry(index_in_chunk, &phys));
 
   // We never write any real entries to offset 0, because there's a header
   // in each log segment. So, this indicates an entry that was never written.
@@ -261,12 +232,12 @@ void LogIndex::GC(int64_t min_index_to_retain) {
   // Outside of the lock, try to delete them (avoid holding the lock during IO).
   for (int64_t chunk_idx : chunks_to_delete) {
     string path = GetChunkPath(chunk_idx);
-    int rc = unlink(path.c_str());
-    if (rc != 0) {
-      PLOG(WARNING) << "Unable to delete index chunk " << path;
+    Status s = env_->DeleteFile(path);
+    if (!s.ok()) {
+      LOG(WARNING) << Substitute("Unable to delete index chunk $0: $1", path, s.ToString());
       continue;
     }
-    LOG(INFO) << "Deleted log index segment " << path;
+    LOG(INFO) << "Deleted log index chunk " << path;
     {
       std::lock_guard<simple_spinlock> l(open_chunks_lock_);
       open_chunks_.erase(chunk_idx);
diff --git a/src/kudu/consensus/log_index.h b/src/kudu/consensus/log_index.h
index 1f9ce31..44f4734 100644
--- a/src/kudu/consensus/log_index.h
+++ b/src/kudu/consensus/log_index.h
@@ -14,12 +14,11 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_CONSENSUS_LOG_INDEX_H
-#define KUDU_CONSENSUS_LOG_INDEX_H
+#pragma once
 
 #include <cstdint>
-#include <string>
 #include <map>
+#include <string>
 
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/gutil/macros.h"
@@ -28,6 +27,9 @@
 #include "kudu/util/status.h"
 
 namespace kudu {
+
+class Env;
+
 namespace log {
 
 // An entry in the index.
@@ -45,21 +47,22 @@ struct LogIndexEntry {
   std::string ToString() const;
 };
 
-// An on-disk structure which indexes from OpId index to the specific position in the WAL
-// which contains the latest ReplicateMsg for that index.
+// An on-disk structure which indexes from OpId index to the specific position
+// in the WAL which contains the latest ReplicateMsg for that index.
 //
-// This structure is on-disk but *not durable*. We use mmap()ed IO to write it out, and
-// never sync it to disk. Its only purpose is to allow random-reading earlier entries from
-// the log to serve to Raft followers.
+// This structure is on-disk but *not durable* (i.e. it is never synced to
+// disk). Its only purpose is to allow random-reading earlier entries from the
+// log to serve to Raft followers.
 //
-// This class is thread-safe, but doesn't provide a memory barrier between writers and
-// readers. In other words, if a reader is expected to see an index entry written by a
-// writer, there should be some other synchronization between them to ensure visibility.
+// This class is thread-safe, but doesn't provide a memory barrier between
+// writers and readers. In other words, if a reader is expected to see an index
+// entry written by a writer, there should be some other synchronization between
+// them to ensure visibility.
 //
 // See .cc file for implementation notes.
 class LogIndex : public RefCountedThreadSafe<LogIndex> {
  public:
-  explicit LogIndex(std::string base_dir);
+  LogIndex(Env* env, std::string base_dir);
 
   // Record an index entry in the index.
   Status AddEntry(const LogIndexEntry& entry);
@@ -92,6 +95,9 @@ class LogIndex : public RefCountedThreadSafe<LogIndex> {
   // Return the path of the given index chunk.
   std::string GetChunkPath(int64_t chunk_idx);
 
+  // Environment with which to do file I/O.
+  Env* env_;
+
   // The base directory where index files are located.
   const std::string base_dir_;
 
@@ -100,7 +106,7 @@ class LogIndex : public RefCountedThreadSafe<LogIndex> {
   // Map from chunk index to IndexChunk. The chunk index is the log index modulo
   // the number of entries per chunk (see docs in log_index.cc).
   // Protected by open_chunks_lock_
-  typedef std::map<int64_t, scoped_refptr<IndexChunk> > ChunkMap;
+  typedef std::map<int64_t, scoped_refptr<IndexChunk>> ChunkMap;
   ChunkMap open_chunks_;
 
   DISALLOW_COPY_AND_ASSIGN(LogIndex);
@@ -108,4 +114,3 @@ class LogIndex : public RefCountedThreadSafe<LogIndex> {
 
 } // namespace log
 } // namespace kudu
-#endif /* KUDU_CONSENSUS_LOG_INDEX_H */