You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this text.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/12/08 22:24:51 UTC

kudu git commit: KUDU-1508: enforce block limit on lbm containers

Repository: kudu
Updated Branches:
  refs/heads/master 2c43f9596 -> 65695a303


KUDU-1508: enforce block limit on lbm containers

This patch introduces the following machinery:
1. Log block manager (LBM) containers can now be limited to a certain number
   of blocks using log_container_max_blocks. This is a soft limit; if the
   limit is reduced and the server is restarted, existing containers may
   exceed it. The default value (-1) means "no limit except when vulnerable
   to KUDU-1508"; a value of 0 means no limit at all.
2. When we construct a block manager, we check whether the kernel is an el6
   (RHEL/CentOS 6) kernel older than 2.6.32-674 (that is, whether it is
   vulnerable to KUDU-1508).
3. When the data directories are opened, if the kernel is vulnerable to
   KUDU-1508 and a data directory is on an ext4 filesystem, we prescribe a
   block limit for that directory that depends on the filesystem's block
   size. These limits were arrived at via experimentation (see commit
   4923a74). They are likely more conservative than they need to be (they
   assume one extent per filesystem block, with every other extent punched
   out), but with the addition of the file cache, the scalability overhead
   of the additional containers should be minimal.
4. The prescribed limit is passed into each container as it is instantiated.

I ran TestContainerWithManyHoles on several loopback ext4 filesystems on
el6.6, using filesystem block sizes of 1024, 2048, and 4096 bytes, each with
and without the fix. Without the fix, the filesystems needed to be repaired
100% of the time. With the fix, they never needed repair.

Change-Id: I63576d2bf2cc61b2deb70f7e166f08e0d4c66543
Reviewed-on: http://gerrit.cloudera.org:8080/5403
Tested-by: Kudu Jenkins
Reviewed-by: Dan Burkert <da...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/65695a30
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/65695a30
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/65695a30

Branch: refs/heads/master
Commit: 65695a30345913d69fec7ef777d990e549719e09
Parents: 2c43f95
Author: Adar Dembo <ad...@cloudera.com>
Authored: Mon Dec 5 17:05:08 2016 -0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Thu Dec 8 22:24:26 2016 +0000

----------------------------------------------------------------------
 src/kudu/experiments/KUDU-1508/run_test.sh |   5 +-
 src/kudu/fs/block_manager-test.cc          | 152 ++++++++++++++++++++++++
 src/kudu/fs/log_block_manager.cc           | 111 +++++++++++++++--
 src/kudu/fs/log_block_manager.h            |  28 ++++-
 src/kudu/util/env.h                        |   9 ++
 src/kudu/util/env_posix.cc                 |  28 +++++
 6 files changed, 322 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/65695a30/src/kudu/experiments/KUDU-1508/run_test.sh
