You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/03/10 21:41:52 UTC

[kudu] 01/03: [util] Add support for 32 & 64 byte alignment to Arena allocator

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit cf555cecc0d0bd86862dd47ad7c9f075192cf000
Author: Bankim Bhavsar <ba...@cloudera.com>
AuthorDate: Wed Mar 4 10:08:09 2020 -0800

    [util] Add support for 32 & 64 byte alignment to Arena allocator
    
    After adding a couple of member variables and methods to the BlockBloomFilter
    class to import the Or() function, predicate-test would sometimes randomly
    crash with SIGSEGV in AVX operations.
    
    Debugging showed that the crash only happens when the "directory_" structure
    in BlockBloomFilter is not 32-byte aligned. Surprisingly, without the
    addition of those methods, "directory_" had so far always been 32-byte
    aligned, even though the Arena allocator does not support alignment values
    greater than 16 bytes.
    
    With input from Todd, explored 3 options to fix the alignment problem:
    1) Update the HeapBufferAllocator in util/memory to align allocations to
    64 bytes. See the AllocateInternal() implementation. Surprisingly,
    FLAGS_allocator_aligned_mode is OFF by default, so we appear to be relying
    on the allocator to always allocate 16-byte aligned buffers. This option
    would therefore require turning ON the FLAGS_allocator_aligned_mode flag
    by default.
    2) Update the Arena allocator so that, when needed, extra bytes are allocated
    to allow aligning to 32/64 bytes, given that a new component will always
    be 16-byte aligned. This requires updating some of the address/alignment
    logic involving offset_ and the new component allocation.
    3) Don't touch the Arena allocator and simply add padding in the
    ArenaBlockBloomFilterBufferAllocator to minimize any risk to other parts of
    the codebase.
    
    Opted for option #2 since it broadly adds support for 32/64-byte alignment,
    unlike the limited scope of option #3. Option #1 is tempting, but it is
    unclear what unknowns turning on allocator_aligned_mode would bring.
    
    Although only 32-byte alignment is needed for AVX operations, support for
    64-byte alignment was also added to better match the cache line size.
    
    Additionally this change:
    - Adds a simple BlockBloomFilter unit test that reproduces the alignment
    problem, since the end-to-end predicate-test was turning out to be
    difficult to debug.
    - Fixes and enhances the arena-test with a bunch of variations.
    
    Change-Id: Ib665115fa0fc262a8b76c48f52947dedb84be2a7
    Reviewed-on: http://gerrit.cloudera.org:8080/15372
    Tested-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/util/block_bloom_filter-test.cc |  16 ++++
 src/kudu/util/block_bloom_filter.cc      |   7 +-
 src/kudu/util/memory/arena-test.cc       | 123 ++++++++++++++++++++++++-------
 src/kudu/util/memory/arena.cc            |  31 +++++---
 src/kudu/util/memory/arena.h             |  42 +++++++----
 5 files changed, 167 insertions(+), 52 deletions(-)

diff --git a/src/kudu/util/block_bloom_filter-test.cc b/src/kudu/util/block_bloom_filter-test.cc
index 978b2ad..4c4d418 100644
--- a/src/kudu/util/block_bloom_filter-test.cc
+++ b/src/kudu/util/block_bloom_filter-test.cc
@@ -32,6 +32,7 @@
 
 #include "kudu/util/hash.pb.h"
 #include "kudu/util/memory/arena.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -114,6 +115,21 @@ TEST_F(BlockBloomFilterTest, InvalidSpace) {
   bf.Close();
 }
 
