You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/05/02 21:13:53 UTC

[1/3] kudu git commit: log block manager: detect and repair unpunched holes

Repository: kudu
Updated Branches:
  refs/heads/master b2e664a37 -> 867fc91cb


log block manager: detect and repair unpunched holes

This patch adds detection and repair of "unpunched holes" to the LBM. These
are deleted blocks whose space in the container data files is still "live",
either because hole punching failed, or because the server crashed before
the holes could be punched.

The newly added container accounting is used as a heuristic to decide when
to repunch holes: if a container's on-disk data size exceeds the size we think
it should have (after alignment), we'll repunch all holes and truncate any
preallocated space off the end. The heuristic allows for some slop, configurable
via --log_container_excess_space_before_cleanup_fraction, to work around various
filesystem accounting issues.

An alternative is to use the container's extent map to figure this out (and
to provide greater precision on where to punch), but testing on el6.6 showed
that calling the FS_IOC_FIEMAP ioctl() on every full container increased LBM
startup time by about 50%. That's bad enough that we shouldn't do it
willy-nilly. It could be gated on the above heuristic and used to drive more
precise hole repunching, but given the complexity involved and given the
amount of tilting I've done at this particular windmill, it'll remain a
problem for another day.

Testing is done in three ways:
- A new unit test (LogBlockManagerTest.TestRepairUnpunchedBlocks) that
  exercises new LBMCorruptor functionality.
- Inclusion in block_manager-stress-test via the change to
  LBMCorruptor::InjectRandomNonFatalInconsistency().
- Inclusion in BlockManagerTest.TestMetadataOkayDespiteFailedWrites via the
  generalization of the env_inject_io_error_on_write_or_preallocate gflag into
  env_inject_io_error, which now also injects failures into hole punching.

Change-Id: I016ea401380f4c8c7b1fd907ff67cb595f377dd1
Reviewed-on: http://gerrit.cloudera.org:8080/6717
Tested-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: David Ribeiro Alves <da...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6a118594
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6a118594
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6a118594

Branch: refs/heads/master
Commit: 6a118594cdb90350d1c0d4754f27b883a96aba56
Parents: b2e664a
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri Apr 21 18:38:43 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Tue May 2 21:11:30 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/block_manager-stress-test.cc   |   4 +
 src/kudu/fs/block_manager-test.cc          |   8 +-
 src/kudu/fs/log_block_manager-test-util.cc | 136 +++++++++++++++++-------
 src/kudu/fs/log_block_manager-test-util.h  |  29 ++++-
 src/kudu/fs/log_block_manager-test.cc      |  78 ++++++++++++--
 src/kudu/fs/log_block_manager.cc           |  95 +++++++++++++----
 src/kudu/fs/log_block_manager.h            |  12 ++-
 src/kudu/util/env_posix.cc                 |  16 +--
 8 files changed, 297 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/6a118594/src/kudu/fs/block_manager-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-stress-test.cc b/src/kudu/fs/block_manager-stress-test.cc
index dcbcb05..5b86f42 100644
--- a/src/kudu/fs/block_manager-stress-test.cc
+++ b/src/kudu/fs/block_manager-stress-test.cc
@@ -37,6 +37,7 @@
 #include "kudu/util/thread.h"
 
 DECLARE_int64(block_manager_max_open_files);
+DECLARE_double(log_container_excess_space_before_cleanup_fraction);
 DECLARE_uint64(log_container_max_size);
 DECLARE_uint64(log_container_preallocate_bytes);
 
