You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/03/10 21:41:51 UTC

[kudu] branch master updated (eaed5fb -> 1b3b26d)

This is an automated email from the ASF dual-hosted git repository.

adar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from eaed5fb  replace gscoped_ptr with std::unique_ptr
     new cf555ce  [util] Add support for 32 & 64 byte alignment to Arena allocator
     new 885d794  [util] Import "Or" function to BlockBloomFilter from Impala
     new 1b3b26d  KUDU-3070 skip open block manager

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/fs/fs_manager.cc                   |  20 +++--
 src/kudu/fs/fs_manager.h                    |   7 ++
 src/kudu/tools/tool_action_local_replica.cc |   4 +-
 src/kudu/util/block_bloom_filter-test.cc    |  49 +++++++++++
 src/kudu/util/block_bloom_filter.cc         |  63 +++++++++++++-
 src/kudu/util/block_bloom_filter.h          |  28 ++++++-
 src/kudu/util/block_bloom_filter_avx2.cc    |  20 ++++-
 src/kudu/util/memory/arena-test.cc          | 123 ++++++++++++++++++++++------
 src/kudu/util/memory/arena.cc               |  31 +++++--
 src/kudu/util/memory/arena.h                |  42 +++++++---
 10 files changed, 324 insertions(+), 63 deletions(-)


[kudu] 03/03: KUDU-3070 skip open block manager

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 1b3b26d957e2944ff3197eea4ee99dec22640721
Author: wangning <19...@gmail.com>
AuthorDate: Fri Mar 6 21:08:28 2020 +0800

    KUDU-3070 skip open block manager
    
    In some CLI operations (e.g. local_replica cmeta rewrite_raft_config),
    opening the block manager is not necessary.
    
    Change-Id: Ifaec03512086430a6270c458269da1cf996fd9c5
    Reviewed-on: http://gerrit.cloudera.org:8080/15380
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/fs/fs_manager.cc                   | 20 +++++++++++++-------
 src/kudu/fs/fs_manager.h                    |  7 +++++++
 src/kudu/tools/tool_action_local_replica.cc |  4 +++-
 3 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/src/kudu/fs/fs_manager.cc b/src/kudu/fs/fs_manager.cc
index a60b922..127f0ba 100644
--- a/src/kudu/fs/fs_manager.cc
+++ b/src/kudu/fs/fs_manager.cc
@@ -141,7 +141,8 @@ FsManagerOpts::FsManagerOpts()
     block_manager_type(FLAGS_block_manager),
     read_only(false),
     update_instances(UpdateInstanceBehavior::UPDATE_AND_IGNORE_FAILURES),
