Posted to commits@kudu.apache.org by aw...@apache.org on 2019/11/19 19:30:34 UTC

[kudu] branch master updated (24cf1c9 -> 93ba778)

This is an automated email from the ASF dual-hosted git repository.

awong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 24cf1c9  [cfile] No longer need to `relocate()` in `ReadBlock()`
     new 9c6d8dc  log: coalesce SegmentAllocator class
     new 93ba778  [mini-cluster] another fix on building mini-cluster JAR

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../mini-cluster/build_mini_cluster_binaries.sh    |  14 +-
 src/kudu/consensus/log.cc                          | 505 ++++++++++-----------
 src/kudu/consensus/log.h                           |   1 +
 3 files changed, 261 insertions(+), 259 deletions(-)


[kudu] 01/02: log: coalesce SegmentAllocator class

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 9c6d8dc7b76fbe66acd4dae08196cd056ab466c1
Author: Andrew Wong <aw...@apache.org>
AuthorDate: Mon Nov 18 00:36:13 2019 -0800

    log: coalesce SegmentAllocator class
    
    A previous patch introduced this class and separated it out, but kept
    its methods scattered throughout log.cc to make the change easier to
    review. This patch groups the methods together for readability.
    
    There are no functional changes in this patch.
    
    Change-Id: I61d898025c41ab00443eaad05c0b805e668e5095
    Reviewed-on: http://gerrit.cloudera.org:8080/14628
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/consensus/log.cc | 505 +++++++++++++++++++++++-----------------------
 src/kudu/consensus/log.h  |   1 +
 2 files changed, 253 insertions(+), 253 deletions(-)

diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 23aeaea..ae15638 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -454,10 +454,6 @@ SegmentAllocator::SegmentAllocator(const LogOptions* opts,
       schema_version_(schema_version),
       sync_disabled_(false) {}
 
-void SegmentAllocator::StopAllocationThread() {
-  allocation_pool_->Shutdown();
-}
-
 Status SegmentAllocator::Init(uint64_t sequence_number) {
   // Init the compression codec.
   if (!FLAGS_log_compression_codec.empty()) {
@@ -474,6 +470,131 @@ Status SegmentAllocator::Init(uint64_t sequence_number) {
   return AllocateSegmentAndRollOver();
 }
 
+Status SegmentAllocator::AllocateOrRollOverIfNecessary(uint32_t write_size_bytes) {
+  bool should_rollover = false;
+  // if the size of this entry overflows the current segment, get a new one
+  {
+    std::lock_guard<RWMutex> l(allocation_lock_);
+    if (allocation_state_ == kAllocationNotStarted) {
+      if ((active_segment_->Size() + write_size_bytes + 4) > max_segment_size_) {
+        VLOG_WITH_PREFIX(1) << "Max segment size reached. Starting new segment allocation";
+        RETURN_NOT_OK(AsyncAllocateSegmentUnlocked());
+        if (!opts_->async_preallocate_segments) {
+          should_rollover = true;
+        }
+      }
+    } else if (allocation_state_ == kAllocationFinished) {
+      should_rollover = true;
+    } else {
+      DCHECK(opts_->async_preallocate_segments);
+      VLOG_WITH_PREFIX(1) << "Segment allocation already in progress...";
+    }
+  }
+  if (should_rollover) {
+    TRACE_COUNTER_SCOPE_LATENCY_US("log_roll");
+    LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Log roll took a long time", LogPrefix())) {
+      RETURN_NOT_OK(RollOver());
+    }
+  }
+  return Status::OK();
+}
+
+Status SegmentAllocator::Sync() {
+  TRACE_EVENT0("log", "Sync");
+  SCOPED_LATENCY_METRIC(ctx_->metrics, sync_latency);
+
+  if (PREDICT_FALSE(FLAGS_log_inject_latency && !sync_disabled_)) {
+    Random r(GetCurrentTimeMicros());
+    int sleep_ms = r.Normal(FLAGS_log_inject_latency_ms_mean,
+                            FLAGS_log_inject_latency_ms_stddev);
+    if (sleep_ms > 0) {
+      LOG_WITH_PREFIX(WARNING) << "Injecting " << sleep_ms
+                               << "ms of latency in SegmentAllocator::Sync()";
+      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+    }
+  }
+
+  if (opts_->force_fsync_all && !sync_disabled_) {
+    LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Fsync log took a long time", LogPrefix())) {
+      RETURN_NOT_OK(active_segment_->Sync());
+
+      if (hooks_) {
+        RETURN_NOT_OK_PREPEND(hooks_->PostSyncIfFsyncEnabled(),
+                              "PostSyncIfFsyncEnabled hook failed");
+      }
+    }
+  }
+
+  if (hooks_) {
+    RETURN_NOT_OK_PREPEND(hooks_->PostSync(), "PostSync hook failed");
+  }
+  return Status::OK();
+}
+
+Status SegmentAllocator::CloseCurrentSegment() {
+  if (hooks_) {
+    RETURN_NOT_OK_PREPEND(hooks_->PreClose(), "PreClose hook failed");
+  }
+  RETURN_NOT_OK(Sync());
+  if (!footer_.has_min_replicate_index()) {
+    VLOG_WITH_PREFIX(1) << "Writing a segment without any REPLICATE message. Segment: "
+                        << active_segment_->path();
+  }
+  VLOG_WITH_PREFIX(2) << "Segment footer for " << active_segment_->path()
+                      << ": " << pb_util::SecureShortDebugString(footer_);
+
+  footer_.set_close_timestamp_micros(GetCurrentTimeMicros());
+  RETURN_NOT_OK(active_segment_->WriteFooterAndClose(footer_));
+  if (hooks_) {
+    RETURN_NOT_OK_PREPEND(hooks_->PostClose(), "PostClose hook failed");
+  }
+  return Status::OK();
+}
+
+void SegmentAllocator::UpdateFooterForBatch(const LogEntryBatch& batch) {
+  footer_.set_num_entries(footer_.num_entries() + batch.count());
+
+  // We keep track of the last-written OpId here.
+  // This is needed to initialize Consensus on startup.
+  // We also retrieve the opid of the first operation in the batch so that, if
+  // we roll over to a new segment, we set the first operation in the footer
+  // immediately.
+  if (batch.type_ == REPLICATE) {
+    // Update the index bounds for the current segment.
+    for (const LogEntryPB& entry_pb : batch.entry_batch_pb_->entry()) {
+      UpdateFooterForReplicateEntry(entry_pb, &footer_);
+    }
+  }
+}
+
+void SegmentAllocator::StopAllocationThread() {
+  allocation_pool_->Shutdown();
+}
+
+Status SegmentAllocator::AllocateSegmentAndRollOver() {
+  {
+    std::lock_guard<RWMutex> l(allocation_lock_);
+    RETURN_NOT_OK(AsyncAllocateSegmentUnlocked());
+  }
+  return RollOver();
+}
+
+Status SegmentAllocator::GetClosedSegment(scoped_refptr<ReadableLogSegment>* readable_segment) {
+  CHECK(active_segment_->IsClosed());
+  shared_ptr<RandomAccessFile> readable_file;
+  RETURN_NOT_OK(
+      OpenFileForRandom(ctx_->fs_manager->env(), active_segment_->path(), &readable_file));
+  scoped_refptr<ReadableLogSegment> segment(
+      new ReadableLogSegment(active_segment_->path(), readable_file));
+  // Note: segment->header() will only contain an initialized PB if we
+  // wrote the header out.
+  RETURN_NOT_OK(segment->Init(active_segment_->header(),
+                              active_segment_->footer(),
+                              active_segment_->first_entry_offset()));
+  *readable_segment = std::move(segment);
+  return Status::OK();
+}
+
 void SegmentAllocator::SetSchemaForNextSegment(Schema schema,
                                                uint32_t version) {
   VLOG_WITH_PREFIX(2) << Substitute("Setting schema version $0 for next log segment $1",
@@ -483,10 +604,137 @@ void SegmentAllocator::SetSchemaForNextSegment(Schema schema,
   schema_version_ = version;
 }
 
+Status SegmentAllocator::AsyncAllocateSegmentUnlocked() {
+  allocation_lock_.AssertAcquiredForWriting();
+  DCHECK_EQ(kAllocationNotStarted, allocation_state_);
+  allocation_status_.Reset();
+  allocation_state_ = kAllocationInProgress;
+  return allocation_pool_->SubmitClosure(
+      Bind(&SegmentAllocator::AllocationTask, Unretained(this)));
+}
+
 void SegmentAllocator::AllocationTask() {
   allocation_status_.Set(AllocateNewSegment());
 }
 
+Status SegmentAllocator::AllocateNewSegment() {
+  TRACE_EVENT1("log", "AllocateNewSegment", "file", next_segment_path_);
+  CHECK_EQ(kAllocationInProgress, allocation_state());
+
+  // We must mark allocation as finished when returning from this method.
+  auto alloc_finished = MakeScopedCleanup([&] () {
+    std::lock_guard<RWMutex> l(allocation_lock_);
+    allocation_state_ = kAllocationFinished;
+  });
+
+  WritableFileOptions opts;
+  opts.sync_on_close = opts_->force_fsync_all;
+  string tmp_suffix = Substitute("$0$1", kTmpInfix, ".newsegmentXXXXXX");
+  string path_tmpl = JoinPathSegments(ctx_->log_dir, tmp_suffix);
+  VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, template: " << path_tmpl;
+  unique_ptr<WritableFile> segment_file;
+  Env* env = ctx_->fs_manager->env();
+  RETURN_NOT_OK(env->NewTempWritableFile(opts,
+                                         path_tmpl,
+                                         &next_segment_path_,
+                                         &segment_file));
+  next_segment_file_.reset(segment_file.release());
+  VLOG_WITH_PREFIX(1) << "Created next WAL segment, placeholder path: " << next_segment_path_;
+
+  MAYBE_RETURN_FAILURE(FLAGS_log_inject_io_error_on_preallocate_fraction,
+      Status::IOError("Injected IOError in SegmentAllocator::AllocateNewSegment()"));
+
+  if (opts_->preallocate_segments) {
+    RETURN_NOT_OK(env_util::VerifySufficientDiskSpace(env,
+                                                      next_segment_path_,
+                                                      max_segment_size_,
+                                                      FLAGS_fs_wal_dir_reserved_bytes));
+    // TODO (perf) zero the new segments -- this could result in
+    // additional performance improvements.
+    RETURN_NOT_OK(next_segment_file_->PreAllocate(max_segment_size_));
+  }
+  return Status::OK();
+}
+
+Status SegmentAllocator::SwitchToAllocatedSegment() {
+  // Increment "next" log segment seqno.
+  active_segment_sequence_number++;
+  const auto& tablet_id = ctx_->tablet_id;
+  string new_segment_path = ctx_->fs_manager->GetWalSegmentFileName(tablet_id,
+                                                                    active_segment_sequence_number);
+  Env* env = ctx_->fs_manager->env();
+  RETURN_NOT_OK_PREPEND(env->RenameFile(next_segment_path_, new_segment_path), "rename");
+  if (opts_->force_fsync_all) {
+    RETURN_NOT_OK(env->SyncDir(ctx_->log_dir));
+  }
+
+  // Create a new segment in memory.
+  unique_ptr<WritableLogSegment> new_segment(
+      new WritableLogSegment(new_segment_path, std::move(next_segment_file_)));
+
+  // Set up the new header and footer.
+  LogSegmentHeaderPB header;
+  header.set_sequence_number(active_segment_sequence_number);
+  header.set_tablet_id(tablet_id);
+  if (codec_) {
+    header.set_compression_codec(codec_->type());
+  }
+
+  // Set up the new footer. This will be maintained as the segment is written.
+  footer_.Clear();
+  footer_.set_num_entries(0);
+
+  // Set the new segment's schema.
+  {
+    shared_lock<rw_spinlock> l(schema_lock_);
+    RETURN_NOT_OK(SchemaToPB(schema_, header.mutable_schema()));
+    header.set_schema_version(schema_version_);
+  }
+  RETURN_NOT_OK_PREPEND(new_segment->WriteHeaderAndOpen(header), "Failed to write header");
+
+  // Open the segment we just created in readable form and add it to the reader.
+  // TODO(todd): consider using a global FileCache here? With short log segments and
+  // lots of tablets, this file descriptor usage may add up.
+  unique_ptr<RandomAccessFile> readable_file;
+
+  RandomAccessFileOptions opts;
+  RETURN_NOT_OK(env->NewRandomAccessFile(opts, new_segment_path, &readable_file));
+  scoped_refptr<ReadableLogSegment> readable_segment(
+    new ReadableLogSegment(new_segment_path,
+                           shared_ptr<RandomAccessFile>(readable_file.release())));
+  RETURN_NOT_OK(readable_segment->Init(header, new_segment->first_entry_offset()));
+  RETURN_NOT_OK(reader_add_segment_(readable_segment));
+
+  // Now set 'active_segment_' to the new segment.
+  active_segment_ = std::move(new_segment);
+
+  std::lock_guard<RWMutex> l(allocation_lock_);
+  allocation_state_ = kAllocationNotStarted;
+  return Status::OK();
+}
+
+Status SegmentAllocator::RollOver() {
+  SCOPED_LATENCY_METRIC(ctx_->metrics, roll_latency);
+
+  // Wait for any on-going allocations to finish.
+  RETURN_NOT_OK(allocation_status_.Get());
+  DCHECK_EQ(kAllocationFinished, allocation_state());
+
+  // If this isn't the first active segment, close the segment and make it
+  // available to the log reader.
+  if (active_segment_) {
+    RETURN_NOT_OK(CloseCurrentSegment());
+    scoped_refptr<ReadableLogSegment> readable_segment;
+    RETURN_NOT_OK(GetClosedSegment(&readable_segment));
+    RETURN_NOT_OK(reader_replace_last_segment_(readable_segment));
+  }
+  RETURN_NOT_OK(SwitchToAllocatedSegment());
+
+  VLOG_WITH_PREFIX(1) << "Rolled over to a new log segment at "
+                      << active_segment_->path();
+  return Status::OK();
+}
+
 const Status Log::kLogShutdownStatus(
     Status::ServiceUnavailable("WAL is shutting down", "", ESHUTDOWN));
 
@@ -566,57 +814,6 @@ Status Log::Init() {
   return Status::OK();
 }
 
-Status SegmentAllocator::AsyncAllocateSegmentUnlocked() {
-  allocation_lock_.AssertAcquiredForWriting();
-  DCHECK_EQ(kAllocationNotStarted, allocation_state_);
-  allocation_status_.Reset();
-  allocation_state_ = kAllocationInProgress;
-  return allocation_pool_->SubmitClosure(
-      Bind(&SegmentAllocator::AllocationTask, Unretained(this)));
-}
-
-Status SegmentAllocator::CloseCurrentSegment() {
-  if (hooks_) {
-    RETURN_NOT_OK_PREPEND(hooks_->PreClose(), "PreClose hook failed");
-  }
-  RETURN_NOT_OK(Sync());
-  if (!footer_.has_min_replicate_index()) {
-    VLOG_WITH_PREFIX(1) << "Writing a segment without any REPLICATE message. Segment: "
-                        << active_segment_->path();
-  }
-  VLOG_WITH_PREFIX(2) << "Segment footer for " << active_segment_->path()
-                      << ": " << pb_util::SecureShortDebugString(footer_);
-
-  footer_.set_close_timestamp_micros(GetCurrentTimeMicros());
-  RETURN_NOT_OK(active_segment_->WriteFooterAndClose(footer_));
-  if (hooks_) {
-    RETURN_NOT_OK_PREPEND(hooks_->PostClose(), "PostClose hook failed");
-  }
-  return Status::OK();
-}
-
-Status SegmentAllocator::RollOver() {
-  SCOPED_LATENCY_METRIC(ctx_->metrics, roll_latency);
-
-  // Wait for any on-going allocations to finish.
-  RETURN_NOT_OK(allocation_status_.Get());
-  DCHECK_EQ(kAllocationFinished, allocation_state());
-
-  // If this isn't the first active segment, close the segment and make it
-  // available to the log reader.
-  if (active_segment_) {
-    RETURN_NOT_OK(CloseCurrentSegment());
-    scoped_refptr<ReadableLogSegment> readable_segment;
-    RETURN_NOT_OK(GetClosedSegment(&readable_segment));
-    RETURN_NOT_OK(reader_replace_last_segment_(readable_segment));
-  }
-  RETURN_NOT_OK(SwitchToAllocatedSegment());
-
-  VLOG_WITH_PREFIX(1) << "Rolled over to a new log segment at "
-                      << active_segment_->path();
-  return Status::OK();
-}
-
 Status Log::CreateBatchFromPB(LogEntryTypePB type,
                               unique_ptr<LogEntryBatchPB> entry_batch_pb,
                               unique_ptr<LogEntryBatch>* entry_batch) {
@@ -669,36 +866,6 @@ Status Log::AsyncAppendCommit(gscoped_ptr<consensus::CommitMsg> commit_msg,
   return Status::OK();
 }
 
-Status SegmentAllocator::AllocateOrRollOverIfNecessary(uint32_t write_size_bytes) {
-  bool should_rollover = false;
-  // if the size of this entry overflows the current segment, get a new one
-  {
-    std::lock_guard<RWMutex> l(allocation_lock_);
-    if (allocation_state_ == kAllocationNotStarted) {
-      if ((active_segment_->Size() + write_size_bytes + 4) > max_segment_size_) {
-        VLOG_WITH_PREFIX(1) << "Max segment size reached. Starting new segment allocation";
-        RETURN_NOT_OK(AsyncAllocateSegmentUnlocked());
-        if (!opts_->async_preallocate_segments) {
-          should_rollover = true;
-        }
-      }
-    } else if (allocation_state_ == kAllocationFinished) {
-      should_rollover = true;
-    } else {
-      DCHECK(opts_->async_preallocate_segments);
-      VLOG_WITH_PREFIX(1) << "Segment allocation already in progress...";
-    }
-  }
-  if (should_rollover) {
-    TRACE_COUNTER_SCOPE_LATENCY_US("log_roll");
-    LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Log roll took a long time", LogPrefix())) {
-      RETURN_NOT_OK(RollOver());
-    }
-  }
-  return Status::OK();
-}
-
-
 Status Log::WriteBatch(LogEntryBatch* entry_batch) {
   size_t num_entries = entry_batch->count();
   DCHECK_GT(num_entries, 0) << "Cannot call WriteBatch() with zero entries reserved";
@@ -758,67 +925,11 @@ Status Log::UpdateIndexForBatch(const LogEntryBatch& batch,
   return Status::OK();
 }
 
-Status SegmentAllocator::AllocateSegmentAndRollOver() {
-  {
-    std::lock_guard<RWMutex> l(allocation_lock_);
-    RETURN_NOT_OK(AsyncAllocateSegmentUnlocked());
-  }
-  return RollOver();
-}
-
 Status Log::AllocateSegmentAndRollOver() {
   std::lock_guard<rw_spinlock> l(segment_idle_lock_);
   return segment_allocator_.AllocateSegmentAndRollOver();
 }
 
-void SegmentAllocator::UpdateFooterForBatch(const LogEntryBatch& batch) {
-  footer_.set_num_entries(footer_.num_entries() + batch.count());
-
-  // We keep track of the last-written OpId here.
-  // This is needed to initialize Consensus on startup.
-  // We also retrieve the opid of the first operation in the batch so that, if
-  // we roll over to a new segment, we set the first operation in the footer
-  // immediately.
-  if (batch.type_ == REPLICATE) {
-    // Update the index bounds for the current segment.
-    for (const LogEntryPB& entry_pb : batch.entry_batch_pb_->entry()) {
-      UpdateFooterForReplicateEntry(entry_pb, &footer_);
-    }
-  }
-}
-
-Status SegmentAllocator::Sync() {
-  TRACE_EVENT0("log", "Sync");
-  SCOPED_LATENCY_METRIC(ctx_->metrics, sync_latency);
-
-  if (PREDICT_FALSE(FLAGS_log_inject_latency && !sync_disabled_)) {
-    Random r(GetCurrentTimeMicros());
-    int sleep_ms = r.Normal(FLAGS_log_inject_latency_ms_mean,
-                            FLAGS_log_inject_latency_ms_stddev);
-    if (sleep_ms > 0) {
-      LOG_WITH_PREFIX(WARNING) << "Injecting " << sleep_ms
-                               << "ms of latency in SegmentAllocator::Sync()";
-      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
-    }
-  }
-
-  if (opts_->force_fsync_all && !sync_disabled_) {
-    LOG_SLOW_EXECUTION(WARNING, 50, Substitute("$0Fsync log took a long time", LogPrefix())) {
-      RETURN_NOT_OK(active_segment_->Sync());
-
-      if (hooks_) {
-        RETURN_NOT_OK_PREPEND(hooks_->PostSyncIfFsyncEnabled(),
-                              "PostSyncIfFsyncEnabled hook failed");
-      }
-    }
-  }
-
-  if (hooks_) {
-    RETURN_NOT_OK_PREPEND(hooks_->PostSync(), "PostSync hook failed");
-  }
-  return Status::OK();
-}
-
 Status Log::Sync() {
   return segment_allocator_.Sync();
 }
@@ -1072,118 +1183,6 @@ Status Log::RemoveRecoveryDirIfExists(FsManager* fs_manager, const string& table
   return Status::OK();
 }
 
-Status SegmentAllocator::AllocateNewSegment() {
-  TRACE_EVENT1("log", "AllocateNewSegment", "file", next_segment_path_);
-  CHECK_EQ(kAllocationInProgress, allocation_state());
-
-  // We must mark allocation as finished when returning from this method.
-  auto alloc_finished = MakeScopedCleanup([&] () {
-    std::lock_guard<RWMutex> l(allocation_lock_);
-    allocation_state_ = kAllocationFinished;
-  });
-
-  WritableFileOptions opts;
-  opts.sync_on_close = opts_->force_fsync_all;
-  string tmp_suffix = Substitute("$0$1", kTmpInfix, ".newsegmentXXXXXX");
-  string path_tmpl = JoinPathSegments(ctx_->log_dir, tmp_suffix);
-  VLOG_WITH_PREFIX(2) << "Creating temp. file for place holder segment, template: " << path_tmpl;
-  unique_ptr<WritableFile> segment_file;
-  Env* env = ctx_->fs_manager->env();
-  RETURN_NOT_OK(env->NewTempWritableFile(opts,
-                                         path_tmpl,
-                                         &next_segment_path_,
-                                         &segment_file));
-  next_segment_file_.reset(segment_file.release());
-  VLOG_WITH_PREFIX(1) << "Created next WAL segment, placeholder path: " << next_segment_path_;
-
-  MAYBE_RETURN_FAILURE(FLAGS_log_inject_io_error_on_preallocate_fraction,
-      Status::IOError("Injected IOError in SegmentAllocator::AllocateNewSegment()"));
-
-  if (opts_->preallocate_segments) {
-    RETURN_NOT_OK(env_util::VerifySufficientDiskSpace(env,
-                                                      next_segment_path_,
-                                                      max_segment_size_,
-                                                      FLAGS_fs_wal_dir_reserved_bytes));
-    // TODO (perf) zero the new segments -- this could result in
-    // additional performance improvements.
-    RETURN_NOT_OK(next_segment_file_->PreAllocate(max_segment_size_));
-  }
-  return Status::OK();
-}
-
-Status SegmentAllocator::SwitchToAllocatedSegment() {
-  // Increment "next" log segment seqno.
-  active_segment_sequence_number++;
-  const auto& tablet_id = ctx_->tablet_id;
-  string new_segment_path = ctx_->fs_manager->GetWalSegmentFileName(tablet_id,
-                                                                    active_segment_sequence_number);
-  Env* env = ctx_->fs_manager->env();
-  RETURN_NOT_OK_PREPEND(env->RenameFile(next_segment_path_, new_segment_path), "rename");
-  if (opts_->force_fsync_all) {
-    RETURN_NOT_OK(env->SyncDir(ctx_->log_dir));
-  }
-
-  // Create a new segment in memory.
-  unique_ptr<WritableLogSegment> new_segment(
-      new WritableLogSegment(new_segment_path, std::move(next_segment_file_)));
-
-  // Set up the new header and footer.
-  LogSegmentHeaderPB header;
-  header.set_sequence_number(active_segment_sequence_number);
-  header.set_tablet_id(tablet_id);
-  if (codec_) {
-    header.set_compression_codec(codec_->type());
-  }
-
-  // Set up the new footer. This will be maintained as the segment is written.
-  footer_.Clear();
-  footer_.set_num_entries(0);
-
-  // Set the new segment's schema.
-  {
-    shared_lock<rw_spinlock> l(schema_lock_);
-    RETURN_NOT_OK(SchemaToPB(schema_, header.mutable_schema()));
-    header.set_schema_version(schema_version_);
-  }
-  RETURN_NOT_OK_PREPEND(new_segment->WriteHeaderAndOpen(header), "Failed to write header");
-
-  // Open the segment we just created in readable form and add it to the reader.
-  // TODO(todd): consider using a global FileCache here? With short log segments and
-  // lots of tablets, this file descriptor usage may add up.
-  unique_ptr<RandomAccessFile> readable_file;
-
-  RandomAccessFileOptions opts;
-  RETURN_NOT_OK(env->NewRandomAccessFile(opts, new_segment_path, &readable_file));
-  scoped_refptr<ReadableLogSegment> readable_segment(
-    new ReadableLogSegment(new_segment_path,
-                           shared_ptr<RandomAccessFile>(readable_file.release())));
-  RETURN_NOT_OK(readable_segment->Init(header, new_segment->first_entry_offset()));
-  RETURN_NOT_OK(reader_add_segment_(readable_segment));
-
-  // Now set 'active_segment_' to the new segment.
-  active_segment_ = std::move(new_segment);
-
-  std::lock_guard<RWMutex> l(allocation_lock_);
-  allocation_state_ = kAllocationNotStarted;
-  return Status::OK();
-}
-
-Status SegmentAllocator::GetClosedSegment(scoped_refptr<ReadableLogSegment>* readable_segment) {
-  CHECK(active_segment_->IsClosed());
-  shared_ptr<RandomAccessFile> readable_file;
-  RETURN_NOT_OK(
-      OpenFileForRandom(ctx_->fs_manager->env(), active_segment_->path(), &readable_file));
-  scoped_refptr<ReadableLogSegment> segment(
-      new ReadableLogSegment(active_segment_->path(), readable_file));
-  // Note: segment->header() will only contain an initialized PB if we
-  // wrote the header out.
-  RETURN_NOT_OK(segment->Init(active_segment_->header(),
-                              active_segment_->footer(),
-                              active_segment_->first_entry_offset()));
-  *readable_segment = std::move(segment);
-  return Status::OK();
-}
-
 Status Log::AddEmptySegmentInReader(const scoped_refptr<ReadableLogSegment>& segment) {
   std::lock_guard<percpu_rwlock> l(state_lock_);
   return reader_->AppendEmptySegment(segment);
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index 7fcfcdd..06ebc4a 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -158,6 +158,7 @@ class SegmentAllocator {
   // Returns a readable segment pointing at the most recently closed segment.
   Status GetClosedSegment(scoped_refptr<ReadableLogSegment>* readable_segment);
 
+  // Sets the schema and version to be used for the next allocated segment.
   void SetSchemaForNextSegment(Schema schema, uint32_t version);
 
   // Schedules a task to allocate a new log segment.
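
The core flow that AllocateOrRollOverIfNecessary() and RollOver() implement
above is: a write that would overflow the active segment kicks off a
(possibly asynchronous) segment allocation, and the writer rolls over to the
new segment only once that allocation has finished. The sketch below is a
simplified, standalone illustration of that state machine, not the Kudu
classes themselves; all names in it (ToySegmentAllocator,
MaybeAllocateOrRollOver, FinishAllocation) are hypothetical, and the real
code additionally handles footers, fsync, and the reader handoff shown in
the diff.

    // Standalone sketch only -- illustrative names, not Kudu's classes.
    #include <cstdint>
    #include <iostream>
    #include <mutex>

    enum class AllocationState { kNotStarted, kInProgress, kFinished };

    class ToySegmentAllocator {
     public:
      explicit ToySegmentAllocator(uint64_t max_segment_size)
          : max_segment_size_(max_segment_size) {}

      // Mirrors the decision in AllocateOrRollOverIfNecessary(): start an
      // allocation when the next write would overflow the active segment,
      // and roll over once a pre-allocated segment is ready.
      void MaybeAllocateOrRollOver(uint64_t active_segment_size,
                                   uint32_t write_size_bytes) {
        bool should_rollover = false;
        {
          std::lock_guard<std::mutex> l(allocation_lock_);
          if (state_ == AllocationState::kNotStarted) {
            if (active_segment_size + write_size_bytes > max_segment_size_) {
              state_ = AllocationState::kInProgress;  // done on a thread pool in Kudu
              std::cout << "starting new segment allocation\n";
            }
          } else if (state_ == AllocationState::kFinished) {
            should_rollover = true;
          }  // else: allocation still in progress; keep writing to the old segment
        }
        if (should_rollover) {
          std::cout << "rolling over to the newly allocated segment\n";
          std::lock_guard<std::mutex> l(allocation_lock_);
          state_ = AllocationState::kNotStarted;
        }
      }

      // Stand-in for the background allocation task completing.
      void FinishAllocation() {
        std::lock_guard<std::mutex> l(allocation_lock_);
        if (state_ == AllocationState::kInProgress) {
          state_ = AllocationState::kFinished;
        }
      }

     private:
      const uint64_t max_segment_size_;
      std::mutex allocation_lock_;
      AllocationState state_ = AllocationState::kNotStarted;
    };

    int main() {
      ToySegmentAllocator alloc(8 * 1024 * 1024);          // 8 MiB segments
      alloc.MaybeAllocateOrRollOver(8 * 1024 * 1024, 64);  // overflow: starts allocation
      alloc.FinishAllocation();                            // "background" allocation finishes
      alloc.MaybeAllocateOrRollOver(8 * 1024 * 1024, 64);  // now rolls over
      return 0;
    }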


[kudu] 02/02: [mini-cluster] another fix on building mini-cluster JAR

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 93ba778a7e72cbc55a706f7d61d5ee1cd2721c56
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Nov 15 22:43:59 2019 -0800

    [mini-cluster] another fix on building mini-cluster JAR
    
    Check whether the build directory exists prior to moving it.
    
    This is a follow-up to 6406123924c3900e8edd65eebbb3d93963fd2a7b.
    
    Change-Id: I26b531e3fcb250adeecbbaa9aa12858a4066ada2
    Reviewed-on: http://gerrit.cloudera.org:8080/14730
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 build-support/mini-cluster/build_mini_cluster_binaries.sh | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/build-support/mini-cluster/build_mini_cluster_binaries.sh b/build-support/mini-cluster/build_mini_cluster_binaries.sh
index 1d0a434..da8d630 100755
--- a/build-support/mini-cluster/build_mini_cluster_binaries.sh
+++ b/build-support/mini-cluster/build_mini_cluster_binaries.sh
@@ -74,12 +74,14 @@ TARGETS="kudu kudu-tserver kudu-master"
 
 # Remove leftovers from previous releases/builds: rename the existing directory.
 # To skip this step, set the MINI_CLUSTER_NO_FRESH_BUILD environment variable.
-if [ -n "$MINI_CLUSTER_NO_FRESH_BUILD" ]; then
-  echo "WARNING: using existing build directory"
-else
-  suffix=$(date "+%Y%m%d.%H%M%S")
-  echo "Moving existing $BUILD_ROOT into $BUILD_ROOT.$suffix"
-  mv $BUILD_ROOT $BUILD_ROOT.$suffix
+if [ -d "$BUILD_ROOT" ]; then
+  if [ -n "$MINI_CLUSTER_NO_FRESH_BUILD" ]; then
+    echo "WARNING: using existing build directory"
+  else
+    suffix=$(date "+%Y%m%d.%H%M%S")
+    echo "Moving existing $BUILD_ROOT into $BUILD_ROOT.$suffix"
+    mv $BUILD_ROOT $BUILD_ROOT.$suffix
+  fi
 fi
 
 cd $SOURCE_ROOT
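
With the guard above in place, the rename-aside only happens when a previous
build directory actually exists. For reference, a hypothetical invocation
that exercises the skip path (the script path and variable name come from
the diff; the exact way the script is run may differ in your environment):

    # Reuse an existing build directory instead of renaming it aside.
    MINI_CLUSTER_NO_FRESH_BUILD=1 ./build-support/mini-cluster/build_mini_cluster_binaries.sh

Without the variable set, the script timestamps any existing $BUILD_ROOT and
moves it out of the way before building.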