+// Simple Arena allocator based test that would sometimes trigger
+// SIGSEGV due to misalignment of the "directory_" ptr on AVX operations
+// before adding support for 32/64 byte alignment in Arena allocator.
+// It simulates the allocation pattern in wire-protocol.cc.
+TEST_F(BlockBloomFilterTest, ArenaAligned) {
+  Arena a(64);
+  auto* allocator = a.NewObject<ArenaBlockBloomFilterBufferAllocator>(&a);
+  auto* bf = a.NewObject<BlockBloomFilter>(allocator);
+  bf->Init(6, FAST_HASH, 0);
+  bool key = true;
+  Slice s(reinterpret_cast<const uint8_t*>(&key), sizeof(key));
+  bf->Insert(s);
+  ASSERT_TRUE(bf->Find(s));
+}
+
 TEST_F(BlockBloomFilterTest, InvalidHashAlgorithm) {
   BlockBloomFilter bf(allocator_);
   Status s = bf.Init(4, UNKNOWN_HASH, 0);
diff --git a/src/kudu/util/block_bloom_filter.cc b/src/kudu/util/block_bloom_filter.cc
index 572119b..e09a7cc 100644
--- a/src/kudu/util/block_bloom_filter.cc
+++ b/src/kudu/util/block_bloom_filter.cc
@@ -306,10 +306,9 @@ ArenaBlockBloomFilterBufferAllocator::~ArenaBlockBloomFilterBufferAllocator() {
 
 Status ArenaBlockBloomFilterBufferAllocator::AllocateBuffer(size_t bytes, void** ptr) {
   DCHECK_NOTNULL(arena_);
-  // 16-bytes is the max alignment supported in arena currently whereas CACHELINE_SIZE
-  // is typically 64 bytes on modern CPUs.
-  // TODO(bankim): Needs investigation to support larger alignment values.
-  *ptr = arena_->AllocateBytesAligned(bytes, 16);
+  static_assert(CACHELINE_SIZE >= 32,
+                "For AVX operations, need buffers to be 32-bytes aligned or higher");
+  *ptr = arena_->AllocateBytesAligned(bytes, CACHELINE_SIZE);
   return *ptr == nullptr ?
       Status::RuntimeError(Substitute("Arena bad_alloc. bytes: $0", bytes)) :
       Status::OK();
diff --git a/src/kudu/util/memory/arena-test.cc b/src/kudu/util/memory/arena-test.cc
index 695e305..73052cd 100644
--- a/src/kudu/util/memory/arena-test.cc
+++ b/src/kudu/util/memory/arena-test.cc
@@ -15,11 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/util/memory/arena.h"
+
 #include <cstdint>
 #include <cstring>
 #include <memory>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <vector>
 
 #include <gflags/gflags.h>
@@ -27,9 +30,11 @@
 #include <gtest/gtest.h>
 
 #include "kudu/gutil/stringprintf.h"
-#include "kudu/util/memory/arena.h"
-#include "kudu/util/memory/memory.h"
 #include "kudu/util/mem_tracker.h"
+#include "kudu/util/memory/memory.h"
+#include "kudu/util/random.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/test_util.h"
 
 DEFINE_int32(num_threads, 16, "Number of threads to test");
 DEFINE_int32(allocs_per_thread, 10000, "Number of allocations each thread should do");
@@ -42,19 +47,52 @@ using std::string;
 using std::thread;
 using std::vector;
 
+// From the "arena" allocate number of bytes required to copy the "to_write" buffer
+// and add the allocated buffer to the output "ptrs" vector.
 template<class ArenaType>
-static void AllocateThread(ArenaType *arena, uint8_t thread_index) {
-  std::vector<void *> ptrs;
+static void AllocateBytesAndWrite(ArenaType* arena, const Slice& to_write, vector<void*>* ptrs) {
+  void *allocated_bytes = arena->AllocateBytes(to_write.size());
+  ASSERT_NE(nullptr, allocated_bytes);
+  memcpy(allocated_bytes, to_write.data(), to_write.size());
+  ptrs->push_back(allocated_bytes);
+}
+
+// From the "arena" allocate aligned bytes as specified by "alignment". Number of bytes
+// must be at least the size required to copy the "to_write" buffer.
+// Add the allocated aligned buffer to the output "ptrs" vector.
+template<class ArenaType, class RNG>
+static void AllocateAlignedBytesAndWrite(ArenaType* arena, const size_t alignment,
+    const Slice& to_write, RNG* r, vector<void*>* ptrs) {
+  // To test alignment we allocate random number of bytes within bounds
+  // but write to fixed number of bytes "to_write".
+  size_t num_bytes = FLAGS_alloc_size + r->Uniform32(128 - FLAGS_alloc_size + 1);
+  ASSERT_LE(to_write.size(), num_bytes);
+  void* allocated_bytes = arena->AllocateBytesAligned(num_bytes, alignment);
+  ASSERT_NE(nullptr, allocated_bytes);
+  ASSERT_EQ(0, reinterpret_cast<uintptr_t>(allocated_bytes) % alignment) <<
+    "failed to align on " << alignment << "b boundary: " << allocated_bytes;
+  memcpy(allocated_bytes, to_write.data(), to_write.size());
+  ptrs->push_back(allocated_bytes);
+}
+
+// Thread callback function used by bunch of test cases below.
+template<class ArenaType, class RNG, bool InvokeAligned = false>
+static void AllocateAndTestThreadFunc(ArenaType *arena, uint8_t thread_index, RNG* r) {
+  vector<void *> ptrs;
   ptrs.reserve(FLAGS_allocs_per_thread);
 
-  char buf[FLAGS_alloc_size];
+  uint8_t buf[FLAGS_alloc_size];
   memset(buf, thread_index, FLAGS_alloc_size);
+  Slice data(buf, FLAGS_alloc_size);
 
   for (int i = 0; i < FLAGS_allocs_per_thread; i++) {
-    void *alloced = arena->AllocateBytes(FLAGS_alloc_size);
-    CHECK(alloced);
-    memcpy(alloced, buf, FLAGS_alloc_size);
-    ptrs.push_back(alloced);
+    if (InvokeAligned) {
+      // Test alignment up to 64 bytes.
+      const size_t alignment = 1 << (i % 7);
+      AllocateAlignedBytesAndWrite(arena, alignment, data, r, &ptrs);
+    } else {
+      AllocateBytesAndWrite(arena, data, &ptrs);
+    }
   }
 
   for (void *p : ptrs) {
@@ -65,26 +103,51 @@ static void AllocateThread(ArenaType *arena, uint8_t thread_index) {
 }
 
 // Non-templated function to forward to above -- simplifies thread creation
-static void AllocateThreadTSArena(ThreadSafeArena *arena, uint8_t thread_index) {
-  AllocateThread(arena, thread_index);
+static void AllocateAndTestTSArenaFunc(ThreadSafeArena *arena, uint8_t thread_index,
+                                       ThreadSafeRandom* r) {
+  AllocateAndTestThreadFunc(arena, thread_index, r);
 }
 
+template<typename ArenaType, typename RNG>
+static void TestArenaAlignmentHelper() {
+  RNG r(SeedRandom());
+
+  for (size_t initial_size = 16; initial_size <= 2048; initial_size <<= 1) {
+    ArenaType arena(initial_size);
+    static constexpr bool kIsMultiThreaded = std::is_same<ThreadSafeArena, ArenaType>::value;
+    if (kIsMultiThreaded) {
+      vector<thread> threads;
+      threads.reserve(FLAGS_num_threads);
+      for (auto i = 0; i < FLAGS_num_threads; i++) {
+        threads.emplace_back(
+            AllocateAndTestThreadFunc<ArenaType, RNG, true /* InvokeAligned */>, &arena, i, &r);
+      }
+      for (thread& thr : threads) {
+        thr.join();
+      }
+    } else {
+      // Invoke the helper method on the same thread avoiding separate single
+      // thread creation/join.
+      AllocateAndTestThreadFunc<ArenaType, RNG, true /* InvokedAligned */>(&arena, 0, &r);
+    }
+  }
+}
 
 TEST(TestArena, TestSingleThreaded) {
   Arena arena(128);
-  AllocateThread(&arena, 0);
+  Random r(SeedRandom());
+  AllocateAndTestThreadFunc(&arena, 0, &r);
 }
 
-
-
 TEST(TestArena, TestMultiThreaded) {
   CHECK(FLAGS_num_threads < 256);
-
+  ThreadSafeRandom r(SeedRandom());
   ThreadSafeArena arena(1024);
 
   vector<thread> threads;
-  for (uint8_t i = 0; i < FLAGS_num_threads; i++) {
-    threads.emplace_back(AllocateThreadTSArena, &arena, (uint8_t)i);
+  threads.reserve(FLAGS_num_threads);
+  for (auto i = 0; i < FLAGS_num_threads; i++) {
+    threads.emplace_back(AllocateAndTestTSArenaFunc, &arena, i, &r);
   }
 
   for (thread& thr : threads) {
@@ -92,16 +155,24 @@ TEST(TestArena, TestMultiThreaded) {
   }
 }
 
-TEST(TestArena, TestAlignment) {
-  ThreadSafeArena arena(1024);
-  for (int i = 0; i < 1000; i++) {
-    int alignment = 1 << (1 % 5);
+TEST(TestArena, TestAlignmentThreadSafe) {
+  TestArenaAlignmentHelper<ThreadSafeArena, ThreadSafeRandom>();
+}
 
-    void *ret = arena.AllocateBytesAligned(5, alignment);
-    ASSERT_EQ(0, (uintptr_t)(ret) % alignment) <<
-      "failed to align on " << alignment << "b boundary: " <<
-      ret;
-  }
+TEST(TestArena, TestAlignmentNotThreadSafe) {
+  TestArenaAlignmentHelper<Arena, Random>();
+}
+
+TEST(TestArena, TestAlignmentSmallArena) {
+  // Start with small initial size and allocate bytes more than the size of the current
+  // component to trigger fallback code path in Arena. Moreover allocate number of bytes
+  // with alignment such that "aligned_size" exceeds "next_component_size".
+  Arena arena(16);
+  constexpr size_t alignment = 32;
+  void *ret = arena.AllocateBytesAligned(33, alignment);
+  ASSERT_NE(nullptr, ret);
+  ASSERT_EQ(0, reinterpret_cast<uintptr_t>(ret) % alignment) <<
+    "failed to align on " << alignment << "b boundary: " << ret;
 }
 
 TEST(TestArena, TestObjectAlignment) {
diff --git a/src/kudu/util/memory/arena.cc b/src/kudu/util/memory/arena.cc
index b580dbc..460aba5 100644
--- a/src/kudu/util/memory/arena.cc
+++ b/src/kudu/util/memory/arena.cc
@@ -76,23 +76,36 @@ void *ArenaBase<THREADSAFE>::AllocateBytesFallback(const size_t size, const size
   // Really need to allocate more space.
   size_t next_component_size = min(2 * cur->size(), max_buffer_size_);
   // But, allocate enough, even if the request is large. In this case,
-  // might violate the max_element_size bound.
-  if (next_component_size < size) {
-    next_component_size = size;
+  // might violate the "max_buffer_size_" bound.
+  // Component allocation is guaranteed to be 16-byte aligned, see NewComponent(),
+  // but we also need to support higher alignment values of 32 and 64 bytes and
+  // hence we add padding so that first request to allocate bytes after new
+  // component creation doesn't fail.
+  size_t aligned_size;
+  if (align <= 16) {
+    aligned_size = size;
+  } else {
+    DCHECK(align == 32 || align == 64);
+    aligned_size = size + align - 16;
   }
+
+  if (next_component_size < aligned_size) {
+    next_component_size = aligned_size;
+  }
+
   // If soft quota is exhausted we will only get the "minimal" amount of memory
-  // we ask for. In this case if we always use "size" as minimal, we may degrade
+  // we ask for. In this case if we always use "aligned_size" as minimal, we may degrade
   // to allocating a lot of tiny components, one for each string added to the
   // arena. This would be very inefficient, so let's first try something between
-  // "size" and "next_component_size". If it fails due to hard quota being
-  // exhausted, we'll fall back to using "size" as minimal.
-  size_t minimal = (size + next_component_size) / 2;
-  CHECK_LE(size, minimal);
+  // "aligned_size" and "next_component_size". If it fails due to hard quota being
+  // exhausted, we'll fall back to using "aligned_size" as minimal.
+  size_t minimal = (aligned_size + next_component_size) / 2;
+  CHECK_LE(aligned_size, minimal);
   CHECK_LE(minimal, next_component_size);
   // Now, just make sure we can actually get the memory.
   Component* component = NewComponent(next_component_size, minimal);
   if (component == nullptr) {
-    component = NewComponent(next_component_size, size);
+    component = NewComponent(next_component_size, aligned_size);
   }
   if (!component) return nullptr;
 
diff --git a/src/kudu/util/memory/arena.h b/src/kudu/util/memory/arena.h
index 34e1faa..6eaa477 100644
--- a/src/kudu/util/memory/arena.h
+++ b/src/kudu/util/memory/arena.h
@@ -21,13 +21,13 @@
 // Memory arena for variable-length datatypes and STL collections.
 #pragma once
 
-#include <algorithm>
 #include <cstddef>
 #include <cstdint>
 #include <cstring>
 #include <memory>
 #include <new>
 #include <ostream>
+#include <utility>
 #include <vector>
 
 #include <boost/signals2/dummy_mutex.hpp>
@@ -160,8 +160,8 @@ class ArenaBase {
   }
 
   // Allocate bytes, ensuring a specified alignment.
-  // NOTE: alignment MUST be a power of two, or else this will break.
-  void* AllocateBytesAligned(const size_t size, const size_t alignment);
+  // NOTE: alignment MUST be a power of two, and only alignments up to 64 bytes are supported.
+  void* AllocateBytesAligned(size_t size, size_t alignment);
 
   // Removes all data from the arena. (Invalidates all pointers returned by
   // AddSlice and AllocateBytes). Does not cause memory allocation.
@@ -183,8 +183,9 @@ class ArenaBase {
   class Component;
 
   // Fallback for AllocateBytes non-fast-path
-  void* AllocateBytesFallback(const size_t size, const size_t align);
+  void* AllocateBytesFallback(size_t size, size_t align);
 
+  // Returned component is guaranteed to be 16-byte aligned.
   Component* NewComponent(size_t requested_size, size_t minimum_size);
   void AddComponent(Component *component);
 
@@ -367,6 +368,19 @@ class ArenaBase<THREADSAFE>::Component {
   }
 
  private:
+  // Adjusts the supplied "offset" such that the combined "data" ptr and "offset" aligns
+  // with "alignment" bytes.
+  //
+  // Component start address "data_" is only guaranteed to be 16-byte aligned with enough
+  // bytes for the first request size plus any padding needed for alignment.
+  // So to support alignment values greater than 16 bytes, align the destination address ptr
+  // that'll be returned by AllocateBytesAligned() and not just the "offset_".
+  template<typename T>
+  static inline T AlignOffset(const uint8_t* data, const T offset, const size_t alignment) {
+    const auto data_start_addr = reinterpret_cast<uintptr_t>(data);
+    return KUDU_ALIGN_UP((data_start_addr + offset), alignment) - data_start_addr;
+  }
+
   // Mark the given range unpoisoned in ASAN.
   // This is a no-op in a non-ASAN build.
   void AsanUnpoison(const void* addr, size_t size);
@@ -386,21 +400,21 @@ class ArenaBase<THREADSAFE>::Component {
   DISALLOW_COPY_AND_ASSIGN(Component);
 };
 
-
 // Thread-safe implementation
 template <>
 inline uint8_t *ArenaBase<true>::Component::AllocateBytesAligned(
   const size_t size, const size_t alignment) {
   // Special case check the allowed alignments. Currently, we only ensure
-  // the allocated buffer components are 16-byte aligned, and the code path
-  // doesn't support larger alignment.
-  DCHECK(alignment == 1 || alignment == 2 || alignment == 4 ||
-         alignment == 8 || alignment == 16)
+  // the allocated buffer components are 16-byte aligned and add extra padding
+  // to support 32/64 byte alignment but the code path hasn't been tested
+  // with larger alignment values nor has there been a need.
+  DCHECK(alignment == 1 || alignment == 2 || alignment == 4 || alignment == 8 ||
+         alignment == 16 || alignment == 32 || alignment == 64)
     << "bad alignment: " << alignment;
   retry:
   Atomic32 offset = Acquire_Load(&offset_);
 
-  Atomic32 aligned = KUDU_ALIGN_UP(offset, alignment);
+  Atomic32 aligned = AlignOffset(data_, offset, alignment);
   Atomic32 new_offset = aligned + size;
 
   if (PREDICT_TRUE(new_offset <= size_)) {
@@ -421,13 +435,15 @@ inline uint8_t *ArenaBase<true>::Component::AllocateBytesAligned(
 template <>
 inline uint8_t *ArenaBase<false>::Component::AllocateBytesAligned(
   const size_t size, const size_t alignment) {
-  DCHECK(alignment == 1 || alignment == 2 || alignment == 4 ||
-         alignment == 8 || alignment == 16)
+  DCHECK(alignment == 1 || alignment == 2 || alignment == 4 || alignment == 8 ||
+         alignment == 16 || alignment == 32 || alignment == 64)
     << "bad alignment: " << alignment;
-  size_t aligned = KUDU_ALIGN_UP(offset_, alignment);
+
+  size_t aligned = AlignOffset(data_, offset_, alignment);
   uint8_t* destination = data_ + aligned;
   size_t save_offset = offset_;
   offset_ = aligned + size;
+
   if (PREDICT_TRUE(offset_ <= size_)) {
     AsanUnpoison(data_ + aligned, size);
     return destination;