You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/12/15 23:01:14 UTC

[48/50] [abbrv] incubator-impala git commit: IMPALA-3200: Implement suballocator for splitting buffers

IMPALA-3200: Implement suballocator for splitting buffers

This is useful for situations like hash tables, where we want to
make multiple non-spillable allocations of variable size from buffer
pool memory and not incur the overhead of interacting with the global
buffer pool. The allocator subdivides buffers to service allocations
and uses a buddy allocation algorithm to merge freed allocations into
larger chunks. This helps avoid external fragmentation and is quite
effective at reusing memory given the typical doubling allocation
patterns of hash tables in partitioned aggs and joins.

Testing:
The allocator has fairly robust internal consistency checks via
assertions and unique_ptrs. Includes a unit test that exercises
various allocation patterns.

I tested porting hash tables over to allocate memory using an earlier
version of the suballocator, which worked well (was able to run a
wide range of queries successfully with good performance).

Change-Id: I8bfe0e429f67ad273f7c7d0816703a9e6c3da788
Reviewed-on: http://gerrit.cloudera.org:8080/4715
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/2eef39d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/2eef39d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/2eef39d7

Branch: refs/heads/hadoop-next
Commit: 2eef39d7aea019e2aad7764d1cb590105f4cd5f6
Parents: 4fa9270
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Oct 7 09:23:59 2016 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Dec 15 08:01:22 2016 +0000

----------------------------------------------------------------------
 be/src/common/names.h                          |   8 +
 be/src/exec/hash-table.h                       |   5 +
 be/src/runtime/bufferpool/CMakeLists.txt       |   2 +
 be/src/runtime/bufferpool/suballocator-test.cc | 378 ++++++++++++++++++++
 be/src/runtime/bufferpool/suballocator.cc      | 244 +++++++++++++
 be/src/runtime/bufferpool/suballocator.h       | 220 ++++++++++++
 6 files changed, 857 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2eef39d7/be/src/common/names.h
----------------------------------------------------------------------
diff --git a/be/src/common/names.h b/be/src/common/names.h
index dac250f..e116673 100644
--- a/be/src/common/names.h
+++ b/be/src/common/names.h
@@ -120,6 +120,14 @@ using std::shared_ptr;
 using std::unique_ptr;
 #endif
 
+#ifdef _GLIBCXX_UTILITY
+using std::move;
+#endif
+
+#ifdef _NEW
+using std::nothrow;
+#endif
+
 #ifdef BOOST_THREAD_THREAD_COMMON_HPP
 using boost::thread;
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2eef39d7/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index 2ebc22f..300b9e7 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -553,6 +553,11 @@ class HashTable {
     } bucketData;
   };
 
+  static_assert(BitUtil::IsPowerOf2(sizeof(Bucket)),
+      "We assume that Hash-table bucket directories are a power-of-two sizes because "
+      "allocating only bucket directories with power-of-two byte sizes avoids internal "
+      "fragmentation in the simple buddy allocator.");
+
  public:
   class Iterator;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2eef39d7/be/src/runtime/bufferpool/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/CMakeLists.txt b/be/src/runtime/bufferpool/CMakeLists.txt
index 758d538..9151807 100644
--- a/be/src/runtime/bufferpool/CMakeLists.txt
+++ b/be/src/runtime/bufferpool/CMakeLists.txt
@@ -25,8 +25,10 @@ add_library(BufferPool
   buffer-allocator.cc
   buffer-pool.cc
   reservation-tracker.cc
+  suballocator.cc
 )
 add_dependencies(BufferPool thrift-deps)
 
 ADD_BE_TEST(buffer-pool-test)
 ADD_BE_TEST(reservation-tracker-test)