-    file_cache(nullptr) {
+    file_cache(nullptr),
+    skip_block_manager(false) {
   data_roots = strings::Split(FLAGS_fs_data_dirs, ",", strings::SkipEmpty());
 }
 
@@ -151,7 +152,8 @@ FsManagerOpts::FsManagerOpts(const string& root)
     block_manager_type(FLAGS_block_manager),
     read_only(false),
     update_instances(UpdateInstanceBehavior::UPDATE_AND_IGNORE_FAILURES),
-    file_cache(nullptr) {}
+    file_cache(nullptr),
+    skip_block_manager(false) {}
 
 FsManager::FsManager(Env* env, FsManagerOpts opts)
   : env_(DCHECK_NOTNULL(env)),
@@ -425,10 +427,12 @@ Status FsManager::Open(FsReport* report) {
   error_manager_->SetErrorNotificationCb(ErrorHandlerType::DISK_ERROR,
       Bind(&DataDirManager::MarkDirFailedByUuid, Unretained(dd_manager_.get())));
 
-  // Finally, initialize and open the block manager.
-  InitBlockManager();
-  LOG_TIMING(INFO, "opening block manager") {
-    RETURN_NOT_OK(block_manager_->Open(report));
+  // Finally, initialize and open the block manager if needed.
+  if (!opts_.skip_block_manager) {
+    InitBlockManager();
+    LOG_TIMING(INFO, "opening block manager") {
+      RETURN_NOT_OK(block_manager_->Open(report));
+    }
   }
 
   // Report wal and metadata directories.
@@ -763,15 +767,17 @@ void FsManager::DumpFileSystemTree(ostream& out, const string& prefix,
 
 Status FsManager::CreateNewBlock(const CreateBlockOptions& opts, unique_ptr<WritableBlock>* block) {
   CHECK(!opts_.read_only);
-
+  DCHECK(block_manager_);
   return block_manager_->CreateBlock(opts, block);
 }
 
 Status FsManager::OpenBlock(const BlockId& block_id, unique_ptr<ReadableBlock>* block) {
+  DCHECK(block_manager_);
   return block_manager_->OpenBlock(block_id, block);
 }
 
 bool FsManager::BlockExists(const BlockId& block_id) const {
+  DCHECK(block_manager_);
   unique_ptr<ReadableBlock> block;
   return block_manager_->OpenBlock(block_id, &block).ok();
 }
diff --git a/src/kudu/fs/fs_manager.h b/src/kudu/fs/fs_manager.h
index c719be7..7577d06 100644
--- a/src/kudu/fs/fs_manager.h
+++ b/src/kudu/fs/fs_manager.h
@@ -130,6 +130,12 @@ struct FsManagerOpts {
   //
   // Defaults to null.
   FileCache* file_cache;
+
+  // Whether or not to skip opening the block manager. FsManager operations that
+  // require the block manager will crash.
+  //
+  // Default to false.
+  bool skip_block_manager;
 };
 
 // FsManager provides helpers to read data and metadata files,
@@ -286,6 +292,7 @@ class FsManager {
   }
 
   fs::BlockManager* block_manager() {
+    DCHECK(block_manager_);
     return block_manager_.get();
   }
 
diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc
index 36b1005..07858e2 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -284,7 +284,9 @@ Status RewriteRaftConfig(const RunnerContext& context) {
 
   // Make a copy of the old file before rewriting it.
   Env* env = Env::Default();
-  FsManager fs_manager(env, FsManagerOpts());
+  FsManagerOpts fs_opts = FsManagerOpts();
+  fs_opts.skip_block_manager = true;
+  FsManager fs_manager(env, std::move(fs_opts));
   RETURN_NOT_OK(fs_manager.Open());
   RETURN_NOT_OK(BackupConsensusMetadata(&fs_manager, tablet_id));
 


[kudu] 02/03: [util] Import "Or" function to BlockBloomFilter from Impala

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 885d7946c771ea875919d479c67b2ddac5c5103e
Author: Bankim Bhavsar <ba...@cloudera.com>
AuthorDate: Tue Mar 3 16:18:17 2020 -0800

    [util] Import "Or" function to BlockBloomFilter from Impala
    
    Impala will be switching to using the Block Bloom filter from kudu-util.
    The "Or" function was missing, and this change adds it.
    
    Note that the original implementation of OrEqualArrayAvx() in Impala
    targets AVX rather than AVX2. However, AVX2 is a superset of the AVX
    instructions, and the Block Bloom filter already has a provision for
    separating AVX2 from non-AVX2 (SSE) code. Hence there is no need to add a
    separate AVX-specific file/implementation.
    
    Change-Id: Ibe5f9311f73dcff883dd2cce18fd558e7d57d14f
    Reviewed-on: http://gerrit.cloudera.org:8080/15373
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/util/block_bloom_filter-test.cc | 33 +++++++++++++++++++
 src/kudu/util/block_bloom_filter.cc      | 56 ++++++++++++++++++++++++++++++++
 src/kudu/util/block_bloom_filter.h       | 28 ++++++++++++++--
 src/kudu/util/block_bloom_filter_avx2.cc | 20 +++++++++++-
 4 files changed, 134 insertions(+), 3 deletions(-)

diff --git a/src/kudu/util/block_bloom_filter-test.cc b/src/kudu/util/block_bloom_filter-test.cc
index 4c4d418..b9ed9f2 100644
--- a/src/kudu/util/block_bloom_filter-test.cc
+++ b/src/kudu/util/block_bloom_filter-test.cc
@@ -286,4 +286,37 @@ TEST_F(BlockBloomFilterTest, MinSpaceForFpp) {
     }
   }
 }
+
+TEST_F(BlockBloomFilterTest, Or) {
+  BlockBloomFilter* bf1 = CreateBloomFilter(BlockBloomFilter::MinLogSpace(100, 0.01));
+  BlockBloomFilter* bf2 = CreateBloomFilter(BlockBloomFilter::MinLogSpace(100, 0.01));
+
+  for (int i = 60; i < 80; ++i) bf2->Insert(i);
+  for (int i = 0; i < 10; ++i) bf1->Insert(i);
+
+  ASSERT_OK(bf1->Or(*bf2));
+  for (int i = 0; i < 10; ++i) ASSERT_TRUE(bf1->Find(i)) << i;
+  for (int i = 60; i < 80; ++i) ASSERT_TRUE(bf1->Find(i)) << i;
+
+  // Insert another value to aggregated BloomFilter.
+  for (int i = 11; i < 50; ++i) bf1->Insert(i);
+
+  for (int i = 11; i < 50; ++i) ASSERT_TRUE(bf1->Find(i)) << i;
+  ASSERT_FALSE(bf1->Find(81));
+
+  // Check that AlwaysFalse() is updated correctly.
+  BlockBloomFilter* bf3 = CreateBloomFilter(BlockBloomFilter::MinLogSpace(100, 0.01));
+  BlockBloomFilter* always_false = CreateBloomFilter(BlockBloomFilter::MinLogSpace(100, 0.01));
+  ASSERT_OK(bf3->Or(*always_false));
+  EXPECT_TRUE(bf3->AlwaysFalse());
+  ASSERT_OK(bf3->Or(*bf2));
+  EXPECT_FALSE(bf3->AlwaysFalse());
+
+  // Invalid argument test cases.
+  BlockBloomFilter* bf4 = CreateBloomFilter(BlockBloomFilter::MinLogSpace(100, 0.01));
+  BlockBloomFilter* bf5 = CreateBloomFilter(BlockBloomFilter::MinLogSpace(100000, 0.01));
+  Status s = bf4->Or(*bf5);
+  ASSERT_TRUE(s.IsInvalidArgument());
+  ASSERT_STR_CONTAINS(s.ToString(), "Directory size don't match");
+}
 }  // namespace kudu
diff --git a/src/kudu/util/block_bloom_filter.cc b/src/kudu/util/block_bloom_filter.cc
index e09a7cc..acaf358 100644
--- a/src/kudu/util/block_bloom_filter.cc
+++ b/src/kudu/util/block_bloom_filter.cc
@@ -47,6 +47,9 @@ namespace kudu {
 
 constexpr uint32_t BlockBloomFilter::kRehash[8] __attribute__((aligned(32)));
 const base::CPU BlockBloomFilter::kCpu = base::CPU();
+// constexpr data member requires initialization in the class declaration.
+// Hence no duplicate initialization in the definition here.
+constexpr BlockBloomFilter* const BlockBloomFilter::kAlwaysTrueFilter;
 
 BlockBloomFilter::BlockBloomFilter(BlockBloomFilterBufferAllocatorIf* buffer_allocator) :
   always_false_(true),
@@ -60,13 +63,16 @@ BlockBloomFilter::BlockBloomFilter(BlockBloomFilterBufferAllocatorIf* buffer_all
   if (has_avx2()) {
     bucket_insert_func_ptr_ = &BlockBloomFilter::BucketInsertAVX2;
     bucket_find_func_ptr_ = &BlockBloomFilter::BucketFindAVX2;
+    or_equal_array_func_ptr_ = &BlockBloomFilter::OrEqualArrayAVX2;
   } else {
     bucket_insert_func_ptr_ = &BlockBloomFilter::BucketInsert;
     bucket_find_func_ptr_ = &BlockBloomFilter::BucketFind;
+    or_equal_array_func_ptr_ = &BlockBloomFilter::OrEqualArray;
   }
 #else
   bucket_insert_func_ptr_ = &BlockBloomFilter::BucketInsert;
   bucket_find_func_ptr_ = &BlockBloomFilter::BucketFind;
+  or_equal_array_func_ptr_ = &BlockBloomFilter::OrEqualArray;
 #endif
 }
 
@@ -259,6 +265,56 @@ bool BlockBloomFilter::operator!=(const BlockBloomFilter& rhs) const {
   return !(rhs == *this);
 }
 
+void BlockBloomFilter::OrEqualArray(size_t n, const uint8_t* __restrict__ in,
+                                    uint8_t* __restrict__ out) {
+  // The trivial loop out[i] |= in[i] should auto-vectorize with gcc at -O3, but it is not
+  // written in a way that is very friendly to auto-vectorization. Instead, we manually
+  // vectorize, increasing the speed by up to 56x.
+  const __m128i* simd_in = reinterpret_cast<const __m128i*>(in);
+  const __m128i* const simd_in_end = reinterpret_cast<const __m128i*>(in + n);
+  __m128i* simd_out = reinterpret_cast<__m128i*>(out);
+  // in.directory has a size (in bytes) that is a multiple of 32. Since sizeof(__m128i)
+  // == 16, we can do two _mm_or_si128's in each iteration without checking array
+  // bounds.
+  while (simd_in != simd_in_end) {
+    for (int i = 0; i < 2; ++i, ++simd_in, ++simd_out) {
+      _mm_storeu_si128(
+          simd_out, _mm_or_si128(_mm_loadu_si128(simd_out), _mm_loadu_si128(simd_in)));
+    }
+  }
+}
+
+Status BlockBloomFilter::Or(const BlockBloomFilter& other) {
+  // AlwaysTrueFilter is a special case implemented with a nullptr.
+  // Hence Or'ing with an AlwaysTrueFilter will result in a Bloom filter that also
+  // always returns true which'll require destructing this Bloom filter.
+  // Moreover for a reference "other" to be an AlwaysTrueFilter the reference needs
+  // to be created from a nullptr and so we get into undefined behavior territory.
+  // Comparing AlwaysTrueFilter with "&other" results in a compiler warning for
+  // comparing a non-null argument "other" with NULL [-Wnonnull-compare].
+  // For above reasons, guard against it.
+  CHECK_NE(kAlwaysTrueFilter, &other);
+
+  if (this == &other) {
+    // No op.
+    return Status::OK();
+  }
+  if (directory_size() != other.directory_size()) {
+    return Status::InvalidArgument(Substitute("Directory size don't match. this: $0, other: $1",
+        directory_size(), other.directory_size()));
+  }
+  if (other.AlwaysFalse()) {
+    // Nothing to do.
+    return Status::OK();
+  }
+
+  (*or_equal_array_func_ptr_)(directory_size(),
+                              reinterpret_cast<uint8_t*>(other.directory_),
+                              reinterpret_cast<uint8_t*>(directory_));
+  always_false_ = false;
+  return Status::OK();
+}
+
 shared_ptr<DefaultBlockBloomFilterBufferAllocator>
     DefaultBlockBloomFilterBufferAllocator::GetSingletonSharedPtr() {
   // Meyer's Singleton.
diff --git a/src/kudu/util/block_bloom_filter.h b/src/kudu/util/block_bloom_filter.h
index a218dec..8a0fdf8 100644
--- a/src/kudu/util/block_bloom_filter.h
+++ b/src/kudu/util/block_bloom_filter.h
@@ -148,6 +148,21 @@ class BlockBloomFilter {
   bool operator==(const BlockBloomFilter& rhs) const;
   bool operator!=(const BlockBloomFilter& rhs) const;
 
+  // Computes the logical OR of this filter with 'other' and stores the result in this
+  // filter.
+  // Notes:
+  // - The directory sizes of the Bloom filters must match.
+  // - Or'ing with kAlwaysTrueFilter is disallowed.
+  Status Or(const BlockBloomFilter& other);
+
+  // Returns whether the Bloom filter is empty and hence would return false for all lookups.
+  bool AlwaysFalse() const {
+    return always_false_;
+  }
+
+  // Representation of a filter which allows all elements to pass.
+  static constexpr BlockBloomFilter* const kAlwaysTrueFilter = nullptr;
+
  private:
   // always_false_ is true when the bloom filter hasn't had any elements inserted.
   bool always_false_;
@@ -190,7 +205,7 @@ class BlockBloomFilter {
   // Helper function for public Init() variants.
   Status InitInternal(int log_space_bytes, HashAlgorithm hash_algorithm, uint32_t hash_seed);
 
-  // Same as Insert(), but skips the CPU check and assumes that AVX is not available.
+  // Same as Insert(), but skips the CPU check and assumes that AVX2 is not available.
   void InsertNoAvx2(uint32_t hash) noexcept;
 
   // Does the actual work of Insert(). bucket_idx is the index of the bucket to insert
@@ -199,8 +214,11 @@ class BlockBloomFilter {
 
   bool BucketFind(uint32_t bucket_idx, uint32_t hash) const noexcept;
 
+  // Computes out[i] |= in[i] for the arrays 'in' and 'out' of length 'n'.
+  static void OrEqualArray(size_t n, const uint8_t* __restrict__ in, uint8_t* __restrict__ out);
+
 #ifdef USE_AVX2
-  // Same as Insert(), but skips the CPU check and assumes that AVX is available.
+  // Same as Insert(), but skips the CPU check and assumes that AVX2 is available.
   void InsertAvx2(uint32_t hash) noexcept __attribute__((__target__("avx2")));
 
   // A faster SIMD version of BucketInsert().
@@ -210,12 +228,18 @@ class BlockBloomFilter {
   // A faster SIMD version of BucketFind().
   bool BucketFindAVX2(uint32_t bucket_idx, uint32_t hash) const noexcept
       __attribute__((__target__("avx2")));
+
+  // Computes out[i] |= in[i] for the arrays 'in' and 'out' of length 'n' using AVX2
+  // instructions. 'n' must be a multiple of 32.
+  static void OrEqualArrayAVX2(size_t n, const uint8_t* __restrict__ in,
+                               uint8_t* __restrict__ out) __attribute__((target("avx2")));
 #endif
 
   // Function pointers initialized in constructor to avoid run-time cost
   // in hot-path of Find and Insert operations.
   decltype(&BlockBloomFilter::BucketInsert) bucket_insert_func_ptr_;
   decltype(&BlockBloomFilter::BucketFind) bucket_find_func_ptr_;
+  decltype(&BlockBloomFilter::OrEqualArray) or_equal_array_func_ptr_;
 
   // Returns amount of space used in log2 bytes.
   int log_space_bytes() const {
diff --git a/src/kudu/util/block_bloom_filter_avx2.cc b/src/kudu/util/block_bloom_filter_avx2.cc
index e10b6cc..93c3ff6 100644
--- a/src/kudu/util/block_bloom_filter_avx2.cc
+++ b/src/kudu/util/block_bloom_filter_avx2.cc
@@ -24,9 +24,14 @@
 
 #include "kudu/util/block_bloom_filter.h"
 
-#include <cstdint>
 #include <immintrin.h>
 
+#include <cstddef>
+#include <cstdint>
+#include <ostream>
+
+#include <glog/logging.h>
+
 #include "kudu/gutil/port.h"
 
 namespace kudu {
@@ -75,4 +80,17 @@ void BlockBloomFilter::InsertAvx2(const uint32_t hash) noexcept {
   BucketInsertAVX2(bucket_idx, hash);
 }
 
+void BlockBloomFilter::OrEqualArrayAVX2(size_t n, const uint8_t* __restrict__ in,
+                                        uint8_t* __restrict__ out) {
+  constexpr size_t kAVXRegisterBytes = sizeof(__m256d);
+  DCHECK_EQ(n % kAVXRegisterBytes, 0) << "Invalid Bloom filter directory size";
+  const uint8_t* const in_end = in + n;
+  for (; in != in_end; (in += kAVXRegisterBytes), (out += kAVXRegisterBytes)) {
+    const double* double_in = reinterpret_cast<const double*>(in);
+    double* double_out = reinterpret_cast<double*>(out);
+    _mm256_storeu_pd(double_out,
+                     _mm256_or_pd(_mm256_loadu_pd(double_out), _mm256_loadu_pd(double_in)));
+  }
+}
+
 } // namespace kudu


[kudu] 01/03: [util] Add support for 32 & 64 byte alignment to Arena allocator

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit cf555cecc0d0bd86862dd47ad7c9f075192cf000
Author: Bankim Bhavsar <ba...@cloudera.com>
AuthorDate: Wed Mar 4 10:08:09 2020 -0800

    [util] Add support for 32 & 64 byte alignment to Arena allocator
    
    After adding a couple of member variables and methods to the
    BlockBloomFilter class for importing the Or() function, predicate-test
    would sometimes randomly crash with SIGSEGV in AVX operations.
    
    On debugging, the crash would only happen when the "directory_" structure
    in BlockBloomFilter is not 32-byte aligned. Surprisingly, without the
    addition of those methods, "directory_" had so far always been 32-byte
    aligned despite the Arena allocator not supporting alignment values greater
    than 16 bytes.
    
    With input from Todd, explored 3 options to fix the alignment problem:
    1) Update the HeapBufferAllocator in util/memory to align allocations to
    64 bytes. See the AllocateInternal() implementation. Surprisingly,
    FLAGS_allocator_aligned_mode is OFF by default, so we appear to be relying
    on the allocator to always allocate 16-byte aligned buffers. This option
    would therefore require turning ON the FLAGS_allocator_aligned_mode flag
    by default.
    2) Update the Arena allocator such that, when needed, extra bytes are
    allocated to allow aligning to 32/64 bytes, given that a new component will
    always be 16-byte aligned. This requires updating some address/alignment
    logic involving offset_ and the new component allocation.
    3) Don't touch the Arena allocator and simply add padding in the
    ArenaBlockBloomFilterBufferAllocator to minimize any risk to other parts of
    the codebase.
    
    Opted for option #2 since it broadly adds support for 32/64-byte alignment
    instead of the limited scope of option #3. Option #1 is tempting, but it is
    unclear what side effects turning on allocator_aligned_mode would bring.
    
    Although only 32-byte alignment is needed for AVX operations, support for
    64-byte alignment was also added to better align with the cache line size.
    
    Additionally this change:
    - Adds a simple BlockBloomFilter unit test that reproduces the alignment
    problem, since the end-to-end predicate-test was turning out to be
    difficult to debug.
    - Fixes and enhances the arena-test with a bunch of variations.
    
    Change-Id: Ib665115fa0fc262a8b76c48f52947dedb84be2a7
    Reviewed-on: http://gerrit.cloudera.org:8080/15372
    Tested-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/util/block_bloom_filter-test.cc |  16 ++++
 src/kudu/util/block_bloom_filter.cc      |   7 +-
 src/kudu/util/memory/arena-test.cc       | 123 ++++++++++++++++++++++++-------
 src/kudu/util/memory/arena.cc            |  31 +++++---
 src/kudu/util/memory/arena.h             |  42 +++++++----
 5 files changed, 167 insertions(+), 52 deletions(-)

diff --git a/src/kudu/util/block_bloom_filter-test.cc b/src/kudu/util/block_bloom_filter-test.cc
index 978b2ad..4c4d418 100644
--- a/src/kudu/util/block_bloom_filter-test.cc
+++ b/src/kudu/util/block_bloom_filter-test.cc
@@ -32,6 +32,7 @@
 
 #include "kudu/util/hash.pb.h"
 #include "kudu/util/memory/arena.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -114,6 +115,21 @@ TEST_F(BlockBloomFilterTest, InvalidSpace) {
   bf.Close();
 }
 
+// Simple Arena allocator based test that would sometimes trigger
+// SIGSEGV due to misalignment of the "directory_" ptr on AVX operations
+// before adding support for 32/64 byte alignment in Arena allocator.
+// It simulates the allocation pattern in wire-protocol.cc.
+TEST_F(BlockBloomFilterTest, ArenaAligned) {
+  Arena a(64);
+  auto* allocator = a.NewObject<ArenaBlockBloomFilterBufferAllocator>(&a);
+  auto* bf = a.NewObject<BlockBloomFilter>(allocator);
+  bf->Init(6, FAST_HASH, 0);
+  bool key = true;
+  Slice s(reinterpret_cast<const uint8_t*>(&key), sizeof(key));
+  bf->Insert(s);
+  ASSERT_TRUE(bf->Find(s));
+}
+
 TEST_F(BlockBloomFilterTest, InvalidHashAlgorithm) {
   BlockBloomFilter bf(allocator_);
   Status s = bf.Init(4, UNKNOWN_HASH, 0);
diff --git a/src/kudu/util/block_bloom_filter.cc b/src/kudu/util/block_bloom_filter.cc
index 572119b..e09a7cc 100644
--- a/src/kudu/util/block_bloom_filter.cc
+++ b/src/kudu/util/block_bloom_filter.cc
@@ -306,10 +306,9 @@ ArenaBlockBloomFilterBufferAllocator::~ArenaBlockBloomFilterBufferAllocator() {
 
 Status ArenaBlockBloomFilterBufferAllocator::AllocateBuffer(size_t bytes, void** ptr) {
   DCHECK_NOTNULL(arena_);
-  // 16-bytes is the max alignment supported in arena currently whereas CACHELINE_SIZE
-  // is typically 64 bytes on modern CPUs.
-  // TODO(bankim): Needs investigation to support larger alignment values.
-  *ptr = arena_->AllocateBytesAligned(bytes, 16);
+  static_assert(CACHELINE_SIZE >= 32,
+                "For AVX operations, need buffers to be 32-bytes aligned or higher");
+  *ptr = arena_->AllocateBytesAligned(bytes, CACHELINE_SIZE);
   return *ptr == nullptr ?
       Status::RuntimeError(Substitute("Arena bad_alloc. bytes: $0", bytes)) :
       Status::OK();
diff --git a/src/kudu/util/memory/arena-test.cc b/src/kudu/util/memory/arena-test.cc
index 695e305..73052cd 100644
--- a/src/kudu/util/memory/arena-test.cc
+++ b/src/kudu/util/memory/arena-test.cc
@@ -15,11 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/util/memory/arena.h"
+
 #include <cstdint>
 #include <cstring>
 #include <memory>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <vector>
 
 #include <gflags/gflags.h>
@@ -27,9 +30,11 @@
 #include <gtest/gtest.h>
 
 #include "kudu/gutil/stringprintf.h"
-#include "kudu/util/memory/arena.h"
-#include "kudu/util/memory/memory.h"
 #include "kudu/util/mem_tracker.h"
+#include "kudu/util/memory/memory.h"
+#include "kudu/util/random.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_util.h"
 
 DEFINE_int32(num_threads, 16, "Number of threads to test");
 DEFINE_int32(allocs_per_thread, 10000, "Number of allocations each thread should do");
@@ -42,19 +47,52 @@ using std::string;
 using std::thread;
 using std::vector;
 
+// From the "arena" allocate number of bytes required to copy the "to_write" buffer
+// and add the allocated buffer to the output "ptrs" vector.
 template<class ArenaType>
-static void AllocateThread(ArenaType *arena, uint8_t thread_index) {
-  std::vector<void *> ptrs;
+static void AllocateBytesAndWrite(ArenaType* arena, const Slice& to_write, vector<void*>* ptrs) {
+  void *allocated_bytes = arena->AllocateBytes(to_write.size());
+  ASSERT_NE(nullptr, allocated_bytes);
+  memcpy(allocated_bytes, to_write.data(), to_write.size());
+  ptrs->push_back(allocated_bytes);
+}
+
+// From the "arena" allocate aligned bytes as specified by "alignment". Number of bytes
+// must be at least the size required to copy the "to_write" buffer.
+// Add the allocated aligned buffer to the output "ptrs" vector.
+template<class ArenaType, class RNG>
+static void AllocateAlignedBytesAndWrite(ArenaType* arena, const size_t alignment,
+    const Slice& to_write, RNG* r, vector<void*>* ptrs) {
+  // To test alignment we allocate random number of bytes within bounds
+  // but write to fixed number of bytes "to_write".
+  size_t num_bytes = FLAGS_alloc_size + r->Uniform32(128 - FLAGS_alloc_size + 1);
+  ASSERT_LE(to_write.size(), num_bytes);
+  void* allocated_bytes = arena->AllocateBytesAligned(num_bytes, alignment);
+  ASSERT_NE(nullptr, allocated_bytes);
+  ASSERT_EQ(0, reinterpret_cast<uintptr_t>(allocated_bytes) % alignment) <<
+    "failed to align on " << alignment << "b boundary: " << allocated_bytes;
+  memcpy(allocated_bytes, to_write.data(), to_write.size());
+  ptrs->push_back(allocated_bytes);
+}
+
+// Thread callback function used by bunch of test cases below.
+template<class ArenaType, class RNG, bool InvokeAligned = false>
+static void AllocateAndTestThreadFunc(ArenaType *arena, uint8_t thread_index, RNG* r) {
+  vector<void *> ptrs;
   ptrs.reserve(FLAGS_allocs_per_thread);
 
-  char buf[FLAGS_alloc_size];
+  uint8_t buf[FLAGS_alloc_size];
   memset(buf, thread_index, FLAGS_alloc_size);
+  Slice data(buf, FLAGS_alloc_size);
 
   for (int i = 0; i < FLAGS_allocs_per_thread; i++) {
-    void *alloced = arena->AllocateBytes(FLAGS_alloc_size);
-    CHECK(alloced);
-    memcpy(alloced, buf, FLAGS_alloc_size);
-    ptrs.push_back(alloced);
+    if (InvokeAligned) {
+      // Test alignment up to 64 bytes.
+      const size_t alignment = 1 << (i % 7);
+      AllocateAlignedBytesAndWrite(arena, alignment, data, r, &ptrs);
+    } else {
+      AllocateBytesAndWrite(arena, data, &ptrs);
+    }
   }
 
   for (void *p : ptrs) {
@@ -65,26 +103,51 @@ static void AllocateThread(ArenaType *arena, uint8_t thread_index) {
 }
 
 // Non-templated function to forward to above -- simplifies thread creation
-static void AllocateThreadTSArena(ThreadSafeArena *arena, uint8_t thread_index) {
-  AllocateThread(arena, thread_index);
+static void AllocateAndTestTSArenaFunc(ThreadSafeArena *arena, uint8_t thread_index,
+                                       ThreadSafeRandom* r) {
+  AllocateAndTestThreadFunc(arena, thread_index, r);
 }
 
+template<typename ArenaType, typename RNG>
+static void TestArenaAlignmentHelper() {
+  RNG r(SeedRandom());
+
+  for (size_t initial_size = 16; initial_size <= 2048; initial_size <<= 1) {
+    ArenaType arena(initial_size);
+    static constexpr bool kIsMultiThreaded = std::is_same<ThreadSafeArena, ArenaType>::value;
+    if (kIsMultiThreaded) {
+      vector<thread> threads;
+      threads.reserve(FLAGS_num_threads);
+      for (auto i = 0; i < FLAGS_num_threads; i++) {
+        threads.emplace_back(
+            AllocateAndTestThreadFunc<ArenaType, RNG, true /* InvokeAligned */>, &arena, i, &r);
+      }
+      for (thread& thr : threads) {
+        thr.join();
+      }
+    } else {
+      // Invoke the helper method on the same thread avoiding separate single
+      // thread creation/join.
+      AllocateAndTestThreadFunc<ArenaType, RNG, true /* InvokedAligned */>(&arena, 0, &r);
+    }
+  }
+}
 
 TEST(TestArena, TestSingleThreaded) {
   Arena arena(128);
-  AllocateThread(&arena, 0);
+  Random r(SeedRandom());
+  AllocateAndTestThreadFunc(&arena, 0, &r);
 }
 
-
-
 TEST(TestArena, TestMultiThreaded) {
   CHECK(FLAGS_num_threads < 256);
-
+  ThreadSafeRandom r(SeedRandom());
   ThreadSafeArena arena(1024);
 
   vector<thread> threads;
-  for (uint8_t i = 0; i < FLAGS_num_threads; i++) {
-    threads.emplace_back(AllocateThreadTSArena, &arena, (uint8_t)i);
+  threads.reserve(FLAGS_num_threads);
+  for (auto i = 0; i < FLAGS_num_threads; i++) {
+    threads.emplace_back(AllocateAndTestTSArenaFunc, &arena, i, &r);
   }
 
   for (thread& thr : threads) {
@@ -92,16 +155,24 @@ TEST(TestArena, TestMultiThreaded) {
   }
 }
 
-TEST(TestArena, TestAlignment) {
-  ThreadSafeArena arena(1024);
-  for (int i = 0; i < 1000; i++) {
-    int alignment = 1 << (1 % 5);
+TEST(TestArena, TestAlignmentThreadSafe) {
+  TestArenaAlignmentHelper<ThreadSafeArena, ThreadSafeRandom>();
+}
 
-    void *ret = arena.AllocateBytesAligned(5, alignment);
-    ASSERT_EQ(0, (uintptr_t)(ret) % alignment) <<
-      "failed to align on " << alignment << "b boundary: " <<
-      ret;
-  }
+TEST(TestArena, TestAlignmentNotThreadSafe) {
+  TestArenaAlignmentHelper<Arena, Random>();
+}
+
+TEST(TestArena, TestAlignmentSmallArena) {
+  // Start with small initial size and allocate bytes more than the size of the current
+  // component to trigger fallback code path in Arena. Moreover allocate number of bytes
+  // with alignment such that "aligned_size" exceeds "next_component_size".
+  Arena arena(16);
+  constexpr size_t alignment = 32;
+  void *ret = arena.AllocateBytesAligned(33, alignment);
+  ASSERT_NE(nullptr, ret);
+  ASSERT_EQ(0, reinterpret_cast<uintptr_t>(ret) % alignment) <<
+    "failed to align on " << alignment << "b boundary: " << ret;
 }
 
 TEST(TestArena, TestObjectAlignment) {
diff --git a/src/kudu/util/memory/arena.cc b/src/kudu/util/memory/arena.cc
index b580dbc..460aba5 100644
--- a/src/kudu/util/memory/arena.cc
+++ b/src/kudu/util/memory/arena.cc
@@ -76,23 +76,36 @@ void *ArenaBase<THREADSAFE>::AllocateBytesFallback(const size_t size, const size
   // Really need to allocate more space.
   size_t next_component_size = min(2 * cur->size(), max_buffer_size_);
   // But, allocate enough, even if the request is large. In this case,
-  // might violate the max_element_size bound.
-  if (next_component_size < size) {
-    next_component_size = size;
+  // might violate the "max_buffer_size_" bound.
+  // Component allocation is guaranteed to be 16-byte aligned, see NewComponent(),
+  // but we also need to support higher alignment values of 32 and 64 bytes and
+  // hence we add padding so that first request to allocate bytes after new
+  // component creation doesn't fail.
+  size_t aligned_size;
+  if (align <= 16) {
+    aligned_size = size;
+  } else {
+    DCHECK(align == 32 || align == 64);
+    aligned_size = size + align - 16;
   }
+
+  if (next_component_size < aligned_size) {
+    next_component_size = aligned_size;
+  }
+
   // If soft quota is exhausted we will only get the "minimal" amount of memory
-  // we ask for. In this case if we always use "size" as minimal, we may degrade
+  // we ask for. In this case if we always use "aligned_size" as minimal, we may degrade
   // to allocating a lot of tiny components, one for each string added to the
   // arena. This would be very inefficient, so let's first try something between
-  // "size" and "next_component_size". If it fails due to hard quota being
-  // exhausted, we'll fall back to using "size" as minimal.
-  size_t minimal = (size + next_component_size) / 2;
-  CHECK_LE(size, minimal);
+  // "aligned_size" and "next_component_size". If it fails due to hard quota being
+  // exhausted, we'll fall back to using "aligned_size" as minimal.
+  size_t minimal = (aligned_size + next_component_size) / 2;
+  CHECK_LE(aligned_size, minimal);
   CHECK_LE(minimal, next_component_size);
   // Now, just make sure we can actually get the memory.
   Component* component = NewComponent(next_component_size, minimal);
   if (component == nullptr) {
-    component = NewComponent(next_component_size, size);
+    component = NewComponent(next_component_size, aligned_size);
   }
   if (!component) return nullptr;
 
diff --git a/src/kudu/util/memory/arena.h b/src/kudu/util/memory/arena.h
index 34e1faa..6eaa477 100644
--- a/src/kudu/util/memory/arena.h
+++ b/src/kudu/util/memory/arena.h
@@ -21,13 +21,13 @@
 // Memory arena for variable-length datatypes and STL collections.
 #pragma once
 
-#include <algorithm>
 #include <cstddef>
 #include <cstdint>
 #include <cstring>
 #include <memory>
 #include <new>
 #include <ostream>
+#include <utility>
 #include <vector>
 
 #include <boost/signals2/dummy_mutex.hpp>
@@ -160,8 +160,8 @@ class ArenaBase {
   }
 
   // Allocate bytes, ensuring a specified alignment.
-  // NOTE: alignment MUST be a power of two, or else this will break.
-  void* AllocateBytesAligned(const size_t size, const size_t alignment);
+  // NOTE: alignment MUST be a power of two and only up to 64 bytes is supported.
+  void* AllocateBytesAligned(size_t size, size_t alignment);
 
   // Removes all data from the arena. (Invalidates all pointers returned by
   // AddSlice and AllocateBytes). Does not cause memory allocation.
@@ -183,8 +183,9 @@ class ArenaBase {
   class Component;
 
   // Fallback for AllocateBytes non-fast-path
-  void* AllocateBytesFallback(const size_t size, const size_t align);
+  void* AllocateBytesFallback(size_t size, size_t align);
 
+  // Returned component is guaranteed to be 16-byte aligned.
   Component* NewComponent(size_t requested_size, size_t minimum_size);
   void AddComponent(Component *component);
 
@@ -367,6 +368,19 @@ class ArenaBase<THREADSAFE>::Component {
   }
 
  private:
+  // Adjusts the supplied "offset" such that the combined "data" ptr and "offset" aligns
+  // with "alignment" bytes.
+  //
+  // Component start address "data_" is only guaranteed to be 16-byte aligned with enough
+  // bytes for the first request size plus any padding needed for alignment.
+  // So to support alignment values greater than 16 bytes, align the destination address ptr
+  // that'll be returned by AllocateBytesAligned() and not just the "offset_".
+  template<typename T>
+  static inline T AlignOffset(const uint8_t* data, const T offset, const size_t alignment) {
+    const auto data_start_addr = reinterpret_cast<uintptr_t>(data);
+    return KUDU_ALIGN_UP((data_start_addr + offset), alignment) - data_start_addr;
+  }
+
   // Mark the given range unpoisoned in ASAN.
   // This is a no-op in a non-ASAN build.
   void AsanUnpoison(const void* addr, size_t size);
@@ -386,21 +400,21 @@ class ArenaBase<THREADSAFE>::Component {
   DISALLOW_COPY_AND_ASSIGN(Component);
 };
 
-
 // Thread-safe implementation
 template <>
 inline uint8_t *ArenaBase<true>::Component::AllocateBytesAligned(
   const size_t size, const size_t alignment) {
   // Special case check the allowed alignments. Currently, we only ensure
-  // the allocated buffer components are 16-byte aligned, and the code path
-  // doesn't support larger alignment.
-  DCHECK(alignment == 1 || alignment == 2 || alignment == 4 ||
-         alignment == 8 || alignment == 16)
+  // the allocated buffer components are 16-byte aligned and add extra padding
+  // to support 32/64 byte alignment but the code path hasn't been tested
+  // with larger alignment values nor has there been a need.
+  DCHECK(alignment == 1 || alignment == 2 || alignment == 4 || alignment == 8 ||
+         alignment == 16 || alignment == 32 || alignment == 64)
     << "bad alignment: " << alignment;
   retry:
   Atomic32 offset = Acquire_Load(&offset_);
 
-  Atomic32 aligned = KUDU_ALIGN_UP(offset, alignment);
+  Atomic32 aligned = AlignOffset(data_, offset, alignment);
   Atomic32 new_offset = aligned + size;
 
   if (PREDICT_TRUE(new_offset <= size_)) {
@@ -421,13 +435,15 @@ inline uint8_t *ArenaBase<true>::Component::AllocateBytesAligned(
 template <>
 inline uint8_t *ArenaBase<false>::Component::AllocateBytesAligned(
   const size_t size, const size_t alignment) {
-  DCHECK(alignment == 1 || alignment == 2 || alignment == 4 ||
-         alignment == 8 || alignment == 16)
+  DCHECK(alignment == 1 || alignment == 2 || alignment == 4 || alignment == 8 ||
+         alignment == 16 || alignment == 32 || alignment == 64)
     << "bad alignment: " << alignment;
-  size_t aligned = KUDU_ALIGN_UP(offset_, alignment);
+
+  size_t aligned = AlignOffset(data_, offset_, alignment);
   uint8_t* destination = data_ + aligned;
   size_t save_offset = offset_;
   offset_ = aligned + size;
+
   if (PREDICT_TRUE(offset_ <= size_)) {
     AsanUnpoison(data_ + aligned, size);
     return destination;