----------------------------------------------------------------------
diff --git a/src/kudu/experiments/KUDU-1508/run_test.sh b/src/kudu/experiments/KUDU-1508/run_test.sh
index df77759..a416513 100755
--- a/src/kudu/experiments/KUDU-1508/run_test.sh
+++ b/src/kudu/experiments/KUDU-1508/run_test.sh
@@ -73,8 +73,11 @@ function run_test() {
 
   # Create the test filesystem and file.
   #
+  # Note that 'fallocate --length <bytes>' doesn't appear to be recognized on
+  # el6.6, despite what the help says.
+  #
   # The 'sync' at the end speeds up subsequent hole punching.
-  run "fallocate --length $FILESYSTEM_SIZE $BLOCK_DEVICE_FILE"
+  run "fallocate -l $FILESYSTEM_SIZE $BLOCK_DEVICE_FILE"
   run "mkfs -F -t ext4 -b $BLOCK_SIZE $BLOCK_DEVICE_FILE"
   run "sudo mount -o loop $BLOCK_DEVICE_FILE $MOUNT_DIR"
   run "sudo chown $EUID $MOUNT_DIR"

http://git-wip-us.apache.org/repos/asf/kudu/blob/65695a30/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index 74ff728..4e7cf6a 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <memory>
+#include <unordered_map>
 #include <unordered_set>
 #include <string>
 #include <vector>
@@ -42,6 +43,7 @@ using kudu::pb_util::ReadablePBContainerFile;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
+using std::unordered_map;
 using std::unordered_set;
 using std::vector;
 using strings::Substitute;
@@ -53,6 +55,7 @@ DECLARE_uint64(log_container_max_size);
 
 DECLARE_int64(fs_data_dirs_reserved_bytes);
 DECLARE_int64(disk_reserved_bytes_free_for_testing);
+DECLARE_int64(log_container_max_blocks);
 
 DECLARE_int32(fs_data_dirs_full_disk_cache_seconds);
 
@@ -143,6 +146,14 @@ class BlockManagerTest : public KuduTest {
     *data_file = container_data_filename;
   }
 
+  void AssertNumContainers(int expected_num_containers) {
+    // The expected directory contents are dot, dotdot, test metadata, instance
+    // file, and a file pair per container.
+    vector<string> children;
+    ASSERT_OK(env_->GetChildren(GetTestDataDirectory(), &children));
+    ASSERT_EQ(4 + (2 * expected_num_containers), children.size());
+  }
+
   void RunMultipathTest(const vector<string>& paths);
 
   void RunLogMetricsTest();
@@ -1233,5 +1244,146 @@ TYPED_TEST(BlockManagerTest, TestMetadataOkayDespiteFailedWrites) {
   }
 }
 