+ADD_BE_TEST(suballocator-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2eef39d7/be/src/runtime/bufferpool/suballocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator-test.cc b/be/src/runtime/bufferpool/suballocator-test.cc
new file mode 100644
index 0000000..e389ba4
--- /dev/null
+++ b/be/src/runtime/bufferpool/suballocator-test.cc
@@ -0,0 +1,378 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdlib>
+#include <random>
+#include <string>
+#include <vector>
+
+#include <boost/scoped_ptr.hpp>
+
+#include "common/object-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/bufferpool/suballocator.h"
+#include "testutil/death-test-util.h"
+#include "testutil/gtest-util.h"
+#include "util/bit-util.h"
+
+#include "common/names.h"
+
+using std::lognormal_distribution;
+using std::mt19937;
+using std::shuffle;
+using std::uniform_int_distribution;
+
+namespace impala {
+
+class SuballocatorTest : public ::testing::Test {
+ public:
+  virtual void SetUp() override {
+    SeedRng();
+    profile_.reset(new RuntimeProfile(&obj_pool_, "test profile"));
+  }
+
+  virtual void TearDown() override {
+    for (unique_ptr<BufferPool::Client>& client : clients_) {
+      buffer_pool_->DeregisterClient(client.get());
+    }
+    clients_.clear();
+    buffer_pool_.reset();
+    global_reservation_.Close();
+    profile_.reset();
+    obj_pool_.Clear();
+  }
+
+  /// The minimum buffer size used in most tests. Chosen so that the buffer is split
+  /// at least several ways.
+  const static int64_t TEST_BUFFER_LEN = Suballocator::MIN_ALLOCATION_BYTES * 16;
+
+ protected:
+  /// Seed 'rng_' with a seed either for the environment or based on the current time.
+  void SeedRng() {
+    const char* seed_str = getenv("SUBALLOCATOR_TEST_SEED");
+    int64_t seed;
+    if (seed_str != nullptr) {
+      seed = atoi(seed_str);
+    } else {
+      seed = time(nullptr);
+    }
+    LOG(INFO) << "Random seed: " << seed;
+    rng_.seed(seed);
+  }
+
+  /// Initialize 'buffer_pool_' and 'global_reservation_' with a limit of 'total_mem'
+  /// bytes of buffers of minimum length 'min_buffer_len'.
+  void InitPool(int64_t min_buffer_len, int total_mem) {
+    global_reservation_.InitRootTracker(nullptr, total_mem);
+    buffer_pool_.reset(new BufferPool(min_buffer_len, total_mem));
+  }
+
+  /// Register a client with 'buffer_pool_'. The client is automatically deregistered
+  /// and freed at the end of the test.
+  void RegisterClient(ReservationTracker* reservation, BufferPool::Client** client) {
+    clients_.push_back(make_unique<BufferPool::Client>());
+    *client = clients_.back().get();
+    ASSERT_OK(
+        buffer_pool_->RegisterClient("test client", reservation, profile(), *client));
+  }
+
+  /// Assert that the memory for all of the suballocations is writable and disjoint by
+  /// writing a distinct value to each suballocation and reading it back. Only works for
+  /// suballocations at least 8 bytes in size.
+  void AssertMemoryValid(const vector<unique_ptr<Suballocation>>& allocs);
+
+  /// Free all the suballocations and clear the vector.
+  static void FreeAllocations(
+      Suballocator* allocator, vector<unique_ptr<Suballocation>>* allocs) {
+    for (auto& alloc : *allocs) allocator->Free(move(alloc));
+    allocs->clear();
+  }
+
+  static void ExpectReservationUnused(ReservationTracker& reservation) {
+    EXPECT_EQ(reservation.GetUsedReservation(), 0) << reservation.DebugString();
+  }
+
+  RuntimeProfile* profile() { return profile_.get(); }
+  BufferPool* buffer_pool() { return buffer_pool_.get(); }
+
+  /// Pool for objects with per-test lifetime. Cleared after every test.
+  ObjectPool obj_pool_;
+
+  /// The top-level global reservation. Initialized in InitPool() and closed after every
+  /// test.
+  ReservationTracker global_reservation_;
+
+  /// The buffer pool. Initialized in InitPool() and reset after every test.
+  scoped_ptr<BufferPool> buffer_pool_;
+
+  /// Clients for the buffer pool. Deregistered and freed after every test.
+  vector<unique_ptr<BufferPool::Client>> clients_;
+
+  /// Global profile - recreated for every test.
+  scoped_ptr<RuntimeProfile> profile_;
+
+  /// Per-test random number generator. Seeded before every test.
+  mt19937 rng_;
+};
+
+const int64_t SuballocatorTest::TEST_BUFFER_LEN;
+
+/// Basic test to make sure that we can make multiple suballocations of the same size
+/// while using the expected number of buffers.
+TEST_F(SuballocatorTest, SameSizeAllocations) {
+  const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 100;
+  InitPool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool::Client* client;
+  RegisterClient(&global_reservation_, &client);
+  Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN);
+  vector<unique_ptr<Suballocation>> allocs;
+
+  // Make suballocations smaller than the buffer size.
+  const int64_t ALLOC_SIZE = TEST_BUFFER_LEN / 4;
+  int64_t allocated_mem = 0;
+  while (allocated_mem < TOTAL_MEM) {
+    allocs.emplace_back();
+    ASSERT_OK(allocator.Allocate(ALLOC_SIZE, &allocs.back()));
+    ASSERT_TRUE(allocs.back() != nullptr) << ALLOC_SIZE << " " << allocated_mem << " "
+                                          << global_reservation_.DebugString();
+    allocated_mem += ALLOC_SIZE;
+  }
+
+  // Attempts to allocate more memory should fail gracefully.
+  const int64_t MAX_ALLOC_SIZE = 1L << 24;
+  for (int alloc_size = 1; alloc_size <= MAX_ALLOC_SIZE; alloc_size *= 2) {
+    unique_ptr<Suballocation> failed_alloc;
+    ASSERT_OK(allocator.Allocate(alloc_size, &failed_alloc));
+    ASSERT_TRUE(failed_alloc == nullptr) << alloc_size << " " << allocated_mem << " "
+                                         << global_reservation_.DebugString();
+  }
+  AssertMemoryValid(allocs);
+
+  // Check that reservation usage matches the amount allocated.
+  EXPECT_EQ(global_reservation_.GetUsedReservation(), allocated_mem)
+      << global_reservation_.DebugString();
+  FreeAllocations(&allocator, &allocs);
+  ExpectReservationUnused(global_reservation_);
+}
+
+/// Check behaviour of zero-length allocation.
+TEST_F(SuballocatorTest, ZeroLengthAllocation) {
+  const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 100;
+  InitPool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool::Client* client;
+  RegisterClient(&global_reservation_, &client);
+  Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN);
+  unique_ptr<Suballocation> alloc;
+
+  // Zero-length allocations are allowed and rounded up to the minimum size.
+  ASSERT_OK(allocator.Allocate(0, &alloc));
+  ASSERT_TRUE(alloc != nullptr) << global_reservation_.DebugString();
+  EXPECT_EQ(alloc->len(), Suballocator::MIN_ALLOCATION_BYTES);
+  allocator.Free(move(alloc));
+  ExpectReservationUnused(global_reservation_);
+}
+
+/// Check behaviour of out-of-range allocation.
+TEST_F(SuballocatorTest, OutOfRangeAllocations) {
+  const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 100;
+  InitPool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool::Client* client;
+  RegisterClient(&global_reservation_, &client);
+  Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN);
+  unique_ptr<Suballocation> alloc;
+
+  // Negative allocations are not allowed and cause a DCHECK.
+  IMPALA_ASSERT_DEBUG_DEATH(allocator.Allocate(-1, &alloc), "");
+
+  // Too-large allocations fail gracefully.
+  ASSERT_FALSE(allocator.Allocate(Suballocator::MAX_ALLOCATION_BYTES + 1, &alloc).ok())
+      << global_reservation_.DebugString();
+  ExpectReservationUnused(global_reservation_);
+}
+
+/// Basic test to make sure that non-power-of-two suballocations are handled as expected
+/// by rounding up.
+TEST_F(SuballocatorTest, NonPowerOfTwoAllocations) {
+  const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 128;
+  InitPool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool::Client* client;
+  RegisterClient(&global_reservation_, &client);
+  Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN);
+
+  vector<int64_t> alloc_sizes;
+  // Multiply by 7 to get some unusual sizes.
+  for (int64_t alloc_size = 7; BitUtil::RoundUpToPowerOfTwo(alloc_size) <= TOTAL_MEM;
+       alloc_size *= 7) {
+    alloc_sizes.push_back(alloc_size);
+  }
+  // Test edge cases around power-of-two-sizes.
+  for (int64_t power_of_two = 2; power_of_two <= TOTAL_MEM; power_of_two *= 2) {
+    alloc_sizes.push_back(power_of_two - 1);
+    if (power_of_two != TOTAL_MEM) alloc_sizes.push_back(power_of_two + 1);
+  }
+  for (int64_t alloc_size : alloc_sizes) {
+    unique_ptr<Suballocation> alloc;
+    ASSERT_OK(allocator.Allocate(alloc_size, &alloc));
+    ASSERT_TRUE(alloc != nullptr) << alloc_size << " "
+                                  << global_reservation_.DebugString();
+
+    // Check that it was rounded up to a power-of-two.
+    EXPECT_EQ(alloc->len(), max(Suballocator::MIN_ALLOCATION_BYTES,
+                                BitUtil::RoundUpToPowerOfTwo(alloc_size)));
+    EXPECT_EQ(
+        max(TEST_BUFFER_LEN, alloc->len()), global_reservation_.GetUsedReservation())
+        << global_reservation_.DebugString();
+    memset(alloc->data(), 0, alloc->len()); // Check memory is writable.
+
+    allocator.Free(move(alloc));
+  }
+  ExpectReservationUnused(global_reservation_);
+}
+
+/// Test that simulates hash table's patterns of doubling suballocations and validates
+/// that memory does not become fragmented.
+TEST_F(SuballocatorTest, DoublingAllocations) {
+  const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 100;
+  InitPool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool::Client* client;
+  RegisterClient(&global_reservation_, &client);
+  Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN);
+
+  const int NUM_ALLOCS = 16;
+  vector<unique_ptr<Suballocation>> allocs(NUM_ALLOCS);
+
+  // Start with suballocations smaller than the page.
+  for (int64_t curr_alloc_size = TEST_BUFFER_LEN / 8;
+       curr_alloc_size * NUM_ALLOCS < TOTAL_MEM; curr_alloc_size *= 2) {
+    // Randomise the order of suballocations so that coalescing happens in different ways.
+    shuffle(allocs.begin(), allocs.end(), rng_);
+    for (unique_ptr<Suballocation>& alloc : allocs) {
+      unique_ptr<Suballocation> old_alloc = move(alloc);
+      ASSERT_OK(allocator.Allocate(curr_alloc_size, &alloc));
+      if (old_alloc != nullptr) allocator.Free(move(old_alloc));
+    }
+
+    AssertMemoryValid(allocs);
+
+    // Test that the memory isn't fragmented more than expected. In the worst case, the
+    // suballocations should require an extra page.
+    //
+    // If curr_alloc_size is at least the buffer size, there is no fragmentation because
+    // all previous suballocations are coalesced and freed, and all new suballocations
+    // are backed by a newly-allocated buffer.
+    //
+    // If curr_alloc_size is less than the buffer size, we lose at most a buffer to
+    // fragmentation because previous suballocations are incrementally freed in a way
+    // such that they can always be coalesced and reused. At least N/2 out of N of the
+    // Free() calls in an iteration result in the free memory being coalesced. This is
+    // because either the buddy is freed earlier or later, and the coalescing must happen
+    // either in the current Free() call or a later Free() call. Therefore at least
+    // N/2 - 1 out of N Allocate() calls follow a Free() call that coalesced memory
+    // and can therefore alway recycle a coalesced suballocation instead of allocating
+    // additional buffers.
+    //
+    // In the worst case we end up with two buffers with gaps: one buffer carried over
+    // from the previous iteration with a single curr_alloc_size gap (if the last Free()
+    // coalesced two buddies of curr_alloc_size / 2) and one buffer with only
+    // 'curr_alloc_size' bytes in use (if an Allocate() call couldn't recycle memory and
+    // had to allocate a new buffer).
+    EXPECT_LE(global_reservation_.GetUsedReservation(),
+        TEST_BUFFER_LEN + max(TEST_BUFFER_LEN, curr_alloc_size * NUM_ALLOCS));
+  }
+  // Check that reservation usage behaves as expected.
+  FreeAllocations(&allocator, &allocs);
+  ExpectReservationUnused(global_reservation_);
+}
+
+/// Do some randomised testing of the allocator. Simulate some interesting patterns with
+/// a mix of long and short runs of suballocations of variable size. Try to ensure that we
+/// spend some time with the allocator near its upper limit, where most suballocations
+/// will fail, and also in other parts of its range.
+TEST_F(SuballocatorTest, RandomAllocations) {
+  const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 1000;
+  InitPool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool::Client* client;
+  RegisterClient(&global_reservation_, &client);
+  Suballocator allocator(buffer_pool(), client, TEST_BUFFER_LEN);
+
+  vector<unique_ptr<Suballocation>> allocs;
+  int64_t allocated_mem = 0;
+  for (int iter = 0; iter < 1000; ++iter) {
+    // We want to make runs of suballocations and frees. Use lognormal distribution so
+    // that runs are mostly short, but there are some long runs mixed in.
+    int num_allocs = max(1, static_cast<int>(lognormal_distribution<double>(3, 1)(rng_)));
+    const bool alloc = uniform_int_distribution<int>(0, 1)(rng_);
+    if (alloc) {
+      const int64_t remaining_mem_per_alloc = (TOTAL_MEM - allocated_mem) / num_allocs;
+      // Fraction is ~0.12 on average but sometimes ranges above 1.0 so that we'll hit the
+      // max reservation and suballocations will fail.
+      double fraction_to_alloc = lognormal_distribution<double>(2, 1)(rng_) / 100.;
+      int64_t alloc_size = max(8L, BitUtil::RoundUpToPowerOfTwo(static_cast<int64_t>(
+                                       fraction_to_alloc * remaining_mem_per_alloc)));
+      for (int i = 0; i < num_allocs; ++i) {
+        unique_ptr<Suballocation> alloc;
+        ASSERT_OK(allocator.Allocate(alloc_size, &alloc));
+        if (alloc != nullptr) {
+          EXPECT_EQ(alloc->len(), max(alloc_size, Suballocator::MIN_ALLOCATION_BYTES));
+          allocated_mem += alloc->len();
+          allocs.push_back(move(alloc));
+        } else {
+          LOG(INFO) << "Failed to alloc " << alloc_size << " consumed " << allocated_mem
+                    << "/" << TOTAL_MEM;
+        }
+      }
+    } else {
+      // Free a random subset of suballocations.
+      num_allocs = min<int>(num_allocs, allocs.size());
+      shuffle(allocs.end() - num_allocs, allocs.end(), rng_);
+      for (int i = 0; i < num_allocs; ++i) {
+        allocated_mem -= allocs.back()->len();
+        allocator.Free(move(allocs.back()));
+        allocs.pop_back();
+      }
+    }
+    // Occasionally check that the suballocations are valid.
+    if (iter % 50 == 0) AssertMemoryValid(allocs);
+  }
+  // Check that memory is released when suballocations are freed.
+  FreeAllocations(&allocator, &allocs);
+  ExpectReservationUnused(global_reservation_);
+}
+
+void SuballocatorTest::AssertMemoryValid(
+    const vector<unique_ptr<Suballocation>>& allocs) {
+  for (int64_t i = 0; i < allocs.size(); ++i) {
+    const unique_ptr<Suballocation>& alloc = allocs[i];
+    ASSERT_GE(alloc->len(), 8);
+    // Memory should be 8-byte aligned.
+    ASSERT_EQ(0, reinterpret_cast<uint64_t>(alloc->data()) % 8) << alloc->data();
+    for (int64_t offset = 0; offset < alloc->len(); offset += 8) {
+      *reinterpret_cast<int64_t*>(alloc->data() + offset) = i;
+    }
+  }
+  for (int64_t i = 0; i < allocs.size(); ++i) {
+    const unique_ptr<Suballocation>& alloc = allocs[i];
+    for (int64_t offset = 0; offset < alloc->len(); offset += 8) {
+      ASSERT_EQ(*reinterpret_cast<int64_t*>(alloc->data() + offset), i)
+          << i << " " << alloc->data() << " " << offset;
+    }
+  }
+}
+}
+
+IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2eef39d7/be/src/runtime/bufferpool/suballocator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator.cc b/be/src/runtime/bufferpool/suballocator.cc
new file mode 100644
index 0000000..c41159e
--- /dev/null
+++ b/be/src/runtime/bufferpool/suballocator.cc
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/bufferpool/suballocator.h"
+
+#include <new>
+
+#include "gutil/bits.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "util/bit-util.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+constexpr int Suballocator::LOG_MAX_ALLOCATION_BYTES;
+constexpr int64_t Suballocator::MAX_ALLOCATION_BYTES;
+constexpr int Suballocator::LOG_MIN_ALLOCATION_BYTES;
+constexpr int64_t Suballocator::MIN_ALLOCATION_BYTES;
+const int Suballocator::NUM_FREE_LISTS;
+
+Suballocator::Suballocator(
+    BufferPool* pool, BufferPool::Client* client, int64_t min_buffer_len)
+  : pool_(pool), client_(client), min_buffer_len_(min_buffer_len), allocated_(0) {}
+
+Suballocator::~Suballocator() {
+  // All allocations should be free and buffers deallocated.
+  DCHECK_EQ(allocated_, 0);
+  for (int i = 0; i < NUM_FREE_LISTS; ++i) {
+    DCHECK(free_lists_[i] == nullptr);
+  }
+}
+
+Status Suballocator::Allocate(int64_t bytes, unique_ptr<Suballocation>* result) {
+  DCHECK_GE(bytes, 0);
+  if (UNLIKELY(bytes > MAX_ALLOCATION_BYTES)) {
+    return Status(Substitute("Requested memory allocation of $0 bytes, larger than max "
+                             "supported of $1 bytes",
+        bytes, MAX_ALLOCATION_BYTES));
+  }
+  unique_ptr<Suballocation> free_node;
+  bytes = max(bytes, MIN_ALLOCATION_BYTES);
+  const int target_list_idx = ComputeListIndex(bytes);
+  for (int i = target_list_idx; i < NUM_FREE_LISTS; ++i) {
+    free_node = PopFreeListHead(i);
+    if (free_node != nullptr) break;
+  }
+
+  if (free_node == nullptr) {
+    // Unable to find free allocation, need to get more memory from buffer pool.
+    RETURN_IF_ERROR(AllocateBuffer(bytes, &free_node));
+    if (free_node == nullptr) {
+      *result = nullptr;
+      return Status::OK();
+    }
+  }
+
+  // Free node may be larger than required.
+  const int free_list_idx = ComputeListIndex(free_node->len_);
+  if (free_list_idx != target_list_idx) {
+    RETURN_IF_ERROR(SplitToSize(move(free_node), bytes, &free_node));
+    DCHECK(free_node != nullptr);
+  }
+
+  free_node->in_use_ = true;
+  allocated_ += free_node->len_;
+  *result = move(free_node);
+  return Status::OK();
+}
+
+int Suballocator::ComputeListIndex(int64_t bytes) const {
+  return Bits::Log2CeilingNonZero64(bytes) - LOG_MIN_ALLOCATION_BYTES;
+}
+
+Status Suballocator::AllocateBuffer(int64_t bytes, unique_ptr<Suballocation>* result) {
+  DCHECK_LE(bytes, MAX_ALLOCATION_BYTES);
+  const int64_t buffer_len = max(min_buffer_len_, BitUtil::RoundUpToPowerOfTwo(bytes));
+  if (!client_->reservation()->IncreaseReservationToFit(buffer_len)) {
+    *result = nullptr;
+    return Status::OK();
+  }
+
+  unique_ptr<Suballocation> free_node;
+  RETURN_IF_ERROR(Suballocation::Create(&free_node));
+  RETURN_IF_ERROR(pool_->AllocateBuffer(client_, buffer_len, &free_node->buffer_));
+
+  free_node->data_ = free_node->buffer_.data();
+  free_node->len_ = buffer_len;
+  *result = move(free_node);
+  return Status::OK();
+}
+
+Status Suballocator::SplitToSize(unique_ptr<Suballocation> free_node,
+    int64_t target_bytes, unique_ptr<Suballocation>* result) {
+  DCHECK(!free_node->in_use_);
+  DCHECK_GT(free_node->len_, target_bytes);
+
+  const int free_list_idx = ComputeListIndex(free_node->len_);
+  const int target_list_idx = ComputeListIndex(target_bytes);
+
+  // Preallocate nodes to avoid handling allocation failures during splitting.
+  // Need two nodes per level for the left and right children.
+  const int num_nodes = (free_list_idx - target_list_idx) * 2;
+  constexpr int MAX_NUM_NODES = NUM_FREE_LISTS * 2;
+  unique_ptr<Suballocation> nodes[MAX_NUM_NODES];
+  for (int i = 0; i < num_nodes; ++i) {
+    if (!Suballocation::Create(&nodes[i]).ok()) {
+      // Add the free node to the free list to restore the allocator to an internally
+      // consistent state.
+      AddToFreeList(move(free_node));
+      return Status("Failed to allocate list node in Suballocator");
+    }
+  }
+
+  // Iteratively split from the current size down to the target size. We will return
+  // the leftmost descendant node.
+  int next_node = 0;
+  for (int i = free_list_idx; i > target_list_idx; --i) {
+    DCHECK_EQ(free_node->len_, 1LL << (i + LOG_MIN_ALLOCATION_BYTES));
+    unique_ptr<Suballocation> left_child = move(nodes[next_node++]);
+    unique_ptr<Suballocation> right_child = move(nodes[next_node++]);
+    DCHECK_LE(next_node, num_nodes);
+
+    const int64_t child_len = free_node->len_ / 2;
+    left_child->data_ = free_node->data_;
+    right_child->data_ = free_node->data_ + child_len;
+    left_child->len_ = right_child->len_ = child_len;
+    left_child->buddy_ = right_child.get();
+    right_child->buddy_ = left_child.get();
+    free_node->in_use_ = true;
+    left_child->parent_ = move(free_node);
+
+    AddToFreeList(move(right_child));
+    free_node = move(left_child);
+  }
+  *result = move(free_node);
+  return Status::OK();
+}
+
+void Suballocator::Free(unique_ptr<Suballocation> allocation) {
+  if (allocation == nullptr) return;
+
+  DCHECK(allocation->in_use_);
+  allocation->in_use_ = false;
+  allocated_ -= allocation->len_;
+
+  // Iteratively coalesce buddies until the buddy is in use or we get to the root.
+  // This ensures that all buddies in the free lists are coalesced. I.e. we do not
+  // have two buddies in the same free list.
+  unique_ptr<Suballocation> curr_allocation = move(allocation);
+  while (curr_allocation->buddy_ != nullptr) {
+    if (curr_allocation->buddy_->in_use_) {
+      // If the buddy is not free we can't coalesce, just add it to free list.
+      AddToFreeList(move(curr_allocation));
+      return;
+    }
+    unique_ptr<Suballocation> buddy = RemoveFromFreeList(curr_allocation->buddy_);
+    curr_allocation = CoalesceBuddies(move(curr_allocation), move(buddy));
+  }
+
+  // Reached root, which is an entire free buffer. We are not using it, so free up memory.
+  DCHECK(curr_allocation->buffer_.is_open());
+  pool_->FreeBuffer(client_, &curr_allocation->buffer_);
+  curr_allocation.reset();
+}
+
+void Suballocator::AddToFreeList(unique_ptr<Suballocation> node) {
+  DCHECK(!node->in_use_);
+  int list_idx = ComputeListIndex(node->len_);
+  if (free_lists_[list_idx] != nullptr) {
+    free_lists_[list_idx]->prev_free_ = node.get();
+  }
+  node->next_free_ = move(free_lists_[list_idx]);
+  DCHECK(node->prev_free_ == nullptr);
+  free_lists_[list_idx] = move(node);
+}
+
+unique_ptr<Suballocation> Suballocator::RemoveFromFreeList(Suballocation* node) {
+  DCHECK(node != nullptr);
+  const int list_idx = ComputeListIndex(node->len_);
+
+  if (node->next_free_ != nullptr) {
+    node->next_free_->prev_free_ = node->prev_free_;
+  }
+
+  unique_ptr<Suballocation>* ptr_from_prev = node->prev_free_ == nullptr ?
+      &free_lists_[list_idx] :
+      &node->prev_free_->next_free_;
+  node->prev_free_ = nullptr;
+  unique_ptr<Suballocation> result = move(*ptr_from_prev);
+  *ptr_from_prev = move(node->next_free_);
+  return result;
+}
+
+unique_ptr<Suballocation> Suballocator::PopFreeListHead(int list_idx) {
+  if (free_lists_[list_idx] == nullptr) return nullptr;
+  unique_ptr<Suballocation> result = move(free_lists_[list_idx]);
+  DCHECK(result->prev_free_ == nullptr);
+  if (result->next_free_ != nullptr) {
+    result->next_free_->prev_free_ = nullptr;
+  }
+  free_lists_[list_idx] = move(result->next_free_);
+  return result;
+}
+
+unique_ptr<Suballocation> Suballocator::CoalesceBuddies(
+    unique_ptr<Suballocation> b1, unique_ptr<Suballocation> b2) {
+  DCHECK(!b1->in_use_);
+  DCHECK(!b2->in_use_);
+  DCHECK_EQ(b1->buddy_, b2.get());
+  DCHECK_EQ(b2->buddy_, b1.get());
+  // Only the left child's parent should be present.
+  DCHECK((b1->parent_ != nullptr) ^ (b2->parent_ != nullptr));
+  unique_ptr<Suballocation> parent =
+      b1->parent_ != nullptr ? move(b1->parent_) : move(b2->parent_);
+  parent->in_use_ = false;
+  return parent;
+}
+
+Status Suballocation::Create(unique_ptr<Suballocation>* new_suballocation) {
+  // Allocate from system allocator for simplicity. We don't expect this to be
+  // performance critical or to be used for small allocations where CPU/memory
+  // overhead of these allocations might be a consideration.
+  new_suballocation->reset(new (nothrow) Suballocation());
+  if (*new_suballocation == nullptr) {
+    return Status(TErrorCode::MEM_ALLOC_FAILED, sizeof(Suballocation));
+  }
+  return Status::OK();
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2eef39d7/be/src/runtime/bufferpool/suballocator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator.h b/be/src/runtime/bufferpool/suballocator.h
new file mode 100644
index 0000000..6b08a8e
--- /dev/null
+++ b/be/src/runtime/bufferpool/suballocator.h
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_BUFFERPOOL_SUBALLOCATOR_H
+#define IMPALA_RUNTIME_BUFFERPOOL_SUBALLOCATOR_H
+
+#include <cstdint>
+#include <memory>
+
+#include "runtime/bufferpool/buffer-pool.h"
+
+namespace impala {
+
+class Suballocation;
+
+/// Helper class to subdivide buffers from the buffer pool. Implements a buddy
+/// allocation algorithm optimised for power-of-two allocations. At or above the
+/// 'min_buffer_len' value, each allocation is backed by a power-of-two buffer from
+/// a BufferPool. Below that threshold, each allocation is backed by a
+/// 'min_buffer_len' buffer split recursively into equal-sized buddies until the
+/// desired allocation size is reached. Every time an allocation is freed,
+/// free buddies are coalesced eagerly and whole buffers are freed eagerly.
+///
+/// The algorithms used are asymptotically efficient: O(log(max allocation size)), but
+/// the implementation's constant-factor overhead is not optimised. Thus, the allocator
+/// is best suited for relatively large allocations where the constant CPU/memory
+/// overhead per allocation is not paramount, e.g. bucket directories of hash tables.
+/// All allocations less than MIN_ALLOCATION_BYTES are rounded up to that amount.
+///
+/// Methods of Suballocator are not thread safe.
+///
+/// Implementation:
+/// ---------------
+/// The allocator uses two key data structures: a number of binary trees representing
+/// the buddy relationships between allocations and a set of free lists, one for each
+/// power-of-two size.
+///
+/// Each buffer allocated from the buffer pool has a tree of Suballocations associated
+/// with it that use the memory from that buffer. The root of the tree is the
+/// Suballocation corresponding to the entire buffer. Each node has either zero children
+/// (if it hasn't been split) or two children (if it has been split into two buddy
+/// allocations). Each non-root Suballocation has pointers to its buddy and its parent
+/// to enable coalescing the buddies into the parent when both are free.
+///
+/// Suballocations are eagerly coalesced when freed, so a Suballocation only has children
+/// if one of its descendants is allocated.
+///
+/// The free lists are doubly-linked lists of free Suballocation objects that support
+/// O(1) add and remove. The next and previous pointers are stored in the
+/// Suballocation object so no auxiliary memory is required.
+class Suballocator {
+ public:
+  /// Constructs a suballocator that allocates memory from 'pool' with 'client'.
+  /// Suballocations smaller than 'min_buffer_len' are handled by allocating a
+  /// buffer of 'min_buffer_len' and recursively splitting it.
+  Suballocator(BufferPool* pool, BufferPool::Client* client, int64_t min_buffer_len);
+
+  ~Suballocator();
+
+  /// Allocate bytes from BufferPool. The allocation is nullptr if unsuccessful because
+  /// the client's reservation was insufficient. If an unexpected error is encountered,
+  /// returns that status. The allocation size is rounded up to the next power-of-two.
+  /// The caller must always free the allocation by calling Free() (otherwise destructing
+  /// the returned 'result' will DCHECK on debug builds or otherwise misbehave on release
+  /// builds).
+  ///
+  /// Allocate() will try to increase the client's buffer pool reservation to fulfill
+  /// the requested allocation if needed.
+  ///
+  /// The memory returned is at least 8-byte aligned.
+  Status Allocate(int64_t bytes, std::unique_ptr<Suballocation>* result);
+
+  /// Free the allocation. Does nothing if allocation is nullptr (e.g. was the result of a
+  /// failed Allocate() call).
+  void Free(std::unique_ptr<Suballocation> allocation);
+
+  /// Generous upper bounds on the max allocation size and the number of different
+  /// power-of-two allocation sizes. Used to bound the number of free lists.
+  static constexpr int LOG_MAX_ALLOCATION_BYTES = 48;
+  static constexpr int64_t MAX_ALLOCATION_BYTES = 1L << LOG_MAX_ALLOCATION_BYTES;
+
+  /// Don't support allocations less than 4kb to avoid high overhead.
+  static constexpr int LOG_MIN_ALLOCATION_BYTES = 12;
+  static constexpr int64_t MIN_ALLOCATION_BYTES = 1L << LOG_MIN_ALLOCATION_BYTES;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Suballocator);
+
+  /// Compute the index for allocations of size 'bytes' in 'free_lists_'. 'bytes' is
+  /// rounded up to the next power-of-two if it is not already a power-of-two.
+  int ComputeListIndex(int64_t bytes) const;
+
+  /// Allocate a buffer of size 'bytes' < MAX_ALLOCATION_BYTES from the buffer pool and
+  /// initialize 'result' with it. If the reservation is insufficient, try to increase
+  /// the reservation to fit.
+  Status AllocateBuffer(int64_t bytes, std::unique_ptr<Suballocation>* result);
+
+  /// Split the free allocation until we get an allocation of 'target_bytes' rounded up
+  /// to a power-of-two. This allocation is returned. The other allocations resulting
+  /// from the splits are added to free lists. node->in_use must be false and 'node'
+  /// must not be in any free list. Can fail if allocating memory for data structures
+  /// fails.
+  Status SplitToSize(std::unique_ptr<Suballocation> node, int64_t target_bytes,
+      std::unique_ptr<Suballocation>* result);
+
+  // Add allocation to the free list with given index.
+  void AddToFreeList(std::unique_ptr<Suballocation> node);
+
+  // Remove allocation from its free list.
+  std::unique_ptr<Suballocation> RemoveFromFreeList(Suballocation* node);
+
+  // Get the allocation at the head of the free list at index 'list_idx'. Return nullptr
+  // if list is empty.
+  std::unique_ptr<Suballocation> PopFreeListHead(int list_idx);
+
+  /// Coalesce two free buddies, 'b1' and 'b2'. Frees 'b1' and 'b2' and marks the parent
+  /// not in use.
+  std::unique_ptr<Suballocation> CoalesceBuddies(
+      std::unique_ptr<Suballocation> b1, std::unique_ptr<Suballocation> b2);
+
+  /// The pool and corresponding client to allocate buffers from.
+  BufferPool* pool_;
+  BufferPool::Client* client_;
+
+  /// The minimum length of buffer to allocate. To serve allocations below this threshold,
+  /// a larger buffer is allocated and split into multiple allocations.
+  const int64_t min_buffer_len_;
+
+  /// Track how much memory has been returned in allocations but not freed.
+  int64_t allocated_;
+
+  /// Free lists for each supported power-of-two size. Statically allocate the maximum
+  /// possible number of lists for simplicity. Indexed by log2 of the allocation size
+  /// minus log2 of the minimum allocation size, e.g. 16k allocations are at index 2.
+  /// Each free list should only include one buddy of each pair: if both buddies are
+  /// free, they should have been coalesced.
+  ///
+  /// Each free list is implemented as a doubly-linked list.
+  static constexpr int NUM_FREE_LISTS =
+      LOG_MAX_ALLOCATION_BYTES - LOG_MIN_ALLOCATION_BYTES + 1;
+  std::unique_ptr<Suballocation> free_lists_[NUM_FREE_LISTS];
+};
+
+/// An allocation made by a Suballocator. Each allocation returned by Suballocator must
+/// be freed with Suballocator::Free().
+///
+/// Unique_ptr is used to manage ownership of these Suballocations as a guard against
+/// memory leaks. The owner of the unique_ptr is either:
+/// - client code, if the suballocation is in use
+/// - the free list array, if the suballocation is the head of a free list
+/// - the previous free list entry, if the suballocation is a subsequent free list entry
+/// - the suballocation's left child, if the suballocation is split
+class Suballocation {
+ public:
+  // Checks that the allocation is not in use (therefore not leaked).
+  ~Suballocation() { DCHECK(!in_use_); }
+
+  uint8_t* data() const { return data_; }
+  int64_t len() const { return len_; }
+
+ private:
+  friend class Suballocator;
+
+  DISALLOW_COPY_AND_ASSIGN(Suballocation);
+
+  /// Static constructor for Suballocation. Can fail if new fails to allocate memory.
+  static Status Create(std::unique_ptr<Suballocation>* new_suballocation);
+
+  // The actual constructor - Create() is used for its better error handling.
+  Suballocation()
+    : data_(nullptr), len_(-1), buddy_(nullptr), prev_free_(nullptr), in_use_(false) {}
+
+  /// The allocation's data and its length.
+  uint8_t* data_;
+  int64_t len_;
+
+  /// The buffer backing the Suballocation, if the Suballocation is backed by an entire
+  /// buffer. Otherwise uninitialized. 'buffer_' is open iff 'buddy_' is nullptr.
+  BufferPool::BufferHandle buffer_;
+
+  /// If this is a left child, the parent of this and its buddy. The parent's allocation
+  /// is the contiguous memory buffer comprised of the two allocations. We store the
+  /// parent in only the left child so that it is uniquely owned.
+  std::unique_ptr<Suballocation> parent_;
+
+  /// The buddy allocation of this allocation. The buddy's memory buffer is the same
+  /// size and adjacent in memory. Two buddy Suballocation objects have the same
+  /// lifetime: they are created in SplitToSize() and destroyed in CoalesceBuddies().
+  Suballocation* buddy_;
+
+  /// If this is in a free list, the next element in the list. nullptr if this is the last
+  /// element in the free list. This pointer owns the next element in the linked list,
+  /// which itself stores a raw back-pointer.
+  std::unique_ptr<Suballocation> next_free_;
+
+  /// If this is in a free list, the previous element in the list. nullptr if this is the
+  /// first element. If non-nullptr, this Suballocation is owned by 'prev_free_'.
+  Suballocation* prev_free_;
+
+  /// True if was returned from Allocate() and hasn't been freed yet, or if it has been
+  /// split into two child Suballocations.
+  bool in_use_;
+};
+}
+
+#endif