@@ -102,6 +103,9 @@ class BlockManagerStressTest : public KuduTest {
     // Ensure the file cache is under stress too.
     FLAGS_block_manager_max_open_files = 512;
 
+    // Maximize the amount of cleanup triggered by the extra space heuristic.
+    FLAGS_log_container_excess_space_before_cleanup_fraction = 0.0;
+
     if (FLAGS_block_manager_paths.empty()) {
       data_dirs_.push_back(test_dir_);
     } else {

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a118594/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index 0c8df85..71f5471 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -46,15 +46,11 @@ using strings::Substitute;
 
 DECLARE_uint64(log_container_preallocate_bytes);
 DECLARE_uint64(log_container_max_size);
-
 DECLARE_int64(fs_data_dirs_reserved_bytes);
 DECLARE_int64(disk_reserved_bytes_free_for_testing);
-
 DECLARE_int32(fs_data_dirs_full_disk_cache_seconds);
-
 DECLARE_string(block_manager);
-
-DECLARE_double(env_inject_io_error_on_write_or_preallocate);
+DECLARE_double(env_inject_io_error);
 
 // Generic block manager metrics.
 METRIC_DECLARE_gauge_uint64(block_manager_blocks_open_reading);
@@ -659,7 +655,7 @@ TYPED_TEST(BlockManagerTest, TestMetadataOkayDespiteFailedWrites) {
   FLAGS_log_container_preallocate_bytes = 8 * 1024;
 
   // Force some file operations to fail.
-  FLAGS_env_inject_io_error_on_write_or_preallocate = 0.2;
+  FLAGS_env_inject_io_error = 0.2;
 
   // Creates a block, writing the result to 'out' on success.
   auto create_a_block = [&](BlockId* out) -> Status {

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a118594/src/kudu/fs/log_block_manager-test-util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test-util.cc b/src/kudu/fs/log_block_manager-test-util.cc
index 6359574..6a31ee3 100644
--- a/src/kudu/fs/log_block_manager-test-util.cc
+++ b/src/kudu/fs/log_block_manager-test-util.cc
@@ -122,10 +122,9 @@ Status LBMCorruptor::PreallocateFullContainer() {
   RWFileOptions opts;
   opts.mode = Env::OPEN_EXISTING;
   RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
-  uint64_t initial_size;
-  RETURN_NOT_OK(data_file->Size(&initial_size));
-  RETURN_NOT_OK(data_file->PreAllocate(initial_size, kPreallocateBytes, mode));
-
+  int64_t initial_size;
+  RETURN_NOT_OK(PreallocateForBlock(data_file.get(), mode,
+                                    kPreallocateBytes, &initial_size));
   if (mode == RWFile::DONT_CHANGE_FILE_SIZE) {
     // Some older versions of ext4 (such as on el6) will not truncate unwritten
     // preallocated space that extends beyond the file size. Let's help them
@@ -139,6 +138,39 @@ Status LBMCorruptor::PreallocateFullContainer() {
   return Status::OK();
 }
 
+Status LBMCorruptor::AddUnpunchedBlockToFullContainer() {
+  const Container* c;
+  RETURN_NOT_OK(GetRandomContainer(FULL, &c));
+
+  uint64_t fs_block_size;
+  RETURN_NOT_OK(env_->GetBlockSize(c->data_filename, &fs_block_size));
+
+  // "Write" out the block by growing the data file by some random amount.
+  //
+  // Must be non-zero length, otherwise preallocation will fail.
+  unique_ptr<RWFile> data_file;
+  RWFileOptions opts;
+  opts.mode = Env::OPEN_EXISTING;
+  RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
+  int64_t block_length = (rand_.Uniform(16) + 1) * fs_block_size;
+  int64_t initial_data_size;
+  RETURN_NOT_OK(PreallocateForBlock(data_file.get(), RWFile::CHANGE_FILE_SIZE,
+                                    block_length, &initial_data_size));
+  RETURN_NOT_OK(data_file->Close());
+
+  // Having written out the block, write both CREATE and DELETE metadata
+  // records for it.
+  unique_ptr<WritablePBContainerFile> metadata_writer;
+  RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
+  BlockId block_id(rand_.Next64());
+  RETURN_NOT_OK(AppendCreateRecord(metadata_writer.get(), block_id,
+                                   initial_data_size, block_length));
+  RETURN_NOT_OK(AppendDeleteRecord(metadata_writer.get(), block_id));
+
+  LOG(INFO) << "Added unpunched block to full container " << c->name;
+  return metadata_writer->Close();
+}
+
 Status LBMCorruptor::CreateIncompleteContainer() {
   unique_ptr<RWFile> data_file;
   unique_ptr<RWFile> metadata_file;
@@ -188,14 +220,14 @@ Status LBMCorruptor::AddMalformedRecordToContainer() {
   // not going to fill that space, but this ensures that the block's record
   // isn't considered malformed only because it stretches past the end of the
   // data file.
-  uint64_t initial_data_size;
+  int64_t initial_data_size;
   {
     unique_ptr<RWFile> data_file;
     RWFileOptions opts;
     opts.mode = Env::OPEN_EXISTING;
     RETURN_NOT_OK(env_->NewRWFile(opts, c->data_filename, &data_file));
-    RETURN_NOT_OK(data_file->Size(&initial_data_size));
-    RETURN_NOT_OK(data_file->PreAllocate(initial_data_size, kBlockSize, RWFile::CHANGE_FILE_SIZE));
+    RETURN_NOT_OK(PreallocateForBlock(data_file.get(), RWFile::CHANGE_FILE_SIZE,
+                                      kBlockSize, &initial_data_size));
     RETURN_NOT_OK(data_file->Close());
   }
 
@@ -303,14 +335,8 @@ Status LBMCorruptor::AddMisalignedBlockToContainer() {
   // Having written out the block, write a corresponding metadata record.
   unique_ptr<WritablePBContainerFile> metadata_writer;
   RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
-  BlockRecordPB record;
-  block_id.CopyToPB(record.mutable_block_id());
-  record.set_op_type(CREATE);
-  record.set_offset(block_offset);
-  record.set_length(block_length);
-  record.set_timestamp_us(0);
-
-  RETURN_NOT_OK(metadata_writer->Append(record));
+  RETURN_NOT_OK(AppendCreateRecord(metadata_writer.get(), block_id,
+                                   block_offset, block_length));
 
   LOG(INFO) << "Added misaligned block to container " << c->name;
   return metadata_writer->Close();
@@ -324,14 +350,9 @@ Status LBMCorruptor::AddPartialRecordToContainer() {
   RETURN_NOT_OK(OpenMetadataWriter(*c, &metadata_writer));
 
   // Add a new good record to the container.
-  BlockId block_id(rand_.Next64());
-  BlockRecordPB record;
-  block_id.CopyToPB(record.mutable_block_id());
-  record.set_op_type(CREATE);
-  record.set_offset(0);
-  record.set_length(0);
-  record.set_timestamp_us(0);
-  RETURN_NOT_OK(metadata_writer->Append(record));
+  RETURN_NOT_OK(AppendCreateRecord(metadata_writer.get(),
+                                   BlockId(rand_.Next64()),
+                                   0, 0));
 
   // Corrupt the record by truncating one byte off the end of it.
   {
@@ -368,22 +389,29 @@ Status LBMCorruptor::AddPartialRecordToContainer() {
 
 Status LBMCorruptor::InjectRandomNonFatalInconsistency() {
   while (true) {
-    int r = rand_.Uniform(4);
-    if (r == 0) {
-      return AddMisalignedBlockToContainer();
-    }
-    if (r == 1) {
-      return CreateIncompleteContainer();
-    }
-    if (r == 2) {
-      if (!full_containers_.empty()) {
+    int r = rand_.Uniform(5);
+    switch (r) {
+      case 0:
+        return AddMisalignedBlockToContainer();
+      case 1:
+        return CreateIncompleteContainer();
+      case 2:
+        if (full_containers_.empty()) {
+          // Loop and try a different operation.
+          break;
+        }
         return PreallocateFullContainer();
-      }
-      // Loop and try a different operation.
-      continue;
+      case 3:
+        if (full_containers_.empty()) {
+          // Loop and try a different operation.
+          break;
+        }
+        return AddUnpunchedBlockToFullContainer();
+      case 4:
+        return AddPartialRecordToContainer();
+      default:
+        LOG(FATAL) << "Unexpected value " << r;
     }
-    CHECK_EQ(r, 3);
-    return AddPartialRecordToContainer();
   }
 }
 
@@ -404,6 +432,40 @@ Status LBMCorruptor::OpenMetadataWriter(
   return Status::OK();
 }
 
+Status LBMCorruptor::AppendCreateRecord(WritablePBContainerFile* writer,
+                                        BlockId block_id,
+                                        int64_t block_offset,
+                                        int64_t block_length) {
+  BlockRecordPB record;
+  block_id.CopyToPB(record.mutable_block_id());
+  record.set_op_type(CREATE);
+  record.set_offset(block_offset);
+  record.set_length(block_length);
+  record.set_timestamp_us(0); // has no effect
+  return writer->Append(record);
+}
+
+Status LBMCorruptor::AppendDeleteRecord(WritablePBContainerFile* writer,
+                                        BlockId block_id) {
+  BlockRecordPB record;
+  block_id.CopyToPB(record.mutable_block_id());
+  record.set_op_type(DELETE);
+  record.set_timestamp_us(0); // has no effect
+  return writer->Append(record);
+}
+
+Status LBMCorruptor::PreallocateForBlock(RWFile* data_file,
+                                         RWFile::PreAllocateMode mode,
+                                         int64_t block_length,
+                                         int64_t* old_data_file_size) {
+  uint64_t initial_size;
+  RETURN_NOT_OK(data_file->Size(&initial_size));
+  RETURN_NOT_OK(data_file->PreAllocate(initial_size, block_length, mode));
+
+  *old_data_file_size = initial_size;
+  return Status::OK();
+}
+
 Status LBMCorruptor::GetRandomContainer(FindContainerMode mode,
                                         const Container** container) const {
   if (mode == FULL) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a118594/src/kudu/fs/log_block_manager-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test-util.h b/src/kudu/fs/log_block_manager-test-util.h
index a4076aa..fa260eb 100644
--- a/src/kudu/fs/log_block_manager-test-util.h
+++ b/src/kudu/fs/log_block_manager-test-util.h
@@ -20,15 +20,15 @@
 #include <string>
 #include <vector>
 
+#include "kudu/fs/block_id.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/util/env.h"
 #include "kudu/util/oid_generator.h"
 #include "kudu/util/random.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
 
-class Env;
-
 namespace pb_util {
 class WritablePBContainerFile;
 } // namespace pb_util
@@ -52,6 +52,13 @@ class LBMCorruptor {
   // Returns an error if a full container could not be found.
   Status PreallocateFullContainer();
 
+  // Adds an "unpunched block" to a full container (chosen at random). An
+  // unpunched block is one that has been deleted but whose space was not
+  // reclaimed.
+  //
+  // Returns an error if a container could not be found.
+  Status AddUnpunchedBlockToFullContainer();
+
   // Creates a new incomplete container. This inconsistency is non-fatal and
   // repairable.
   Status CreateIncompleteContainer();
@@ -94,6 +101,24 @@ class LBMCorruptor {
       const Container& container,
       std::unique_ptr<pb_util::WritablePBContainerFile>* writer);
 
+  // Appends a CREATE record to 'writer'.
+  static Status AppendCreateRecord(pb_util::WritablePBContainerFile* writer,
+                                   BlockId block_id,
+                                   int64_t block_offset,
+                                   int64_t block_length);
+
+  // Appends a DELETE record to 'writer'.
+  static Status AppendDeleteRecord(pb_util::WritablePBContainerFile* writer,
+                                   BlockId block_id);
+
+  // Preallocates space at the end of a container's data file for a new block.
+  //
+  // On success, writes the initial data file's size to 'old_data_file_size'.
+  static Status PreallocateForBlock(RWFile* data_file,
+                                    RWFile::PreAllocateMode mode,
+                                    int64_t block_length,
+                                    int64_t* old_data_file_size);
+
   // Gets a random container subject to the restriction in 'mode'.
   //
   // Returns an error if no such container could be found.

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a118594/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
index e906e97..1a2d828 100644
--- a/src/kudu/fs/log_block_manager-test.cc
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -44,11 +44,11 @@ using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
+DECLARE_double(log_container_excess_space_before_cleanup_fraction);
+DECLARE_int64(log_container_max_blocks);
 DECLARE_uint64(log_container_preallocate_bytes);
 DECLARE_uint64(log_container_max_size);
 
-DECLARE_int64(log_container_max_blocks);
-
 // Log block manager metrics.
 METRIC_DECLARE_gauge_uint64(log_block_manager_bytes_under_management);
 METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
@@ -856,7 +856,14 @@ TEST_F(LogBlockManagerTest, TestMisalignedBlocksFuzz) {
 }
 
 TEST_F(LogBlockManagerTest, TestRepairPreallocateExcessSpace) {
+  // Enforce that the container's actual size is strictly upper-bounded by the
+  // calculated size so we can more easily trigger repairs.
+  FLAGS_log_container_excess_space_before_cleanup_fraction = 0.0;
+
+  // Disable preallocation so we can more easily control it.
   FLAGS_log_container_preallocate_bytes = 0;
+
+  // Make it easy to create a full container.
   FLAGS_log_container_max_size = 1;
 
   const int kNumContainers = 10;
@@ -883,21 +890,78 @@ TEST_F(LogBlockManagerTest, TestRepairPreallocateExcessSpace) {
   FsReport report;
   ASSERT_OK(ReopenBlockManager(&report));
   ASSERT_FALSE(report.HasFatalErrors());
-  // XXX: the LBM currently declares all full containers as having excess
-  // preallocated space. Using extent maps will clear that up.
-  ASSERT_EQ(kNumContainers, report.full_container_space_check->entries.size());
+  ASSERT_EQ(1, report.full_container_space_check->entries.size());
   const LBMFullContainerSpaceCheck::Entry& fcs =
       report.full_container_space_check->entries[0];
   unordered_set<string> container_name_set(container_names.begin(),
                                            container_names.end());
   ASSERT_TRUE(ContainsKey(container_name_set, fcs.container));
-  // XXX: See above, excess can be zero.
-  ASSERT_GE(fcs.excess_bytes, 0);
+  ASSERT_GT(fcs.excess_bytes, 0);
   ASSERT_TRUE(fcs.repaired);
   report.full_container_space_check->entries.clear();
   NO_FATALS(AssertEmptyReport(report));
 }
 
+TEST_F(LogBlockManagerTest, TestRepairUnpunchedBlocks) {
+  const int kNumBlocks = 100;
+
+  // Enforce that the container's actual size is strictly upper-bounded by the
+  // calculated size so we can more easily trigger repairs.
+  FLAGS_log_container_excess_space_before_cleanup_fraction = 0.0;
+
+  // Force our single container to become full once created.
+  FLAGS_log_container_max_size = 0;
+
+  // Force the test to measure extra space in unpunched holes, not in the
+  // preallocation buffer.
+  FLAGS_log_container_preallocate_bytes = 0;
+
+  // Create one container.
+  unique_ptr<WritableBlock> block;
+  ASSERT_OK(bm_->CreateBlock(&block));
+  ASSERT_OK(block->Close());
+  string data_file;
+  NO_FATALS(GetOnlyContainerDataFile(&data_file));
+  uint64_t file_size_on_disk;
+  ASSERT_OK(env_->GetFileSizeOnDisk(data_file, &file_size_on_disk));
+  ASSERT_EQ(0, file_size_on_disk);
+
+  // Add some "unpunched blocks" to the container.
+  LBMCorruptor corruptor(env_, { test_dir_ }, SeedRandom());
+  ASSERT_OK(corruptor.Init());
+  for (int i = 0; i < kNumBlocks; i++) {
+    ASSERT_OK(corruptor.AddUnpunchedBlockToFullContainer());
+  }
+
+  ASSERT_OK(env_->GetFileSizeOnDisk(data_file, &file_size_on_disk));
+  ASSERT_GT(file_size_on_disk, 0);
+
+  // Check the report.
+  FsReport report;
+  ASSERT_OK(ReopenBlockManager(&report));
+  ASSERT_FALSE(report.HasFatalErrors());
+  ASSERT_EQ(1, report.full_container_space_check->entries.size());
+  const LBMFullContainerSpaceCheck::Entry& fcs =
+      report.full_container_space_check->entries[0];
+  string container;
+  NO_FATALS(GetOnlyContainer(&container));
+  ASSERT_EQ(container, fcs.container);
+  ASSERT_EQ(file_size_on_disk, fcs.excess_bytes);
+  ASSERT_TRUE(fcs.repaired);
+  report.full_container_space_check->entries.clear();
+  NO_FATALS(AssertEmptyReport(report));
+
+  // Wait for the block manager to punch out all of the holes (done as part of
+  // repair at startup). It's easiest to do this by reopening it; shutdown will
+  // wait for outstanding hole punches.
+  ASSERT_OK(ReopenBlockManager(&report));
+  NO_FATALS(AssertEmptyReport(report));
+
+  // File size should be 0 post-repair.
+  ASSERT_OK(env_->GetFileSizeOnDisk(data_file, &file_size_on_disk));
+  ASSERT_EQ(0, file_size_on_disk);
+}
+
 TEST_F(LogBlockManagerTest, TestRepairIncompleteContainer) {
   const int kNumContainers = 20;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a118594/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 9ccf94c..c866fc0 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -76,6 +76,13 @@ DEFINE_uint64(log_container_preallocate_bytes, 32LU * 1024 * 1024,
               "creating new blocks. Set to 0 to disable preallocation");
 TAG_FLAG(log_container_preallocate_bytes, advanced);
 
+DEFINE_double(log_container_excess_space_before_cleanup_fraction, 0.10,
+              "Additional fraction of a log container's calculated size that "
+              "must be consumed on disk before the container is considered to "
+              "be inconsistent and subject to excess space cleanup at block "
+              "manager startup.");
+TAG_FLAG(log_container_excess_space_before_cleanup_fraction, advanced);
+
 DEFINE_bool(log_block_manager_test_hole_punching, true,
             "Ensure hole punching is supported by the underlying filesystem");
 TAG_FLAG(log_block_manager_test_hole_punching, advanced);
@@ -1719,6 +1726,9 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
   local_report.misaligned_block_check.emplace();
   local_report.partial_record_check.emplace();
 
+  // Keep track of deleted blocks that we may need to repunch.
+  vector<scoped_refptr<internal::LogBlock>> need_repunching;
+
   // Find all containers and open them.
   unordered_set<string> containers_seen;
   vector<string> children;
@@ -1776,9 +1786,11 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
     // exceedingly unlikely. However, we might have old data which still exhibits
     // the above issue.
     UntrackedBlockMap blocks_in_container;
+    vector<scoped_refptr<internal::LogBlock>> deleted;
     uint64_t max_block_id = 0;
     for (const BlockRecordPB& r : records) {
-      ProcessBlockRecord(r, &local_report, container.get(), &blocks_in_container);
+      ProcessBlockRecord(r, &local_report, container.get(),
+                         &blocks_in_container, &deleted);
       max_block_id = std::max(max_block_id, r.block_id().id());
     }
 
@@ -1798,20 +1810,47 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
     }
 
     // Having processed the block records, let's check whether any full
-    // containers have any extra preallocated space (left behind after a crash
-    // or from an older version of Kudu).
+    // containers have any extra space (left behind after a crash or from an
+    // older version of Kudu).
     if (container->full()) {
-      // XFS's speculative preallocation feature may artificially enlarge the
-      // container's data file without updating its file size. As such, we
-      // cannot simply compare the container's logical file size (available by
-      // proxy via preallocated_offset_) to next_block_offset_ to decide
-      // whether to truncate the container.
+      // Filesystems are unpredictable beasts and may misreport the amount of
+      // space allocated to a file in various interesting ways. Some examples:
+      // - XFS's speculative preallocation feature may artificially enlarge the
+      //   container's data file without updating its file size. This makes the
+      //   file size untrustworthy for the purposes of measuring allocated space.
+      //   See KUDU-1856 for more details.
+      // - On el6.6/ext4 a container data file that consumed ~32K according to
+      //   its extent tree was actually reported as consuming an additional fs
+      //   block (2k) of disk space. A similar container data file (generated
+      //   via the same workload) on Ubuntu 16.04/ext4 did not exhibit this.
+      //   The suspicion is that older versions of ext4 include interior nodes
+      //   of the extent tree when reporting file block usage.
       //
-      // See KUDU-1856 for more details.
+      // To deal with these issues, our extra space cleanup code (deleted block
+      // repunching and container truncation) is gated on an "actual disk space
+      // consumed" heuristic. To prevent unnecessary triggering of the
+      // heuristic, we allow for some slop in our size measurements. The exact
+      // amount of slop is configurable via
+      // log_container_excess_space_before_cleanup_fraction.
       //
-      // TODO(adar): figure out whether to truncate using container's extent map.
-      local_report.full_container_space_check->entries.emplace_back(
-          container->ToString(), container->preallocated_window());
+      // Too little slop and we'll do unnecessary work at startup. Too much and
+      // more unused space may go unreclaimed.
+      string data_filename = StrCat(container->ToString(), kContainerDataFileSuffix);
+      uint64_t reported_size;
+      s = env_->GetFileSizeOnDisk(data_filename, &reported_size);
+      if (!s.ok()) {
+        *result_status = s.CloneAndPrepend(Substitute(
+            "Could not get on-disk file size of container $0", container->ToString()));
+        return;
+      }
+      int64_t cleanup_threshold_size = container->live_bytes_aligned() *
+          (1 + FLAGS_log_container_excess_space_before_cleanup_fraction);
+      if (reported_size > cleanup_threshold_size) {
+        local_report.full_container_space_check->entries.emplace_back(
+            container->ToString(), reported_size - container->live_bytes_aligned());
+        need_repunching.insert(need_repunching.end(),
+                               deleted.begin(), deleted.end());
+      }
 
       local_report.stats.lbm_full_container_count++;
     }
@@ -1848,7 +1887,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
 
   // Like the rest of Open(), repairs are performed per data directory to take
   // advantage of parallelism.
-  s = Repair(&local_report);
+  s = Repair(&local_report, std::move(need_repunching));
   if (!s.ok()) {
     *result_status = s.CloneAndPrepend(Substitute(
         "fatal error while repairing inconsistencies in data directory $0",
@@ -1860,10 +1899,12 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
   *result_status = Status::OK();
 }
 
-void LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
-                                         FsReport* report,
-                                         LogBlockContainer* container,
-                                         UntrackedBlockMap* block_map) {
+void LogBlockManager::ProcessBlockRecord(
+    const BlockRecordPB& record,
+    FsReport* report,
+    LogBlockContainer* container,
+    UntrackedBlockMap* block_map,
+    vector<scoped_refptr<internal::LogBlock>>* deleted) {
   BlockId block_id(BlockId::FromPB(record.block_id()));
   scoped_refptr<LogBlock> lb;
   switch (record.op_type()) {
@@ -1900,6 +1941,7 @@ void LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
       }
       VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString());
       container->BlockDeleted(lb->offset(), lb->length());
+      deleted->emplace_back(std::move(lb));
       break;
     default:
       // TODO(adar): treat as a different kind of inconsistency?
@@ -1909,7 +1951,9 @@ void LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
   }
 }
 
-Status LogBlockManager::Repair(FsReport* report) {
+Status LogBlockManager::Repair(
+    FsReport* report,
+    vector<scoped_refptr<internal::LogBlock>> need_repunching) {
   if (read_only_) {
     LOG(INFO) << "Read-only block manager, skipping repair";
     return Status::OK();
@@ -1919,6 +1963,8 @@ Status LogBlockManager::Repair(FsReport* report) {
     return Status::OK();
   }
 
+  // From here on out we're committed to repairing.
+
   // Fetch all the containers we're going to need.
   unordered_map<std::string, internal::LogBlockContainer*> containers_by_name;
   {
@@ -2008,6 +2054,19 @@ Status LogBlockManager::Repair(FsReport* report) {
     }
   }
 
+  // Repunch all requested holes. Any excess space reclaimed was already
+  // tracked by LBMFullContainerSpaceCheck.
+  //
+  // TODO(adar): can be more efficient (less fs work and more space reclamation
+  // in case of misaligned blocks) via hole coalescing first, but this is easy.
+  for (const auto& b : need_repunching) {
+    b->Delete();
+  }
+
+  // Clearing this vector drops the last references to the LogBlocks within,
+  // triggering the repunching operations.
+  need_repunching.clear();
+
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a118594/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index b8ea377..d69aeaa 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -256,18 +256,22 @@ class LogBlockManager : public BlockManager {
   scoped_refptr<internal::LogBlock> RemoveLogBlock(const BlockId& block_id);
 
   // Parses a block record, adding or removing it in 'block_map', and
-  // accounting for it in the metadata for 'container'.
+  // accounting for it in the metadata for 'container'. Deleted blocks are
+  // written to 'deleted'.
   //
   // If any record is malformed, it is written to 'report'.
   void ProcessBlockRecord(const BlockRecordPB& record,
                           FsReport* report,
                           internal::LogBlockContainer* container,
-                          UntrackedBlockMap* block_map);
+                          UntrackedBlockMap* block_map,
+                          std::vector<scoped_refptr<internal::LogBlock>>* deleted);
 
-  // Repairs any inconsistencies described in 'report'.
+  // Repairs any inconsistencies described in 'report'. Any blocks in
+  // 'need_repunching' will be punched out again.
   //
   // Returns an error if repairing a fatal inconsistency failed.
-  Status Repair(FsReport* report);
+  Status Repair(FsReport* report,
+                std::vector<scoped_refptr<internal::LogBlock>> need_repunching);
 
   // Opens a particular data directory belonging to the block manager. The
   // results of consistency checking (and repair, if applicable) are written to

http://git-wip-us.apache.org/repos/asf/kudu/blob/6a118594/src/kudu/util/env_posix.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc
index 11a0054..b9ed40f 100644
--- a/src/kudu/util/env_posix.cc
+++ b/src/kudu/util/env_posix.cc
@@ -115,9 +115,9 @@ DEFINE_bool(never_fsync, false,
 TAG_FLAG(never_fsync, advanced);
 TAG_FLAG(never_fsync, unsafe);
 
-DEFINE_double(env_inject_io_error_on_write_or_preallocate, 0.0,
-              "Fraction of the time that write or preallocate operations will fail");
-TAG_FLAG(env_inject_io_error_on_write_or_preallocate, hidden);
+DEFINE_double(env_inject_io_error, 0.0,
+              "Fraction of the time that certain I/O operations will fail");
+TAG_FLAG(env_inject_io_error, hidden);
 
 DEFINE_int32(env_inject_short_read_bytes, 0,
              "The number of bytes less than the requested bytes to read");
@@ -403,7 +403,7 @@ class PosixWritableFile : public WritableFile {
   }
 
   virtual Status PreAllocate(uint64_t size) OVERRIDE {
-    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error_on_write_or_preallocate,
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
                          Status::IOError(Env::kInjectedFailureStatusMsg));
 
     TRACE_EVENT1("io", "PosixWritableFile::PreAllocate", "path", filename_);
@@ -500,7 +500,7 @@ class PosixWritableFile : public WritableFile {
 
   Status DoWritev(const vector<Slice>& data_vector,
                   size_t offset, size_t n) {
-    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error_on_write_or_preallocate,
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
                          Status::IOError(Env::kInjectedFailureStatusMsg));
 
     ThreadRestrictions::AssertIOAllowed();
@@ -587,7 +587,7 @@ class PosixRWFile : public RWFile {
   }
 
   virtual Status Write(uint64_t offset, const Slice& data) OVERRIDE {
-    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error_on_write_or_preallocate,
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
                          Status::IOError(Env::kInjectedFailureStatusMsg));
 
     ThreadRestrictions::AssertIOAllowed();
@@ -613,7 +613,7 @@ class PosixRWFile : public RWFile {
   virtual Status PreAllocate(uint64_t offset,
                              size_t length,
                              PreAllocateMode mode) OVERRIDE {
-    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error_on_write_or_preallocate,
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
                          Status::IOError(Env::kInjectedFailureStatusMsg));
 
     TRACE_EVENT1("io", "PosixRWFile::PreAllocate", "path", filename_);
@@ -650,6 +650,8 @@ class PosixRWFile : public RWFile {
 
   virtual Status PunchHole(uint64_t offset, size_t length) OVERRIDE {
 #if defined(__linux__)
+    MAYBE_RETURN_FAILURE(FLAGS_env_inject_io_error,
+                         Status::IOError(Env::kInjectedFailureStatusMsg));
     TRACE_EVENT1("io", "PosixRWFile::PunchHole", "path", filename_);
     ThreadRestrictions::AssertIOAllowed();
     if (fallocate(fd_, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, offset, length) < 0) {


[2/3] kudu git commit: log_block_manager: refactor container record processing

Posted by ad...@apache.org.
log_block_manager: refactor container record processing

While working on other log block manager (LBM) changes I noticed that
LogBlockManager::ProcessBlockRecord() was effectively static; this patch
moves it into LogBlockContainer (as ProcessRecord()). Besides narrowing the
function's scope, this lets us process records as they are read rather than
storing them in a deque to be processed later.

To make this refactor work, I had to move the LogBlock definition so that
LogBlockContainer could use it. This also means various LogBlockContainer
methods can now take LogBlock-typed arguments, and GetAlignedBlockLength()
can move over to LogBlock (renamed fs_aligned_length()) where it belongs.

Change-Id: I5b8da74dc699f4afbeda41ee154eb1ba89dac292
Reviewed-on: http://gerrit.cloudera.org:8080/6769
Tested-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: David Ribeiro Alves <da...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/72e58eca
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/72e58eca
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/72e58eca

Branch: refs/heads/master
Commit: 72e58eca93c36ed2dcce23032503b3d964a34236
Parents: 6a11859
Author: Adar Dembo <ad...@cloudera.com>
Authored: Sun Apr 30 20:51:13 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Tue May 2 21:11:41 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/log_block_manager.cc | 453 +++++++++++++++++-----------------
 src/kudu/fs/log_block_manager.h  |  26 +-
 2 files changed, 238 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/72e58eca/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index c866fc0..7e18c33 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -167,6 +167,59 @@ LogBlockManagerMetrics::LogBlockManagerMetrics(const scoped_refptr<MetricEntity>
 #undef MINIT
 
 ////////////////////////////////////////////////////////////
+// LogBlock (declaration)
+////////////////////////////////////////////////////////////
+
+// The persistent metadata that describes a logical block.
+//
+// A block gets a LogBlock once its data has been synchronized with
+// the disk. That's when it's fully immutable (i.e. none of its metadata
+// can change), and when it becomes readable and persistent.
+//
+// LogBlocks are reference counted to simplify support for deletion with
+// outstanding readers. All refcount increments are performed with the
+// block manager lock held, as are deletion-based decrements. However,
+// no lock is held when ~LogReadableBlock decrements the refcount, thus it
+// must be made thread safe (by extending RefCountedThreadSafe instead of
+// the simpler RefCounted).
+class LogBlock : public RefCountedThreadSafe<LogBlock> {
+ public:
+  LogBlock(LogBlockContainer* container, BlockId block_id, int64_t offset,
+           int64_t length);
+  ~LogBlock();
+
+  const BlockId& block_id() const { return block_id_; }
+  LogBlockContainer* container() const { return container_; }
+  int64_t offset() const { return offset_; }
+  int64_t length() const { return length_; }
+
+  // Returns the length rounded up to the fs block size (if offset is aligned).
+  int64_t fs_aligned_length() const;
+
+  // Marks the block as deleted. Its space is only freed (hole punched) when
+  // the last reference is dropped and the LogBlock is destroyed.
+  void Delete();
+
+ private:
+  // The owning container. Must outlive the LogBlock.
+  LogBlockContainer* container_;
+
+  // The block identifier.
+  const BlockId block_id_;
+
+  // The block's offset in the container.
+  const int64_t offset_;
+
+  // The block's length.
+  const int64_t length_;
+
+  // Whether the block has been marked for deletion.
+  bool deleted_;
+
+  DISALLOW_COPY_AND_ASSIGN(LogBlock);
+};
+
+////////////////////////////////////////////////////////////
 // LogBlockContainer
 ////////////////////////////////////////////////////////////
 
@@ -281,18 +334,28 @@ class LogBlockContainer {
   // This function is thread unsafe.
   Status TruncateDataToNextBlockOffset();
 
-  // Reads the container's metadata from disk, sanity checking and
-  // returning the records in 'records'. Any on-disk inconsistencies are
-  // recorded in 'report'.
-  Status ReadContainerRecords(FsReport* report,
-                              deque<BlockRecordPB>* records) const;
+  // Reads the container's metadata from disk, sanity checking and processing
+  // records along the way.
+  //
+  // Malformed records and other container inconsistencies are written to
+  // 'report'. Healthy blocks are written either to 'live_blocks' or
+  // 'dead_blocks'. The greatest block ID seen thus far in the container is
+  // written to 'max_block_id'.
+  //
+  // Returns an error only if there was a problem accessing the container from
+  // disk; such errors are fatal and effectively halt processing immediately.
+  Status ProcessRecords(
+      FsReport* report,
+      LogBlockManager::UntrackedBlockMap* live_blocks,
+      std::vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+      uint64_t* max_block_id);
 
   // Updates internal bookkeeping state to reflect the creation of a block,
   // marking this container as full if needed. Should only be called when a
   // block is fully written, as it will round up the container data file's position.
   //
   // This function is thread unsafe.
-  void BlockCreated(int64_t block_offset, int64_t block_length);
+  void BlockCreated(const scoped_refptr<LogBlock>& block);
 
   // Updates internal bookkeeping state to reflect the deletion of a block.
   //
@@ -300,7 +363,7 @@ class LogBlockContainer {
   // deletions can happen concurrently with creations.
   //
   // Note: the container is not made "unfull"; containers remain sparse until deleted.
-  void BlockDeleted(int64_t block_offset, int64_t block_length);
+  void BlockDeleted(const scoped_refptr<LogBlock>& block);
 
   // Run a task on this container's data directory thread pool.
   //
@@ -311,9 +374,6 @@ class LogBlockContainer {
   // Produces a debug-friendly string representation of this container.
   string ToString() const;
 
-  // Returns a block's length aligned to the nearest filesystem block size;
-  int64_t GetAlignedBlockLength(int64_t block_offset, int64_t block_length) const;
-
   // Simple accessors.
   LogBlockManager* block_manager() const { return block_manager_; }
   int64_t next_block_offset() const { return next_block_offset_; }
@@ -339,19 +399,23 @@ class LogBlockContainer {
                     unique_ptr<WritablePBContainerFile> metadata_file,
                     shared_ptr<RWFile> data_file);
 
-  // Performs sanity checks on a block record.
+  // Processes a single block record, performing sanity checks on it and adding
+  // it either to 'live_blocks' or 'dead_blocks'.
   //
   // Returns an error only if there was a problem accessing the container from
   // disk; such errors are fatal and effectively halt processing immediately.
   //
   // On success, 'report' is updated with any inconsistencies found in the
   // record, 'data_file_size' may be updated with the latest size of the
-  // container's data file, and 'is_malformed_record' reflects whether or not
-  // the record was malformed.
-  Status CheckBlockRecord(const BlockRecordPB& record,
-                          FsReport* report,
-                          uint64_t* data_file_size,
-                          bool* is_malformed_record) const;
+  // container's data file, and 'max_block_id' reflects the largest block ID
+  // seen thus far in the container.
+  Status ProcessRecord(
+      const BlockRecordPB& record,
+      FsReport* report,
+      LogBlockManager::UntrackedBlockMap* live_blocks,
+      std::vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+      uint64_t* data_file_size,
+      uint64_t* max_block_id);
 
   // The owning block manager. Must outlive the container itself.
   LogBlockManager* const block_manager_;
@@ -576,8 +640,11 @@ Status LogBlockContainer::TruncateDataToNextBlockOffset() {
   return Status::OK();
 }
 
-Status LogBlockContainer::ReadContainerRecords(FsReport* report,
-                                               deque<BlockRecordPB>* records) const {
+Status LogBlockContainer::ProcessRecords(
+    FsReport* report,
+    LogBlockManager::UntrackedBlockMap* live_blocks,
+    vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+    uint64_t* max_block_id) {
   string metadata_path = metadata_file_->filename();
   unique_ptr<RandomAccessFile> metadata_reader;
   RETURN_NOT_OK(block_manager()->env()->NewRandomAccessFile(metadata_path, &metadata_reader));
@@ -585,7 +652,6 @@ Status LogBlockContainer::ReadContainerRecords(FsReport* report,
   RETURN_NOT_OK(pb_reader.Open());
 
   uint64_t data_file_size = 0;
-  deque<BlockRecordPB> local_records;
   Status read_status;
   while (true) {
     BlockRecordPB record;
@@ -593,19 +659,14 @@ Status LogBlockContainer::ReadContainerRecords(FsReport* report,
     if (!read_status.ok()) {
       break;
     }
-
-    bool is_malformed_record;
-    RETURN_NOT_OK(CheckBlockRecord(record, report,
-                                   &data_file_size, &is_malformed_record));
-    if (PREDICT_FALSE(!is_malformed_record)) {
-      local_records.emplace_back(std::move(record));
-    }
+    RETURN_NOT_OK(ProcessRecord(record, report,
+                                live_blocks, dead_blocks,
+                                &data_file_size, max_block_id));
   }
 
   // NOTE: 'read_status' will never be OK here.
   if (PREDICT_TRUE(read_status.IsEndOfFile())) {
     // We've reached the end of the file without any problems.
-    records->swap(local_records);
     return Status::OK();
   }
   if (read_status.IsIncomplete()) {
@@ -614,49 +675,95 @@ Status LogBlockContainer::ReadContainerRecords(FsReport* report,
     // write and truncate the metadata file to remove this partial record.
     report->partial_record_check->entries.emplace_back(ToString(),
                                                        pb_reader.offset());
-    records->swap(local_records);
     return Status::OK();
   }
   // If we've made it here, we've found (and are returning) an unrecoverable error.
   return read_status;
 }
 
-Status LogBlockContainer::CheckBlockRecord(const BlockRecordPB& record,
-                                           FsReport* report,
-                                           uint64_t* data_file_size,
-                                           bool* is_malformed_record) const {
-  if (record.op_type() == CREATE) {
-    // First verify that the record's offset/length aren't wildly incorrect.
-    if (PREDICT_FALSE(!record.has_offset() ||
-                      !record.has_length() ||
-                      record.offset() < 0  ||
-                      record.length() < 0)) {
-      report->malformed_record_check->entries.emplace_back(ToString(), record);
-      *is_malformed_record = true;
-      return Status::OK();
-    }
+Status LogBlockContainer::ProcessRecord(
+    const BlockRecordPB& record,
+    FsReport* report,
+    LogBlockManager::UntrackedBlockMap* live_blocks,
+    vector<scoped_refptr<internal::LogBlock>>* dead_blocks,
+    uint64_t* data_file_size,
+    uint64_t* max_block_id) {
+  BlockId block_id(BlockId::FromPB(record.block_id()));
+  scoped_refptr<LogBlock> lb;
+  switch (record.op_type()) {
+    case CREATE:
+      // First verify that the record's offset/length aren't wildly incorrect.
+      if (PREDICT_FALSE(!record.has_offset() ||
+                        !record.has_length() ||
+                        record.offset() < 0  ||
+                        record.length() < 0)) {
+        report->malformed_record_check->entries.emplace_back(ToString(), record);
+        break;
+      }
 
-    // Now it should be safe to access the record's offset/length.
-    //
-    // KUDU-1657: When opening a container in read-only mode which is actively
-    // being written to by another lbm, we must reinspect the data file's size
-    // frequently in order to account for the latest appends. Inspecting the
-    // file size is expensive, so we only do it when the metadata indicates
-    // that additional data has been written to the file.
-    if (PREDICT_FALSE(record.offset() + record.length() > *data_file_size)) {
-      RETURN_NOT_OK(data_file_->Size(data_file_size));
-    }
+      // Now it should be safe to access the record's offset/length.
+      //
+      // KUDU-1657: When opening a container in read-only mode which is actively
+      // being written to by another lbm, we must reinspect the data file's size
+      // frequently in order to account for the latest appends. Inspecting the
+      // file size is expensive, so we only do it when the metadata indicates
+      // that additional data has been written to the file.
+      if (PREDICT_FALSE(record.offset() + record.length() > *data_file_size)) {
+        RETURN_NOT_OK(data_file_->Size(data_file_size));
+      }
+
+      // If the record still extends beyond the end of the file, it is malformed.
+      if (PREDICT_FALSE(record.offset() + record.length() > *data_file_size)) {
+        // TODO(adar): treat as a different kind of inconsistency?
+        report->malformed_record_check->entries.emplace_back(ToString(), record);
+        break;
+      }
+
+      lb = new LogBlock(this, block_id, record.offset(), record.length());
+      if (!InsertIfNotPresent(live_blocks, block_id, lb)) {
+        // We found a record whose ID matches that of an already created block.
+        //
+        // TODO(adar): treat as a different kind of inconsistency?
+        report->malformed_record_check->entries.emplace_back(
+            ToString(), record);
+        break;
+      }
+
+      VLOG(2) << Substitute("Found CREATE block $0 at offset $1 with length $2",
+                            block_id.ToString(),
+                            record.offset(), record.length());
+
+      // This block must be included in the container's logical size, even if
+      // it has since been deleted. This helps satisfy one of our invariants:
+      // once a container byte range has been used, it may never be reused in
+      // the future.
+      //
+      // If we ignored deleted blocks, we would end up reusing the space
+      // belonging to the last deleted block in the container.
+      BlockCreated(lb);
 
-    // If the record still extends beyond the end of the file, it is malformed.
-    if (PREDICT_FALSE(record.offset() + record.length() > *data_file_size)) {
+      *max_block_id = std::max(*max_block_id, block_id.id());
+      break;
+    case DELETE:
+      lb = EraseKeyReturnValuePtr(live_blocks, block_id);
+      if (!lb) {
+        // We found a record for which there is no already created block.
+        //
+        // TODO(adar): treat as a different kind of inconsistency?
+        report->malformed_record_check->entries.emplace_back(ToString(), record);
+        break;
+      }
+      VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString());
+      BlockDeleted(lb);
+      dead_blocks->emplace_back(std::move(lb));
+      break;
+    default:
+      // We found a record with an unknown type.
+      //
       // TODO(adar): treat as a different kind of inconsistency?
       report->malformed_record_check->entries.emplace_back(ToString(), record);
-      *is_malformed_record = true;
-      return Status::OK();
-    }
+      break;
   }
-
-  *is_malformed_record = false;
   return Status::OK();
 }
 
@@ -678,10 +785,10 @@ Status LogBlockContainer::FinishBlock(const Status& s, WritableBlock* block) {
   // will have written some garbage that can be expunged during a GC.
   RETURN_NOT_OK(block_manager()->SyncContainer(*this));
 
-  CHECK(block_manager()->AddLogBlock(this, block->id(),
-                                     next_block_offset_,
-                                     block->BytesAppended()));
-  BlockCreated(next_block_offset_, block->BytesAppended());
+  scoped_refptr<LogBlock> lb = block_manager()->AddLogBlock(
+      this, block->id(), next_block_offset_, block->BytesAppended());
+  CHECK(lb);
+  BlockCreated(lb);
 
   // Truncate the container if it's now full; any left over preallocated space
   // is no longer needed.
@@ -790,8 +897,8 @@ Status LogBlockContainer::EnsurePreallocated(int64_t block_start_offset,
   return Status::OK();
 }
 
-void LogBlockContainer::BlockCreated(int64_t block_offset, int64_t block_length) {
-  DCHECK_GE(block_offset, 0);
+void LogBlockContainer::BlockCreated(const scoped_refptr<LogBlock>& block) {
+  DCHECK_GE(block->offset(), 0);
 
   // The log block manager maintains block contiguity as an invariant, which
   // means accounting for the new block should be as simple as adding its
@@ -804,7 +911,8 @@ void LogBlockContainer::BlockCreated(int64_t block_offset, int64_t block_length)
   // boundary. This guarantees that the disk space can be reclaimed when
   // the block is deleted.
   int64_t new_next_block_offset = KUDU_ALIGN_UP(
-      block_offset + block_length, instance()->filesystem_block_size_bytes());
+      block->offset() + block->length(),
+      instance()->filesystem_block_size_bytes());
   if (PREDICT_FALSE(new_next_block_offset < next_block_offset_)) {
     LOG(WARNING) << Substitute(
         "Container $0 unexpectedly tried to lower its next block offset "
@@ -813,7 +921,7 @@ void LogBlockContainer::BlockCreated(int64_t block_offset, int64_t block_length)
   } else {
     int64_t aligned_block_length = new_next_block_offset - next_block_offset_;
     total_bytes_+= aligned_block_length;
-    live_bytes_.IncrementBy(block_length);
+    live_bytes_.IncrementBy(block->length());
     live_bytes_aligned_.IncrementBy(aligned_block_length);
     next_block_offset_ = new_next_block_offset;
   }
@@ -827,11 +935,11 @@ void LogBlockContainer::BlockCreated(int64_t block_offset, int64_t block_length)
   }
 }
 
-void LogBlockContainer::BlockDeleted(int64_t block_offset, int64_t block_length) {
-  DCHECK_GE(block_offset, 0);
+void LogBlockContainer::BlockDeleted(const scoped_refptr<LogBlock>& block) {
+  DCHECK_GE(block->offset(), 0);
 
-  live_bytes_.IncrementBy(-block_length);
-  live_bytes_aligned_.IncrementBy(-GetAlignedBlockLength(block_offset, block_length));
+  live_bytes_.IncrementBy(-block->length());
+  live_bytes_aligned_.IncrementBy(-block->fs_aligned_length());
   live_blocks_.IncrementBy(-1);
 }
 
@@ -846,77 +954,10 @@ string LogBlockContainer::ToString() const {
   return s;
 }
 
-
-int64_t LogBlockContainer::GetAlignedBlockLength(int64_t block_offset,
-                                                 int64_t block_length) const {
-  uint64_t fs_block_size =
-      data_dir_->instance()->metadata()->filesystem_block_size_bytes();
-
-  // Nearly all blocks are placed on a filesystem block boundary, which means
-  // their length post-alignment is simply their length aligned up to the
-  // nearest fs block size.
-  //
-  // However, due to KUDU-1793, some blocks may start or end at misaligned
-  // offsets. We don't maintain enough state to precisely pinpoint such a
-  // block's (aligned) end offset in this case, so we'll just undercount it.
-  // This should be safe, although it may mean unreclaimed disk space (i.e.
-  // when GetAlignedBlockLength() is used in hole punching).
-  if (PREDICT_TRUE(block_offset % fs_block_size == 0)) {
-    return KUDU_ALIGN_UP(block_length, fs_block_size);
-  }
-  return block_length;
-}
-
 ////////////////////////////////////////////////////////////
-// LogBlock
+// LogBlock (definition)
 ////////////////////////////////////////////////////////////
 
-// The persistent metadata that describes a logical block.
-//
-// A block grows a LogBlock when its data has been synchronized with
-// the disk. That's when it's fully immutable (i.e. none of its metadata
-// can change), and when it becomes readable and persistent.
-//
-// LogBlocks are reference counted to simplify support for deletion with
-// outstanding readers. All refcount increments are performed with the
-// block manager lock held, as are deletion-based decrements. However,
-// no lock is held when ~LogReadableBlock decrements the refcount, thus it
-// must be made thread safe (by extending RefCountedThreadSafe instead of
-// the simpler RefCounted).
-class LogBlock : public RefCountedThreadSafe<LogBlock> {
- public:
-  LogBlock(LogBlockContainer* container, BlockId block_id, int64_t offset,
-           int64_t length);
-  ~LogBlock();
-
-  const BlockId& block_id() const { return block_id_; }
-  LogBlockContainer* container() const { return container_; }
-  int64_t offset() const { return offset_; }
-  int64_t length() const { return length_; }
-
-  // Delete the block. Actual deletion takes place when the
-  // block is destructed.
-  void Delete();
-
- private:
-  // The owning container. Must outlive the LogBlock.
-  LogBlockContainer* container_;
-
-  // The block identifier.
-  const BlockId block_id_;
-
-  // The block's offset in the container.
-  const int64_t offset_;
-
-  // The block's length.
-  const int64_t length_;
-
-  // Whether the block has been marked for deletion.
-  bool deleted_;
-
-  DISALLOW_COPY_AND_ASSIGN(LogBlock);
-};
-
 LogBlock::LogBlock(LogBlockContainer* container, BlockId block_id,
                    int64_t offset, int64_t length)
     : container_(container),
@@ -931,24 +972,40 @@ LogBlock::LogBlock(LogBlockContainer* container, BlockId block_id,
 static void DeleteBlockAsync(LogBlockContainer* container,
                              BlockId block_id,
                              int64_t offset,
-                             int64_t length) {
-  // Use the block's aligned length so that the filesystem can reclaim maximal
-  // disk space.
-  //
+                             int64_t fs_aligned_length) {
   // We don't call SyncData() to synchronize the deletion because it's
   // expensive, and in the worst case, we'll just leave orphaned data
   // behind to be cleaned up in the next GC.
   VLOG(3) << "Freeing space belonging to block " << block_id;
-  WARN_NOT_OK(container->PunchHole(
-      offset, container->GetAlignedBlockLength(offset, length)),
+  WARN_NOT_OK(container->PunchHole(offset, fs_aligned_length),
               Substitute("Could not delete block $0", block_id.ToString()));
 }
 
 LogBlock::~LogBlock() {
   if (deleted_) {
+    // Use the block's aligned length so that the filesystem can reclaim
+    // maximal disk space.
     container_->ExecClosure(Bind(&DeleteBlockAsync, container_, block_id_,
-                                 offset_, length_));
+                                 offset_, fs_aligned_length()));
+  }
+}
+
+int64_t LogBlock::fs_aligned_length() const {
+  uint64_t fs_block_size = container_->instance()->filesystem_block_size_bytes();
+
+  // Nearly all blocks are placed on a filesystem block boundary, which means
+  // their length post-alignment is simply their length aligned up to the
+  // nearest fs block size.
+  //
+  // However, due to KUDU-1793, some blocks may start or end at misaligned
+  // offsets. We don't maintain enough state to precisely pinpoint such a
+  // block's (aligned) end offset in this case, so we'll just undercount it.
+  // This should be safe, although it may mean unreclaimed disk space (i.e.
+  // when fs_aligned_length() is used in hole punching).
+  if (PREDICT_TRUE(offset_ % fs_block_size == 0)) {
+    return KUDU_ALIGN_UP(length_, fs_block_size);
   }
+  return length_;
 }
 
 void LogBlock::Delete() {
@@ -1522,7 +1579,7 @@ Status LogBlockManager::DeleteBlock(const BlockId& block_id) {
   }
   VLOG(3) << "Deleting block " << block_id;
   lb->Delete();
-  lb->container()->BlockDeleted(lb->offset(), lb->length());
+  lb->container()->BlockDeleted(lb);
 
   // Record the on-disk deletion.
   //
@@ -1660,15 +1717,19 @@ bool LogBlockManager::TryUseBlockId(const BlockId& block_id) {
   return InsertIfNotPresent(&open_block_ids_, block_id);
 }
 
-bool LogBlockManager::AddLogBlock(LogBlockContainer* container,
-                                  const BlockId& block_id,
-                                  int64_t offset,
-                                  int64_t length) {
+scoped_refptr<LogBlock> LogBlockManager::AddLogBlock(
+    LogBlockContainer* container,
+    const BlockId& block_id,
+    int64_t offset,
+    int64_t length) {
   std::lock_guard<simple_spinlock> l(lock_);
   scoped_refptr<LogBlock> lb(new LogBlock(container, block_id, offset, length));
   mem_tracker_->Consume(kudu_malloc_usable_size(lb.get()));
 
-  return AddLogBlockUnlocked(lb);
+  if (AddLogBlockUnlocked(lb)) {
+    return lb;
+  }
+  return nullptr;
 }
 
 bool LogBlockManager::AddLogBlockUnlocked(const scoped_refptr<LogBlock>& lb) {
@@ -1763,16 +1824,8 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
       return;
     }
 
-    // Populate the in-memory block maps using each container's records.
-    deque<BlockRecordPB> records;
-    s = container->ReadContainerRecords(&local_report, &records);
-    if (!s.ok()) {
-      *result_status = s.CloneAndPrepend(Substitute(
-          "Could not read records from container $0", container->ToString()));
-      return;
-    }
-
-    // Process the records, building a container-local map.
+    // Process the records, building a container-local map for live blocks and
+    // a list of dead blocks.
     //
     // It's important that we don't try to add these blocks to the global map
     // incrementally as we see each record, since it's possible that one container
@@ -1785,13 +1838,17 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
     // NOTE: Since KUDU-1538, we allocate sequential block IDs, which makes reuse
     // exceedingly unlikely. However, we might have old data which still exhibits
     // the above issue.
-    UntrackedBlockMap blocks_in_container;
-    vector<scoped_refptr<internal::LogBlock>> deleted;
+    UntrackedBlockMap live_blocks;
+    vector<scoped_refptr<internal::LogBlock>> dead_blocks;
     uint64_t max_block_id = 0;
-    for (const BlockRecordPB& r : records) {
-      ProcessBlockRecord(r, &local_report, container.get(),
-                         &blocks_in_container, &deleted);
-      max_block_id = std::max(max_block_id, r.block_id().id());
+    s = container->ProcessRecords(&local_report,
+                                  &live_blocks,
+                                  &dead_blocks,
+                                  &max_block_id);
+    if (!s.ok()) {
+      *result_status = s.CloneAndPrepend(Substitute(
+          "Could not process records in container $0", container->ToString()));
+      return;
     }
 
     // With deleted blocks out of the way, check for misaligned blocks.
@@ -1800,7 +1857,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
     // underlying filesystem's block size, an invariant maintained by the log
     // block manager. However, due to KUDU-1793, that invariant may have been
     // broken, so we'll note but otherwise allow it.
-    for (const auto& e : blocks_in_container) {
+    for (const auto& e : live_blocks) {
       if (PREDICT_FALSE(e.second->offset() %
                         container->instance()->filesystem_block_size_bytes() != 0)) {
         local_report.misaligned_block_check->entries.emplace_back(
@@ -1849,7 +1906,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
         local_report.full_container_space_check->entries.emplace_back(
             container->ToString(), reported_size - container->live_bytes_aligned());
         need_repunching.insert(need_repunching.end(),
-                               deleted.begin(), deleted.end());
+                               dead_blocks.begin(), dead_blocks.end());
       }
 
       local_report.stats.lbm_full_container_count++;
@@ -1869,7 +1926,7 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
       // memory in a local and add it to the mem-tracker in a single increment
       // at the end of this loop.
       int64_t mem_usage = 0;
-      for (const UntrackedBlockMap::value_type& e : blocks_in_container) {
+      for (const UntrackedBlockMap::value_type& e : live_blocks) {
         if (!AddLogBlockUnlocked(e.second)) {
           // TODO(adar): track as an inconsistency?
           LOG(FATAL) << "Found duplicate CREATE record for block " << e.first
@@ -1899,58 +1956,6 @@ void LogBlockManager::OpenDataDir(DataDir* dir,
   *result_status = Status::OK();
 }
 
-// Applies a single block record read from 'container' to 'block_map'.
-//
-// CREATE inserts a new LogBlock and records its byte range with
-// container->BlockCreated(); DELETE removes the block, calls
-// container->BlockDeleted(), and appends the removed LogBlock to 'deleted'
-// (the caller may later schedule it for hole repunching). A duplicate CREATE,
-// a DELETE of an unknown block, or an unrecognized op type is recorded in
-// 'report' as a malformed record and otherwise ignored.
-void LogBlockManager::ProcessBlockRecord(
-    const BlockRecordPB& record,
-    FsReport* report,
-    LogBlockContainer* container,
-    UntrackedBlockMap* block_map,
-    vector<scoped_refptr<internal::LogBlock>>* deleted) {
-  BlockId block_id(BlockId::FromPB(record.block_id()));
-  scoped_refptr<LogBlock> lb;
-  switch (record.op_type()) {
-    case CREATE: {
-      lb = new LogBlock(container, block_id, record.offset(), record.length());
-      if (!InsertIfNotPresent(block_map, block_id, lb)) {
-        // A block with this ID already exists in 'block_map'; report the record
-        // and return before the container accounts for it a second time.
-        // TODO(adar): treat as a different kind of inconsistency?
-        report->malformed_record_check->entries.emplace_back(
-            container->ToString(), record);
-        return;
-      }
-
-      VLOG(2) << Substitute("Found CREATE block $0 at offset $1 with length $2",
-                            block_id.ToString(),
-                            record.offset(), record.length());
-
-      // This block must be included in the container's logical size, even if
-      // it has since been deleted. This helps satisfy one of our invariants:
-      // once a container byte range has been used, it may never be reused in
-      // the future.
-      //
-      // If we ignored deleted blocks, we would end up reusing the space
-      // belonging to the last deleted block in the container.
-      container->BlockCreated(record.offset(), record.length());
-      break;
-    }
-    case DELETE:
-      lb = EraseKeyReturnValuePtr(block_map, block_id);
-      if (!lb) {
-        // DELETE for a block that was never created (or was already deleted).
-        // TODO(adar): treat as a different kind of inconsistency?
-        report->malformed_record_check->entries.emplace_back(
-            container->ToString(), record);
-        return;
-      }
-      VLOG(2) << Substitute("Found DELETE block $0", block_id.ToString());
-      container->BlockDeleted(lb->offset(), lb->length());
-      // Hand the removed block to the caller instead of dropping it.
-      deleted->emplace_back(std::move(lb));
-      break;
-    default:
-      // Unrecognized op type.
-      // TODO(adar): treat as a different kind of inconsistency?
-      report->malformed_record_check->entries.emplace_back(
-          container->ToString(), record);
-      return;
-  }
-}
-
 Status LogBlockManager::Repair(
     FsReport* report,
     vector<scoped_refptr<internal::LogBlock>> need_repunching) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/72e58eca/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index d69aeaa..e401446 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -238,15 +238,18 @@ class LogBlockManager : public BlockManager {
 
   // Adds a LogBlock to in-memory data structures.
   //
-  // Returns success if the LogBlock was successfully added, failure if it
-  // was already present.
-  bool AddLogBlock(internal::LogBlockContainer* container,
-                   const BlockId& block_id,
-                   int64_t offset,
-                   int64_t length);
+  // Returns the created LogBlock if it was successfully added or nullptr if a
+  // block with that ID was already present.
+  scoped_refptr<internal::LogBlock> AddLogBlock(
+      internal::LogBlockContainer* container,
+      const BlockId& block_id,
+      int64_t offset,
+      int64_t length);
 
   // Unlocked variant of AddLogBlock() for an already-constructed LogBlock object.
   // Must hold 'lock_'.
+  //
+  // Returns true if the LogBlock was successfully added, false if it was already present.
   bool AddLogBlockUnlocked(const scoped_refptr<internal::LogBlock>& lb);
 
   // Removes a LogBlock from in-memory data structures.
@@ -255,17 +258,6 @@ class LogBlockManager : public BlockManager {
   // already gone.
   scoped_refptr<internal::LogBlock> RemoveLogBlock(const BlockId& block_id);
 
-  // Parses a block record, adding or removing it in 'block_map', and
-  // accounting for it in the metadata for 'container'. Deleted blocks are
-  // written to 'deleted'.
-  //
-  // If any record is malformed, it is written to 'report'.
-  void ProcessBlockRecord(const BlockRecordPB& record,
-                          FsReport* report,
-                          internal::LogBlockContainer* container,
-                          UntrackedBlockMap* block_map,
-                          std::vector<scoped_refptr<internal::LogBlock>>* deleted);
-
   // Repairs any inconsistencies described in 'report'. Any blocks in
   // 'need_repunching' will be punched out again.
   //


[3/3] kudu git commit: [KUDU-754] add an environment variable for kudu client debugging to stderr

Posted by ad...@apache.org.
[KUDU-754] add an environment variable for kudu client debugging to stderr

Read the environment variable "KUDU_CLIENT_VERBOSE" to get the verbose logging level,
and call SetVerboseLogLevel() to apply that level.
Note: InitGoogleLoggingSafeBasic() already routes logging output to stderr.

Change-Id: Iab5c7c24395c25184489200283dd38da024c07bb
Reviewed-on: http://gerrit.cloudera.org:8080/6736
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/867fc91c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/867fc91c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/867fc91c

Branch: refs/heads/master
Commit: 867fc91cb89def6a46ad1f8173e87ca6970cfa85
Parents: 72e58ec
Author: William Li <wi...@gmail.com>
Authored: Wed Apr 26 12:02:23 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Tue May 2 21:12:11 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/client-internal.h |  4 ++++
 src/kudu/client/client-test.cc    | 21 +++++++++++++++++++++
 src/kudu/client/client.cc         | 18 ++++++++++++++++++
 3 files changed, 43 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/867fc91c/src/kudu/client/client-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h
index 864a734..c211639 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -256,6 +256,10 @@ Status RetryFunc(const MonoTime& deadline,
                  const std::string& timeout_msg,
                  const boost::function<Status(const MonoTime&, bool*)>& func);
 
+// Set logging verbose level through environment variable.
+void SetVerboseLevelFromEnvVar();
+extern const char* kVerboseEnvVar;
+
 } // namespace client
 } // namespace kudu
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/867fc91c/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index c4f41b2..a78fadc 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -5231,5 +5231,26 @@ TEST_F(ClientTest, TestInvalidPartitionerBuilder) {
   ASSERT_EQ("Invalid argument: null table", s.ToString());
 }
 
+// Test that the log verbose level can be set via kVerboseEnvVar and is reflected in FLAGS_v.
+TEST_F(ClientTest, TestVerboseLevelByEnvVar) {
+  FLAGS_v = 0;
+  setenv(kVerboseEnvVar, "5", 1); // 1 = overwrite if variable already exists.
+  SetVerboseLevelFromEnvVar();
+  ASSERT_EQ(5, FLAGS_v);
+
+  // Negative values are to be ignored.
+  FLAGS_v = 0;
+  setenv(kVerboseEnvVar, "-1", 1);
+  SetVerboseLevelFromEnvVar();
+  ASSERT_EQ(0, FLAGS_v);
+
+  // Non-parsable values are to be ignored; then unset the variable so it doesn't leak.
+  FLAGS_v = 0;
+  setenv(kVerboseEnvVar, "abc", 1);
+  SetVerboseLevelFromEnvVar();
+  unsetenv(kVerboseEnvVar);
+  ASSERT_EQ(0, FLAGS_v);
+}
+
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/867fc91c/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index e3e06dd..87d4e55 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -18,6 +18,7 @@
 #include "kudu/client/client.h"
 
 #include <algorithm>
+#include <cstdlib>
 #include <memory>
 #include <set>
 #include <string>
@@ -55,6 +56,7 @@
 #include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
@@ -126,6 +128,7 @@ using internal::MetaCache;
 using sp::shared_ptr;
 
 static const char* kProgName = "kudu_client";
+const char* kVerboseEnvVar = "KUDU_CLIENT_VERBOSE"; // Read by SetVerboseLevelFromEnvVar().
 
 // We need to reroute all logging to stderr when the client library is
 // loaded. GoogleOnceInit() can do that, but there are multiple entry
@@ -138,6 +141,21 @@ static const char* kProgName = "kudu_client";
 __attribute__((constructor))
 static void InitializeBasicLogging() {
   InitGoogleLoggingSafeBasic(kProgName);
+  // Called after basic logging is set up, since it may LOG(WARNING) on a bad value.
+  SetVerboseLevelFromEnvVar();
+}
+
+// Sets the client logging verbose level from environment variable kVerboseEnvVar.
+void SetVerboseLevelFromEnvVar() {
+  const char* env_verbose_level = std::getenv(kVerboseEnvVar);
+  if (env_verbose_level == nullptr) return;  // Unset: keep the current level.
+  int32_t level = 0;
+  if (safe_strto32(env_verbose_level, &level) && level >= 0) {
+    SetVerboseLogLevel(level);
+  } else {  // Unparsable or negative: warn (with the bad value) and keep the current level.
+    LOG(WARNING) << "Invalid verbose level from environment variable " << kVerboseEnvVar
+                 << ": '" << env_verbose_level << "'";
+  }
 }
 
 // Adapts between the internal LogSeverity and the client's KuduLogSeverity.