+TEST_F(LogBlockManagerTest, TestContainerWithManyHoles) {
+  // This is a regression test of sorts for KUDU-1508, though it doesn't
+  // actually fail if the fix is missing; it just corrupts the filesystem.
+  RETURN_NOT_LOG_BLOCK_MANAGER();
+
+  static unordered_map<int, int> block_size_to_last_interior_node_block_number =
+     {{1024, 168},
+      {2048, 338},
+      {4096, 680}};
+
+  const int kNumBlocks = 16 * 1024;
+
+  uint64_t fs_block_size;
+  ASSERT_OK(env_->GetBlockSize(test_dir_, &fs_block_size));
+  if (!ContainsKey(block_size_to_last_interior_node_block_number,
+                   fs_block_size)) {
+    LOG(INFO) << Substitute("Filesystem block size is $0, skipping test",
+                            fs_block_size);
+    return;
+  }
+  int last_interior_node_block_number = FindOrDie(
+      block_size_to_last_interior_node_block_number, fs_block_size);
+
+  ASSERT_GE(kNumBlocks, last_interior_node_block_number);
+
+  // Speed up the test.
+  FLAGS_never_fsync = true;
+
+  // Create a bunch of blocks. They should all go in one container (unless
+  // the container becomes full).
+  LOG(INFO) << Substitute("Creating $0 blocks", kNumBlocks);
+  vector<BlockId> ids;
+  for (int i = 0; i < kNumBlocks; i++) {
+    gscoped_ptr<WritableBlock> block;
+    ASSERT_OK(bm_->CreateBlock(&block));
+    ASSERT_OK(block->Append("aaaa"));
+    ASSERT_OK(block->Close());
+    ids.push_back(block->id());
+  }
+
+  // Delete every other block. In effect, this maximizes the number of extents
+  // in the container by forcing the filesystem to alternate every hole with
+  // a live extent.
+  LOG(INFO) << "Deleting every other block";
+  for (int i = 0; i < ids.size(); i += 2) {
+    ASSERT_OK(bm_->DeleteBlock(ids[i]));
+  }
+
+  // Delete all of the blocks belonging to the interior node. If KUDU-1508
+  // applies, this should corrupt the filesystem.
+  LOG(INFO) << Substitute("Deleting remaining blocks up to block number $0",
+                          last_interior_node_block_number);
+  for (int i = 1; i < last_interior_node_block_number; i += 2) {
+    ASSERT_OK(bm_->DeleteBlock(ids[i]));
+  }
+}
+
+TEST_F(LogBlockManagerTest, TestParseKernelRelease) {
+  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("1.7.0.0.el6.x86_64"));
+
+  // no el6 infix
+  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32"));
+
+  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-1.0.0.el6.x86_64"));
+  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.33-1.0.0.el6.x86_64"));
+
+  // Make sure it's a numeric sort, not a lexicographic one.
+  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-1000.0.0.el6.x86_64"));
+  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.100-1.0.0.el6.x86_64"));
+  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.10.0-1.0.0.el6.x86_64"));
+  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("10.0.0-1.0.0.el6.x86_64"));
+
+  // Kernel from el6.6: buggy
+  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-504.30.3.el6.x86_64"));
+
+  // Kernel from el6.9: not buggy.
+  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-674.0.0.el6.x86_64"));
+}
+
+TEST_F(LogBlockManagerTest, TestLookupBlockLimit) {
+  int64_t limit_1024 = LogBlockManager::LookupBlockLimit(1024);
+  int64_t limit_2048 = LogBlockManager::LookupBlockLimit(2048);
+  int64_t limit_4096 = LogBlockManager::LookupBlockLimit(4096);
+
+  // Test the floor behavior in LookupBlockLimit().
+  for (int i = 0; i < 16384; i++) {
+    if (i < 2048) {
+      ASSERT_EQ(limit_1024, LogBlockManager::LookupBlockLimit(i));
+    } else if (i < 4096) {
+      ASSERT_EQ(limit_2048, LogBlockManager::LookupBlockLimit(i));
+    } else {
+      ASSERT_EQ(limit_4096, LogBlockManager::LookupBlockLimit(i));
+    }
+  }
+}
+
+TEST_F(LogBlockManagerTest, TestContainerBlockLimiting) {
+  RETURN_NOT_LOG_BLOCK_MANAGER();
+
+  const int kNumBlocks = 1000;
+
+  // Speed up the test.
+  FLAGS_never_fsync = true;
+
+  // Creates 'kNumBlocks' blocks with minimal data.
+  auto create_some_blocks = [&]() -> Status {
+    for (int i = 0; i < kNumBlocks; i++) {
+      gscoped_ptr<WritableBlock> block;
+      RETURN_NOT_OK(bm_->CreateBlock(&block));
+      RETURN_NOT_OK(block->Append("aaaa"));
+      RETURN_NOT_OK(block->Close());
+    }
+    return Status::OK();
+  };
+
+  // All of these blocks should fit into one container.
+  ASSERT_OK(create_some_blocks());
+  NO_FATALS(AssertNumContainers(1));
+
+  // With a limit imposed, the existing container is immediately full, and we
+  // need a few more to satisfy another 'kNumBlocks' blocks.
+  FLAGS_log_container_max_blocks = 400;
+  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                     shared_ptr<MemTracker>(),
+                                     { GetTestDataDirectory() },
+                                     false));
+  ASSERT_OK(create_some_blocks());
+  NO_FATALS(AssertNumContainers(4));
+
+  // Now remove the limit and create more blocks. They should go into existing
+  // containers, which are now no longer full.
+  FLAGS_log_container_max_blocks = -1;
+  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
+                                     shared_ptr<MemTracker>(),
+                                     { GetTestDataDirectory() },
+                                     false));
+
+  ASSERT_OK(create_some_blocks());
+  NO_FATALS(AssertNumContainers(4));
+}
+
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/65695a30/src/kudu/fs/log_block_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index 129d9cc..1afa93b 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -18,6 +18,7 @@
 #include "kudu/fs/log_block_manager.h"
 
 #include <algorithm>
+#include <map>
 #include <memory>
 #include <mutex>
 
@@ -28,6 +29,7 @@
 #include "kudu/gutil/callback.h"
 #include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/strip.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -59,6 +61,12 @@ DEFINE_uint64(log_container_max_size, 10LU * 1024 * 1024 * 1024,
               "Maximum size (soft) of a log container");
 TAG_FLAG(log_container_max_size, advanced);
 
