You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/12/04 19:23:39 UTC

[kudu] 02/02: log_index: use RWFiles for IO

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 5cc9114cc757e84f51fa5ad4e263fbb7e8f9fe18
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Tue Dec 3 15:57:00 2019 -0800

    log_index: use RWFiles for IO
    
    To use LogIndex in FileCache, we need to do one of two things:
    1. Add an mmap-based file abstraction to Env, to be used by LogIndex.
    2. Rework LogIndex to use RWFile instead of memory mappings.
    
    This patch implements option #2. Why? Because although memory mappings can
    be used for zero-copy IO, the LogIndex wasn't doing that, and more
    importantly, failures during memory-mapped IO are communicated via UNIX
    signals, making it practically impossible for an application of Kudu's
    complexity to recover from a WAL disk failure surfaced during log index IO,
    a feature that is being actively worked on in KUDU-2975.
    
    IO through mmap is identical to IO through RWFile (i.e. pwrite/pread) for
    all other intents and purposes:
    - Both can use ftruncate to grow the file's size while keeping it sparse.
    - Both maintain holes in file sections that aren't written.
    - Both go through the page cache for reads and writes.
    - Both allow pages to be dirty before writing them out asynchronously.
    
    Change-Id: I75c0476bbd9be55657291c85488b9121e04a91de
    Reviewed-on: http://gerrit.cloudera.org:8080/14822
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/consensus/log-test.cc       |   2 +-
 src/kudu/consensus/log.cc            |   2 +-
 src/kudu/consensus/log_index-test.cc |   2 +-
 src/kudu/consensus/log_index.cc      | 129 ++++++++++++++---------------------
 src/kudu/consensus/log_index.h       |  33 +++++----
 5 files changed, 72 insertions(+), 96 deletions(-)

diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index 8ba36f1..67c07de 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -347,7 +347,7 @@ void LogTest::DoCorruptionTest(CorruptionType type, CorruptionPosition place,
   // because it has a cached header.
   shared_ptr<LogReader> reader;
   ASSERT_OK(LogReader::Open(fs_manager_.get(),
-                            make_scoped_refptr(new LogIndex(log_->log_dir_)),
+                            make_scoped_refptr(new LogIndex(env_, log_->log_dir_)),
                             kTestTablet, nullptr, &reader));
   ASSERT_EQ(1, reader->num_segments());
 
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 6665b38..49cea4e 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -783,7 +783,7 @@ Status Log::Init() {
   CHECK_EQ(kLogInitialized, log_state_);
 
   // Init the index
-  log_index_.reset(new LogIndex(ctx_.log_dir));
+  log_index_.reset(new LogIndex(ctx_.fs_manager->env(), ctx_.log_dir));
 
   // Reader for previous segments.
   RETURN_NOT_OK(LogReader::Open(ctx_.fs_manager,
diff --git a/src/kudu/consensus/log_index-test.cc b/src/kudu/consensus/log_index-test.cc
index 310e329..b9eda61 100644
--- a/src/kudu/consensus/log_index-test.cc
+++ b/src/kudu/consensus/log_index-test.cc
@@ -39,7 +39,7 @@ class LogIndexTest : public KuduTest {
  public:
   virtual void SetUp() OVERRIDE {
     KuduTest::SetUp();
-    index_ = new LogIndex(test_dir_);
+    index_ = new LogIndex(env_, test_dir_);
   }
 
  protected:
diff --git a/src/kudu/consensus/log_index.cc b/src/kudu/consensus/log_index.cc
index bfeea72..743a297 100644
--- a/src/kudu/consensus/log_index.cc
+++ b/src/kudu/consensus/log_index.cc
@@ -17,25 +17,20 @@
 
 // The implementation of the Log Index.
 //
-// The log index is implemented by a set of on-disk files, each containing a fixed number
-// (kEntriesPerIndexChunk) of fixed size entries. Each index chunk is numbered such that,
-// for a given log index, we can determine which chunk contains its index entry by a
-// simple division operation. Because the entries are fixed size, we can compute the
-// index offset by a modulo.
+// The log index is implemented by a set of on-disk files, each containing a
+// fixed number (kEntriesPerIndexChunk) of fixed size entries. Each index chunk
+// is numbered such that, for a given log index, we can determine which chunk
+// contains its index entry by a simple division operation. Because the entries
+// are fixed size, we can compute the index offset by a modulo.
 //
-// When the log is GCed, we remove any index chunks which are no longer needed, and
-// unmap them.
+// When the log is GCed, we remove any index chunks which are no longer needed,
+// and close them.
 
 #include "kudu/consensus/log_index.h"
 
-#include <fcntl.h>
-#include <sys/mman.h>
-#include <unistd.h>
-
-#include <cerrno>
 #include <cinttypes>
 #include <cstdint>
-#include <cstring>
+#include <memory>
 #include <mutex>
 #include <ostream>
 #include <string>
@@ -49,9 +44,11 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/errno.h"
+#include "kudu/util/env.h"
+#include "kudu/util/slice.h"
 
 using std::string;
+using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
@@ -59,8 +56,10 @@ namespace kudu {
 namespace log {
 
 // The actual physical entry in the file.
-// This mirrors LogIndexEntry but uses simple primitives only so we can
-// read/write it via mmap.
+//
+// This is semantically equivalent to LogIndexEntry but uses simple primitives
+// only so we can ensure it has a constant size.
+//
 // See LogIndexEntry for docs.
 struct PhysicalEntry {
   int64_t term;
@@ -76,87 +75,59 @@ static const int64_t kChunkFileSize = kEntriesPerIndexChunk * sizeof(PhysicalEnt
 ////////////////////////////////////////////////////////////
 
 // A single chunk of the index, representing a fixed number of entries.
-// This class maintains the open file descriptor and mapped memory.
+//
+// This class maintains the open file handle.
 class LogIndex::IndexChunk : public RefCountedThreadSafe<LogIndex::IndexChunk> {
  public:
-  explicit IndexChunk(string path);
-  ~IndexChunk();
+  IndexChunk(Env* env, string path);
+  ~IndexChunk() = default;
 
-  // Open and map the memory.
   Status Open();
-  void GetEntry(int entry_index, PhysicalEntry* ret);
-  void SetEntry(int entry_index, const PhysicalEntry& entry);
+  Status GetEntry(int entry_index, PhysicalEntry* ret) const;
+  Status SetEntry(int entry_index, const PhysicalEntry& entry);
 
  private:
+  Env* env_;
   const string path_;
-  int fd_;
-  uint8_t* mapping_;
+  unique_ptr<RWFile> file_;
 };
 
-namespace  {
-Status CheckError(int rc, const char* operation) {
-  if (PREDICT_FALSE(rc < 0)) {
-    int err = errno;
-    return Status::IOError(operation, ErrnoToString(err), err);
-  }
-  return Status::OK();
-}
-} // anonymous namespace
-
-LogIndex::IndexChunk::IndexChunk(std::string path)
-    : path_(std::move(path)), fd_(-1), mapping_(nullptr) {}
-
-LogIndex::IndexChunk::~IndexChunk() {
-  if (mapping_ != nullptr) {
-    munmap(mapping_, kChunkFileSize);
-  }
-
-  if (fd_ >= 0) {
-    int ret;
-    RETRY_ON_EINTR(ret, close(fd_));
-    if (PREDICT_FALSE(ret != 0)) {
-      PLOG(WARNING) << "Failed to close fd " << fd_;
-    }
-  }
+LogIndex::IndexChunk::IndexChunk(Env* env, string path)
+    : env_(env),
+      path_(std::move(path)) {
 }
 
 Status LogIndex::IndexChunk::Open() {
-  RETRY_ON_EINTR(fd_, open(path_.c_str(), O_CLOEXEC | O_CREAT | O_RDWR, 0666));
-  RETURN_NOT_OK(CheckError(fd_, "open"));
-
-  int err;
-  RETRY_ON_EINTR(err, ftruncate(fd_, kChunkFileSize));
-  RETURN_NOT_OK(CheckError(err, "truncate"));
-
-  mapping_ = static_cast<uint8_t*>(mmap(nullptr, kChunkFileSize, PROT_READ | PROT_WRITE,
-                                        MAP_SHARED, fd_, 0));
-  if (mapping_ == nullptr) {
-    err = errno;
-    return Status::IOError("Unable to mmap()", ErrnoToString(err), err);
-  }
-
-  return Status::OK();
+  RWFileOptions opts;
+  opts.mode = Env::CREATE_OR_OPEN;
+  RETURN_NOT_OK(env_->NewRWFile(opts, path_, &file_));
+  return file_->Truncate(kChunkFileSize);
 }
 
-void LogIndex::IndexChunk::GetEntry(int entry_index, PhysicalEntry* ret) {
-  DCHECK_GE(fd_, 0) << "Must Open() first";
+Status LogIndex::IndexChunk::GetEntry(int entry_index, PhysicalEntry* ret) const {
+  DCHECK(file_) << "Must Open() first";
   DCHECK_LT(entry_index, kEntriesPerIndexChunk);
 
-  memcpy(ret, mapping_ + sizeof(PhysicalEntry) * entry_index, sizeof(PhysicalEntry));
+  Slice s(reinterpret_cast<const uint8_t*>(ret), sizeof(PhysicalEntry));
+  return file_->Read(sizeof(PhysicalEntry) * entry_index, s);
 }
 
-void LogIndex::IndexChunk::SetEntry(int entry_index, const PhysicalEntry& entry) {
-  DCHECK_GE(fd_, 0) << "Must Open() first";
+Status LogIndex::IndexChunk::SetEntry(int entry_index, const PhysicalEntry& entry) {
+  DCHECK(file_) << "Must Open() first";
   DCHECK_LT(entry_index, kEntriesPerIndexChunk);
 
-  memcpy(mapping_ + sizeof(PhysicalEntry) * entry_index, &entry, sizeof(PhysicalEntry));
+  Slice s(reinterpret_cast<const uint8_t*>(&entry), sizeof(PhysicalEntry));
+  return file_->Write(sizeof(PhysicalEntry) * entry_index, s);
 }
 
 ////////////////////////////////////////////////////////////
 // LogIndex
 ////////////////////////////////////////////////////////////
 
-LogIndex::LogIndex(std::string base_dir) : base_dir_(std::move(base_dir)) {}
+LogIndex::LogIndex(Env* env, string base_dir)
+    : env_(env),
+      base_dir_(std::move(base_dir)) {
+}
 
 LogIndex::~LogIndex() {
 }
@@ -168,9 +139,9 @@ string LogIndex::GetChunkPath(int64_t chunk_idx) {
 Status LogIndex::OpenChunk(int64_t chunk_idx, scoped_refptr<IndexChunk>* chunk) {
   string path = GetChunkPath(chunk_idx);
 
-  scoped_refptr<IndexChunk> new_chunk(new IndexChunk(path));
+  scoped_refptr<IndexChunk> new_chunk(new IndexChunk(env_, path));
   RETURN_NOT_OK(new_chunk->Open());
-  chunk->swap(new_chunk);
+  *chunk = std::move(new_chunk);
   return Status::OK();
 }
 
@@ -219,7 +190,7 @@ Status LogIndex::AddEntry(const LogIndexEntry& entry) {
   phys.segment_sequence_number = entry.segment_sequence_number;
   phys.offset_in_segment = entry.offset_in_segment;
 
-  chunk->SetEntry(index_in_chunk, phys);
+  RETURN_NOT_OK(chunk->SetEntry(index_in_chunk, phys));
   VLOG(3) << "Added log index entry " << entry.ToString();
 
   return Status::OK();
@@ -230,7 +201,7 @@ Status LogIndex::GetEntry(int64_t index, LogIndexEntry* entry) {
   RETURN_NOT_OK(GetChunkForIndex(index, false /* do not create */, &chunk));
   int index_in_chunk = index % kEntriesPerIndexChunk;
   PhysicalEntry phys;
-  chunk->GetEntry(index_in_chunk, &phys);
+  RETURN_NOT_OK(chunk->GetEntry(index_in_chunk, &phys));
 
   // We never write any real entries to offset 0, because there's a header
   // in each log segment. So, this indicates an entry that was never written.
@@ -261,12 +232,12 @@ void LogIndex::GC(int64_t min_index_to_retain) {
   // Outside of the lock, try to delete them (avoid holding the lock during IO).
   for (int64_t chunk_idx : chunks_to_delete) {
     string path = GetChunkPath(chunk_idx);
-    int rc = unlink(path.c_str());
-    if (rc != 0) {
-      PLOG(WARNING) << "Unable to delete index chunk " << path;
+    Status s = env_->DeleteFile(path);
+    if (!s.ok()) {
+      LOG(WARNING) << Substitute("Unable to delete index chunk $0: $1", path, s.ToString());
       continue;
     }
-    LOG(INFO) << "Deleted log index segment " << path;
+    LOG(INFO) << "Deleted log index chunk " << path;
     {
       std::lock_guard<simple_spinlock> l(open_chunks_lock_);
       open_chunks_.erase(chunk_idx);
diff --git a/src/kudu/consensus/log_index.h b/src/kudu/consensus/log_index.h
index 1f9ce31..44f4734 100644
--- a/src/kudu/consensus/log_index.h
+++ b/src/kudu/consensus/log_index.h
@@ -14,12 +14,11 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_CONSENSUS_LOG_INDEX_H
-#define KUDU_CONSENSUS_LOG_INDEX_H
+#pragma once
 
 #include <cstdint>
-#include <string>
 #include <map>
+#include <string>
 
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/gutil/macros.h"
@@ -28,6 +27,9 @@
 #include "kudu/util/status.h"
 
 namespace kudu {
+
+class Env;
+
 namespace log {
 
 // An entry in the index.
@@ -45,21 +47,22 @@ struct LogIndexEntry {
   std::string ToString() const;
 };
 
-// An on-disk structure which indexes from OpId index to the specific position in the WAL
-// which contains the latest ReplicateMsg for that index.
+// An on-disk structure which indexes from OpId index to the specific position
+// in the WAL which contains the latest ReplicateMsg for that index.
 //
-// This structure is on-disk but *not durable*. We use mmap()ed IO to write it out, and
-// never sync it to disk. Its only purpose is to allow random-reading earlier entries from
-// the log to serve to Raft followers.
+// This structure is on-disk but *not durable* (i.e. it is never synced to
+// disk). Its only purpose is to allow random-reading earlier entries from the
+// log to serve to Raft followers.
 //
-// This class is thread-safe, but doesn't provide a memory barrier between writers and
-// readers. In other words, if a reader is expected to see an index entry written by a
-// writer, there should be some other synchronization between them to ensure visibility.
+// This class is thread-safe, but doesn't provide a memory barrier between
+// writers and readers. In other words, if a reader is expected to see an index
+// entry written by a writer, there should be some other synchronization between
+// them to ensure visibility.
 //
 // See .cc file for implementation notes.
 class LogIndex : public RefCountedThreadSafe<LogIndex> {
  public:
-  explicit LogIndex(std::string base_dir);
+  LogIndex(Env* env, std::string base_dir);
 
   // Record an index entry in the index.
   Status AddEntry(const LogIndexEntry& entry);
@@ -92,6 +95,9 @@ class LogIndex : public RefCountedThreadSafe<LogIndex> {
   // Return the path of the given index chunk.
   std::string GetChunkPath(int64_t chunk_idx);
 
+  // Environment with which to do file I/O.
+  Env* env_;
+
   // The base directory where index files are located.
   const std::string base_dir_;
 
@@ -100,7 +106,7 @@ class LogIndex : public RefCountedThreadSafe<LogIndex> {
   // Map from chunk index to IndexChunk. The chunk index is the log index divided
   // by the number of entries per chunk (see docs in log_index.cc).
   // Protected by open_chunks_lock_
-  typedef std::map<int64_t, scoped_refptr<IndexChunk> > ChunkMap;
+  typedef std::map<int64_t, scoped_refptr<IndexChunk>> ChunkMap;
   ChunkMap open_chunks_;
 
   DISALLOW_COPY_AND_ASSIGN(LogIndex);
@@ -108,4 +114,3 @@ class LogIndex : public RefCountedThreadSafe<LogIndex> {
 
 } // namespace log
 } // namespace kudu
-#endif /* KUDU_CONSENSUS_LOG_INDEX_H */