+DEFINE_int64(log_container_max_blocks, -1,
+             "Maximum number of blocks (soft) of a log container. Use 0 for "
+             "no limit. Use -1 for no limit except in the case of a kernel "
+             "bug with hole punching on ext4 (see KUDU-1508 for details).");
+TAG_FLAG(log_container_max_blocks, advanced);
+
 DEFINE_uint64(log_container_preallocate_bytes, 32LU * 1024 * 1024,
               "Number of bytes to preallocate in a log container when "
               "creating new blocks. Set to 0 to disable preallocation");
@@ -103,6 +111,7 @@ using internal::LogBlock;
 using internal::LogBlockContainer;
 using pb_util::ReadablePBContainerFile;
 using pb_util::WritablePBContainerFile;
+using std::map;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
@@ -258,12 +267,13 @@ class LogBlockContainer {
   // returning the records.
   Status ReadContainerRecords(deque<BlockRecordPB>* records) const;
 
-  // Updates 'total_bytes_written_', marking this container as full if
-  // needed. Should only be called when a block is fully written, as it
-  // will round up the container data file's position.
+  // Updates 'total_bytes_written_' and 'total_blocks_written_', marking this
+  // container as full if needed. Should only be called when a block is fully
+  // written, as it will round up the container data file's position.
   //
   // This function is thread unsafe.
-  void UpdateBytesWritten(int64_t block_offset, size_t block_length);
+  void UpdateBytesWrittenAndTotalBlocks(int64_t block_offset,
+                                        size_t block_length);
 
   // Run a task on this container's data directory thread pool.
   //
@@ -278,7 +288,8 @@ class LogBlockContainer {
   LogBlockManager* block_manager() const { return block_manager_; }
   int64_t total_bytes_written() const { return total_bytes_written_; }
   bool full() const {
-    return total_bytes_written_ >= FLAGS_log_container_max_size;
+    return total_bytes_written_ >= FLAGS_log_container_max_size ||
+        (max_num_blocks_ && (total_blocks_written_ >= max_num_blocks_));
   }
   const LogBlockManagerMetrics* metrics() const { return metrics_; }
   const DataDir* data_dir() const { return data_dir_; }
@@ -301,6 +312,8 @@ class LogBlockContainer {
   // The data directory where the container lives.
   DataDir* data_dir_;
 
+  const boost::optional<int64_t> max_num_blocks_;
+
   // Offset up to which we have preallocated bytes.
   int64_t preallocated_offset_ = 0;
 
@@ -311,6 +324,9 @@ class LogBlockContainer {
   // The amount of data written thus far in the container.
   int64_t total_bytes_written_ = 0;
 
+  // The number of blocks written thus far in the container.
+  int64_t total_blocks_written_ = 0;
+
   // The metrics. Not owned by the log container; it has the same lifespan
   // as the block manager.
   const LogBlockManagerMetrics* metrics_;
@@ -325,6 +341,8 @@ LogBlockContainer::LogBlockContainer(
     shared_ptr<RWFile> data_file)
     : block_manager_(block_manager),
       data_dir_(data_dir),
+      max_num_blocks_(FindOrDie(block_manager->block_limits_by_data_dir_,
+                                data_dir)),
       metadata_file_(std::move(metadata_file)),
       data_file_(std::move(data_file)),
       metrics_(block_manager->metrics()) {
@@ -624,7 +642,8 @@ Status LogBlockContainer::FinishBlock(const Status& s, WritableBlock* block) {
   CHECK(block_manager()->AddLogBlock(this, block->id(),
                                      total_bytes_written_,
                                      block->BytesAppended()));
-  UpdateBytesWritten(total_bytes_written_, block->BytesAppended());
+  UpdateBytesWrittenAndTotalBlocks(total_bytes_written_,
+                                   block->BytesAppended());
 
   // Truncate the container if it's now full; any left over preallocated space
   // is no longer needed.
@@ -736,7 +755,8 @@ Status LogBlockContainer::EnsurePreallocated(int64_t block_start_offset,
   return Status::OK();
 }
 
-void LogBlockContainer::UpdateBytesWritten(int64_t block_offset, size_t block_length) {
+void LogBlockContainer::UpdateBytesWrittenAndTotalBlocks(int64_t block_offset,
+                                                         size_t block_length) {
   DCHECK_GE(block_offset, 0);
 
   // The log block manager maintains block contiguity as an invariant, which
@@ -758,6 +778,8 @@ void LogBlockContainer::UpdateBytesWritten(int64_t block_offset, size_t block_le
   }
   total_bytes_written_ = std::max(total_bytes_written_, new_total_bytes);
 
+  total_blocks_written_++;
+
   if (full()) {
     VLOG(1) << Substitute(
         "Container $0 with size $1 is now full, max size is $2",
@@ -1199,6 +1221,13 @@ const char* LogBlockManager::kContainerDataFileSuffix = ".data";
 
 static const char* kBlockManagerType = "log";
 
+// These values were arrived at via experimentation. See commit 4923a74 for
+// more details.
+const map<int64_t, int64_t> LogBlockManager::per_fs_block_size_block_limits({
+  { 1024, 673 },
+  { 2048, 1353 },
+  { 4096, 2721 }});
+
 LogBlockManager::LogBlockManager(Env* env, const BlockManagerOptions& opts)
   : mem_tracker_(MemTracker::CreateTracker(-1,
                                            "log_block_manager",
@@ -1210,6 +1239,7 @@ LogBlockManager::LogBlockManager(Env* env, const BlockManagerOptions& opts)
                         BlockAllocator(mem_tracker_)),
     env_(DCHECK_NOTNULL(env)),
     read_only_(opts.read_only),
+    buggy_el6_kernel_(IsBuggyEl6Kernel(env->GetKernelRelease())),
     next_block_id_(1) {
 
   int64_t file_cache_capacity = GetFileCacheCapacityForBlockManager(env_);
@@ -1278,6 +1308,48 @@ Status LogBlockManager::Open() {
     RETURN_NOT_OK(file_cache_->Init());
   }
 
+  // Establish (and log) block limits for each data directory using kernel,
+  // filesystem, and gflags information.
+  for (const auto& dd : dd_manager_.data_dirs()) {
+    boost::optional<int64_t> limit;
+
+    if (FLAGS_log_container_max_blocks == -1) {
+      // No limit, unless this is KUDU-1508.
+
+      // The log block manager requires hole punching and, of the ext
+      // filesystems, only ext4 supports it. Thus, if this is an ext
+      // filesystem, it's ext4 by definition.
+      bool is_on_ext4;
+      RETURN_NOT_OK(env_->IsOnExtFilesystem(dd->dir(), &is_on_ext4));
+      if (buggy_el6_kernel_ && is_on_ext4) {
+        uint64_t fs_block_size =
+            dd->instance()->metadata()->filesystem_block_size_bytes();
+        bool untested_block_size =
+            !ContainsKey(per_fs_block_size_block_limits, fs_block_size);
+        string msg = Substitute(
+            "Data dir $0 is on an ext4 filesystem vulnerable to KUDU-1508 "
+            "with $1block size $2", dd->dir(),
+            untested_block_size ? "untested " : "", fs_block_size);
+        if (untested_block_size) {
+          LOG(WARNING) << msg;
+        } else {
+          LOG(INFO) << msg;
+        }
+        limit = LookupBlockLimit(fs_block_size);
+      }
+    } else if (FLAGS_log_container_max_blocks > 0) {
+      // Use the provided limit.
+      limit = FLAGS_log_container_max_blocks;
+    }
+
+    if (limit) {
+      LOG(INFO) << Substitute(
+          "Limiting containers on data directory $0 to $1 blocks",
+          dd->dir(), *limit);
+    }
+    InsertOrDie(&block_limits_by_data_dir_, dd.get(), limit);
+  }
+
   vector<Status> statuses(dd_manager_.data_dirs().size());
   int i = 0;
   for (const auto& dd : dd_manager_.data_dirs()) {
@@ -1675,7 +1747,8 @@ Status LogBlockManager::ProcessBlockRecord(const BlockRecordPB& record,
       //
       // If we ignored deleted blocks, we would end up reusing the space
       // belonging to the last deleted block in the container.
-      container->UpdateBytesWritten(record.offset(), record.length());
+      container->UpdateBytesWrittenAndTotalBlocks(record.offset(),
+                                                  record.length());
       break;
     }
     case DELETE:
@@ -1699,5 +1772,27 @@ std::string LogBlockManager::ContainerPathForTests(internal::LogBlockContainer*
   return container->ToString();
 }
 
+bool LogBlockManager::IsBuggyEl6Kernel(const string& kernel_release) {
+  // Any kernel older than 2.6.32-674 (el6.9) is buggy.
+  //
+  // TODO(adar): need to update this when the fix is backported to el6.8.z. See
+  // https://bugzilla.redhat.com/show_bug.cgi?id=1397808.
+  autodigit_less lt;
+  return kernel_release.find("el6") != string::npos &&
+      lt(kernel_release, "2.6.32-674");
+}
+
+int64_t LogBlockManager::LookupBlockLimit(int64_t fs_block_size) {
+  const int64_t* limit = FindFloorOrNull(per_fs_block_size_block_limits,
+                                         fs_block_size);
+  if (limit) {
+    return *limit;
+  }
+
+  // Block size must have been less than the very first key. Return the
+  // first recorded entry and hope for the best.
+  return per_fs_block_size_block_limits.begin()->second;
+}
+
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/65695a30/src/kudu/fs/log_block_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager.h b/src/kudu/fs/log_block_manager.h
index aaa983f..97501f3 100644
--- a/src/kudu/fs/log_block_manager.h
+++ b/src/kudu/fs/log_block_manager.h
@@ -19,6 +19,7 @@
 #define KUDU_FS_LOG_BLOCK_MANAGER_H
 
 #include <deque>
+#include <map>
 #include <memory>
 #include <string>
 #include <unordered_map>
@@ -26,6 +27,7 @@
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gtest/gtest_prod.h>
 
 #include "kudu/fs/block_id.h"
@@ -182,8 +184,11 @@ class LogBlockManager : public BlockManager {
   int64_t CountBlocksForTests() const;
 
  private:
-  FRIEND_TEST(LogBlockManagerTest, TestReuseBlockIds);
+  FRIEND_TEST(LogBlockManagerTest, TestLookupBlockLimit);
   FRIEND_TEST(LogBlockManagerTest, TestMetadataTruncation);
+  FRIEND_TEST(LogBlockManagerTest, TestParseKernelRelease);
+  FRIEND_TEST(LogBlockManagerTest, TestReuseBlockIds);
+
   friend class internal::LogBlockContainer;
 
   // Simpler typedef for a block map which isn't tracked in the memory tracker.
@@ -268,11 +273,22 @@ class LogBlockManager : public BlockManager {
 
   Env* env() const { return env_; }
 
-  // Return the path of the given container. Only for use by tests.
+  // Returns the path of the given container. Only for use by tests.
   static std::string ContainerPathForTests(internal::LogBlockContainer* container);
 
+  // Returns whether the given kernel release is vulnerable to KUDU-1508.
+  static bool IsBuggyEl6Kernel(const std::string& kernel_release);
+
+  // Finds an appropriate block limit from 'per_fs_block_size_block_limits'
+  // using the given filesystem block size.
+  static int64_t LookupBlockLimit(int64_t fs_block_size);
+
   const internal::LogBlockManagerMetrics* metrics() const { return metrics_.get(); }
 
+  // For kernels affected by KUDU-1508, tracks a known good upper bound on the
+  // number of blocks per container, given a particular filesystem block size.
+  static const std::map<int64_t, int64_t> per_fs_block_size_block_limits;
+
   // Tracks memory consumption of any allocations numerous enough to be
   // interesting (e.g. LogBlocks).
   std::shared_ptr<MemTracker> mem_tracker_;
@@ -283,6 +299,11 @@ class LogBlockManager : public BlockManager {
   // Manages and owns all of the block manager's data directories.
   DataDirManager dd_manager_;
 
+  // Maps a data directory to an upper bound on the number of blocks that a
+  // container residing in that directory should observe, if one is necessary.
+  std::unordered_map<const DataDir*,
+                     boost::optional<int64_t>> block_limits_by_data_dir_;
+
   // Manages files opened for reading.
   std::unique_ptr<FileCache<RWFile>> file_cache_;
 
@@ -319,6 +340,9 @@ class LogBlockManager : public BlockManager {
   // If true, only read operations are allowed.
   const bool read_only_;
 
+  // If true, the kernel is vulnerable to KUDU-1508.
+  const bool buggy_el6_kernel_;
+
   // For generating container names.
   ObjectIdGenerator oid_generator_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/65695a30/src/kudu/util/env.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/env.h b/src/kudu/util/env.h
index a9b8751..64395af 100644
--- a/src/kudu/util/env.h
+++ b/src/kudu/util/env.h
@@ -296,6 +296,15 @@ class Env {
   // kernel) to be equal to the hard limit.
   virtual void IncreaseOpenFileLimit() = 0;
 
+  // Checks whether the given path resides on an ext2, ext3, or ext4
+  // filesystem.
+  //
+  // On success, 'result' contains the answer. On failure, 'result' is unset.
+  virtual Status IsOnExtFilesystem(const std::string& path, bool* result) = 0;
+
+  // Gets the kernel release string for this machine.
+  virtual std::string GetKernelRelease() = 0;
+
   // Special string injected into file-growing operations' random failures
   // (if enabled).
   //

http://git-wip-us.apache.org/repos/asf/kudu/blob/65695a30/src/kudu/util/env_posix.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/env_posix.cc b/src/kudu/util/env_posix.cc
index 715207f..3f0be74 100644
--- a/src/kudu/util/env_posix.cc
+++ b/src/kudu/util/env_posix.cc
@@ -16,6 +16,7 @@
 #include <sys/time.h>
 #include <sys/types.h>
 #include <sys/uio.h>
+#include <sys/utsname.h>
 #include <unistd.h>
 
 #include <cstdio>
@@ -55,7 +56,9 @@
 #include <sys/sysctl.h>
 #else
 #include <linux/falloc.h>
+#include <linux/magic.h>
 #include <sys/sysinfo.h>
+#include <sys/vfs.h>
 #endif  // defined(__APPLE__)
 
 // Copied from falloc.h. Useful for older kernels that lack support for
@@ -1267,6 +1270,31 @@ class PosixEnv : public Env {
     }
   }
 
+  virtual Status IsOnExtFilesystem(const string& path, bool* result) OVERRIDE {
+    TRACE_EVENT0("io", "PosixEnv::IsOnExtFilesystem");
+    ThreadRestrictions::AssertIOAllowed();
+
+#ifdef __APPLE__
+    *result = false;
+#else
+    struct statfs buf;
+    int ret;
+    RETRY_ON_EINTR(ret, statfs(path.c_str(), &buf));
+    if (ret == -1) {
+      return IOError(Substitute("statfs: $0", path), errno);
+    }
+    *result = (buf.f_type == EXT4_SUPER_MAGIC);
+#endif
+    return Status::OK();
+  }
+
+  virtual string GetKernelRelease() OVERRIDE {
+    // There's no reason for this to ever fail.
+    struct utsname u;
+    PCHECK(uname(&u) == 0);
+    return string(u.release);
+  }
+
  private:
   // unique_ptr Deleter implementation for fts_close
   struct FtsCloser {