Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:27 UTC

[15/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memcmpable_varint.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memcmpable_varint.cc b/be/src/kudu/util/memcmpable_varint.cc
new file mode 100644
index 0000000..b30eff6
--- /dev/null
+++ b/be/src/kudu/util/memcmpable_varint.cc
@@ -0,0 +1,257 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// This file contains code derived from sqlite4, distributed in the public domain.
+//
+// A variable length integer is an encoding of 64-bit unsigned integers
+// into between 1 and 9 bytes.  The encoding is designed so that small
+// (and common) values take much less space than larger values.  Additional
+// properties:
+//
+//    *  The length of the varint can be determined after examining just
+//       the first byte of the encoding.
+//
+//    *  Varints compare in numerical order using memcmp().
+//
+//************************************************************************
+//
+// Treat each byte of the encoding as an unsigned integer between 0 and 255.
+// Let the bytes of the encoding be called A0, A1, A2, ..., A8.
+//
+// DECODE
+//
+// If A0 is between 0 and 240 inclusive, then the result is the value of A0.
+//
+// If A0 is between 241 and 248 inclusive, then the result is
+// 240+256*(A0-241)+A1.
+//
+// If A0 is 249 then the result is 2288+256*A1+A2.
+//
+// If A0 is 250 then the result is A1..A3 as a 3-byte big-endian integer.
+//
+// If A0 is 251 then the result is A1..A4 as a 4-byte big-endian integer.
+//
+// If A0 is 252 then the result is A1..A5 as a 5-byte big-endian integer.
+//
+// If A0 is 253 then the result is A1..A6 as a 6-byte big-endian integer.
+//
+// If A0 is 254 then the result is A1..A7 as a 7-byte big-endian integer.
+//
+// If A0 is 255 then the result is A1..A8 as an 8-byte big-endian integer.
+//
+// ENCODE
+//
+// Let the input value be V.
+//
+// If V<=240 then output a single byte A0 equal to V.
+//
+// If V<=2287 then output A0 as (V-240)/256 + 241 and A1 as (V-240)%256.
+//
+// If V<=67823 then output A0 as 249, A1 as (V-2288)/256, and A2
+// as (V-2288)%256.
+//
+// If V<=16777215 then output A0 as 250 and A1 through A3 as a big-endian
+// 3-byte integer.
+//
+// If V<=4294967295 then output A0 as 251 and A1..A4 as a big-endian
+// 4-byte integer.
+//
+// If V<=1099511627775 then output A0 as 252 and A1..A5 as a big-endian
+// 5-byte integer.
+//
+// If V<=281474976710655 then output A0 as 253 and A1..A6 as a big-endian
+// 6-byte integer.
+//
+// If V<=72057594037927935 then output A0 as 254 and A1..A7 as a
+// big-endian 7-byte integer.
+//
+// Otherwise output A0 as 255 and A1..A8 as a big-endian 8-byte integer.
+//
+// SUMMARY
+//
+//    Bytes    Max Value    Digits
+//    -------  ---------    ---------
+//      1      240           2.3
+//      2      2287          3.3
+//      3      67823         4.8
+//      4      2**24-1       7.2
+//      5      2**32-1       9.6
+//      6      2**40-1      12.0
+//      7      2**48-1      14.4
+//      8      2**56-1      16.8
+//      9      2**64-1      19.2
+
+#include <cstddef>
+
+#include <glog/logging.h>
+
+#include "kudu/util/faststring.h"
+#include "kudu/util/memcmpable_varint.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+////////////////////////////////////////////////////////////
+// Begin code ripped from sqlite4
+////////////////////////////////////////////////////////////
+
+// This function is borrowed from sqlite4/varint.c
+static void varintWrite32(uint8_t *z, uint32_t y) {
+  z[0] = (uint8_t)(y>>24);
+  z[1] = (uint8_t)(y>>16);
+  z[2] = (uint8_t)(y>>8);
+  z[3] = (uint8_t)(y);
+}
+
+
+// Write a varint into z[].  The buffer z[] must be at least 9 bytes
+// long to accommodate the largest possible varint.  Return the number of
+// bytes of z[] used.
+//
+// This function is borrowed from sqlite4/varint.c
+static size_t sqlite4PutVarint64(uint8_t *z, uint64_t x) {
+  uint64_t w, y;
+  if (x <= 240) {
+    z[0] = (uint8_t)x;
+    return 1;
+  }
+  if (x <= 2287) {
+    y = (uint64_t)(x - 240);
+    z[0] = (uint8_t)(y/256 + 241);
+    z[1] = (uint8_t)(y%256);
+    return 2;
+  }
+  if (x <= 67823) {
+    y = (uint64_t)(x - 2288);
+    z[0] = 249;
+    z[1] = (uint8_t)(y/256);
+    z[2] = (uint8_t)(y%256);
+    return 3;
+  }
+  y = (uint64_t)x;
+  w = (uint64_t)(x>>32);
+  if (w == 0) {
+    if (y <= 16777215) {
+      z[0] = 250;
+      z[1] = (uint8_t)(y>>16);
+      z[2] = (uint8_t)(y>>8);
+      z[3] = (uint8_t)(y);
+      return 4;
+    }
+    z[0] = 251;
+    varintWrite32(z+1, y);
+    return 5;
+  }
+  if (w <= 255) {
+    z[0] = 252;
+    z[1] = (uint8_t)w;
+    varintWrite32(z+2, y);
+    return 6;
+  }
+  if (w <= 65535) {
+    z[0] = 253;
+    z[1] = (uint8_t)(w>>8);
+    z[2] = (uint8_t)w;
+    varintWrite32(z+3, y);
+    return 7;
+  }
+  if (w <= 16777215) {
+    z[0] = 254;
+    z[1] = (uint8_t)(w>>16);
+    z[2] = (uint8_t)(w>>8);
+    z[3] = (uint8_t)w;
+    varintWrite32(z+4, y);
+    return 8;
+  }
+  z[0] = 255;
+  varintWrite32(z+1, w);
+  varintWrite32(z+5, y);
+  return 9;
+}
+
+// Decode the varint in the first n bytes of z[].  Write the integer value
+// into *p_result and return the number of bytes in the varint.
+//
+// If the decode fails because there are not enough bytes in z[], then
+// return 0.
+//
+// Borrowed from sqlite4 varint.c
+static int sqlite4GetVarint64(
+  const uint8_t *z,
+  int n,
+  uint64_t *p_result) {
+  unsigned int x;
+  if ( n < 1) return 0;
+  if (z[0] <= 240) {
+    *p_result = z[0];
+    return 1;
+  }
+  if (z[0] <= 248) {
+    if ( n < 2) return 0;
+    *p_result = (z[0]-241)*256 + z[1] + 240;
+    return 2;
+  }
+  if (n < z[0]-246 ) return 0;
+  if (z[0] == 249) {
+    *p_result = 2288 + 256*z[1] + z[2];
+    return 3;
+  }
+  if (z[0] == 250) {
+    *p_result = (z[1]<<16) + (z[2]<<8) + z[3];
+    return 4;
+  }
+  x = (z[1]<<24) + (z[2]<<16) + (z[3]<<8) + z[4];
+  if (z[0] == 251) {
+    *p_result = x;
+    return 5;
+  }
+  if (z[0] == 252) {
+    *p_result = (((uint64_t)x)<<8) + z[5];
+    return 6;
+  }
+  if (z[0] == 253) {
+    *p_result = (((uint64_t)x)<<16) + (z[5]<<8) + z[6];
+    return 7;
+  }
+  if (z[0] == 254) {
+    *p_result = (((uint64_t)x)<<24) + (z[5]<<16) + (z[6]<<8) + z[7];
+    return 8;
+  }
+  *p_result = (((uint64_t)x)<<32) +
+               (0xffffffff & ((z[5]<<24) + (z[6]<<16) + (z[7]<<8) + z[8]));
+  return 9;
+}
+
+////////////////////////////////////////////////////////////
+// End code ripped from sqlite4
+////////////////////////////////////////////////////////////
+
+void PutMemcmpableVarint64(faststring *dst, uint64_t value) {
+  uint8_t buf[9];
+  int used = sqlite4PutVarint64(buf, value);
+  DCHECK_LE(used, sizeof(buf));
+  dst->append(buf, used);
+}
+
+bool GetMemcmpableVarint64(Slice *input, uint64_t *value) {
+  size_t size = sqlite4GetVarint64(input->data(), input->size(), value);
+  input->remove_prefix(size);
+  return size > 0;
+}
+
+
+} // namespace kudu
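
As a quick illustration of the encoding rules spelled out in the file header above, the following standalone sketch (not part of the patch; EncodeMemcmpable is a made-up name) reimplements them compactly -- the explicit 4- through 9-byte branches of sqlite4PutVarint64 are collapsed into a single generic big-endian tail -- and checks that the encoded byte strings compare in numeric order under memcmp(). It depends only on the C++ standard library.

// encode_sketch.cc: minimal sketch of the memcmpable varint encoding rules.
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

static std::vector<uint8_t> EncodeMemcmpable(uint64_t v) {
  std::vector<uint8_t> out;
  if (v <= 240) {
    out.push_back(static_cast<uint8_t>(v));
  } else if (v <= 2287) {
    out.push_back(static_cast<uint8_t>((v - 240) / 256 + 241));
    out.push_back(static_cast<uint8_t>((v - 240) % 256));
  } else if (v <= 67823) {
    out.push_back(249);
    out.push_back(static_cast<uint8_t>((v - 2288) / 256));
    out.push_back(static_cast<uint8_t>((v - 2288) % 256));
  } else {
    // A0 is 247 + number of tail bytes (250..255); the tail is the value
    // itself, big-endian.
    int tail = 3;
    while (tail < 8 && (v >> (8 * tail)) != 0) ++tail;
    out.push_back(static_cast<uint8_t>(247 + tail));
    for (int i = tail - 1; i >= 0; --i) {
      out.push_back(static_cast<uint8_t>(v >> (8 * i)));
    }
  }
  return out;
}

int main() {
  const uint64_t values[] = {0, 240, 241, 2287, 2288, 67823, 67824,
                             1ULL << 24, 1ULL << 32, 1ULL << 56};
  for (size_t i = 0; i + 1 < sizeof(values) / sizeof(values[0]); ++i) {
    std::vector<uint8_t> a = EncodeMemcmpable(values[i]);
    std::vector<uint8_t> b = EncodeMemcmpable(values[i + 1]);
    // Encodings of different lengths differ in their first byte (the A0
    // ranges per length are disjoint), so comparing the common prefix is
    // enough to establish the ordering.
    size_t n = a.size() < b.size() ? a.size() : b.size();
    assert(memcmp(a.data(), b.data(), n) < 0);
  }
  return 0;
}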

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memcmpable_varint.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memcmpable_varint.h b/be/src/kudu/util/memcmpable_varint.h
new file mode 100644
index 0000000..955f89d
--- /dev/null
+++ b/be/src/kudu/util/memcmpable_varint.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// This is an alternate varint format, borrowed from sqlite4, that differs from the
+// varint in util/coding.h in that its serialized form can be compared with memcmp(),
+// yielding the same result as comparing the original integers.
+//
+// The serialized form also has the property that multiple such varints can be strung
+// together to form a composite key, which itself is memcmpable.
+//
+// See memcmpable_varint.cc for further description.
+
+#ifndef KUDU_UTIL_MEMCMPABLE_VARINT_H
+#define KUDU_UTIL_MEMCMPABLE_VARINT_H
+
+#include <cstdint>
+
+namespace kudu {
+
+class Slice;
+class faststring;
+
+void PutMemcmpableVarint64(faststring *dst, uint64_t value);
+
+// Standard Get... routines parse a value from the beginning of a Slice
+// and advance the slice past the parsed value.
+bool GetMemcmpableVarint64(Slice *input, uint64_t *value);
+
+} // namespace kudu
+
+#endif
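
As a usage sketch of the two functions declared above (not part of the patch; it assumes the kudu util headers and library from this import are available to the build), the snippet below strings two varints together into a composite key, checks that composite keys still compare correctly with memcmp(), and decodes the components back out of a Slice.

// composite_key_sketch.cc: hypothetical usage of PutMemcmpableVarint64 /
// GetMemcmpableVarint64 for a two-part memcmp-able key.
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <glog/logging.h>

#include "kudu/util/faststring.h"
#include "kudu/util/memcmpable_varint.h"
#include "kudu/util/slice.h"

// Encodes (a, b) as two memcmpable varints strung together.
static void EncodePair(kudu::faststring* dst, uint64_t a, uint64_t b) {
  dst->clear();
  kudu::PutMemcmpableVarint64(dst, a);
  kudu::PutMemcmpableVarint64(dst, b);
}

int main() {
  kudu::faststring k1, k2;
  EncodePair(&k1, 3, 1000000);
  EncodePair(&k2, 3, 1000001);  // same first component, larger second one
  size_t n = std::min(k1.size(), k2.size());
  CHECK_LT(memcmp(k1.data(), k2.data(), n), 0);  // composite keys stay ordered

  // Decode the second key back out of a Slice; each Get call advances it.
  kudu::Slice in(k2.data(), k2.size());
  uint64_t a, b;
  CHECK(kudu::GetMemcmpableVarint64(&in, &a));
  CHECK(kudu::GetMemcmpableVarint64(&in, &b));
  CHECK_EQ(a, 3ULL);
  CHECK_EQ(b, 1000001ULL);
  return 0;
}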

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/arena-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/arena-test.cc b/be/src/kudu/util/memory/arena-test.cc
new file mode 100644
index 0000000..695e305
--- /dev/null
+++ b/be/src/kudu/util/memory/arena-test.cc
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/util/memory/arena.h"
+#include "kudu/util/memory/memory.h"
+#include "kudu/util/mem_tracker.h"
+
+DEFINE_int32(num_threads, 16, "Number of threads to test");
+DEFINE_int32(allocs_per_thread, 10000, "Number of allocations each thread should do");
+DEFINE_int32(alloc_size, 4, "number of bytes in each allocation");
+
+namespace kudu {
+
+using std::shared_ptr;
+using std::string;
+using std::thread;
+using std::vector;
+
+template<class ArenaType>
+static void AllocateThread(ArenaType *arena, uint8_t thread_index) {
+  std::vector<void *> ptrs;
+  ptrs.reserve(FLAGS_allocs_per_thread);
+
+  char buf[FLAGS_alloc_size];
+  memset(buf, thread_index, FLAGS_alloc_size);
+
+  for (int i = 0; i < FLAGS_allocs_per_thread; i++) {
+    void *alloced = arena->AllocateBytes(FLAGS_alloc_size);
+    CHECK(alloced);
+    memcpy(alloced, buf, FLAGS_alloc_size);
+    ptrs.push_back(alloced);
+  }
+
+  for (void *p : ptrs) {
+    if (memcmp(buf, p, FLAGS_alloc_size) != 0) {
+      FAIL() << StringPrintf("overwritten pointer at %p", p);
+    }
+  }
+}
+
+// Non-templated function to forward to above -- simplifies thread creation
+static void AllocateThreadTSArena(ThreadSafeArena *arena, uint8_t thread_index) {
+  AllocateThread(arena, thread_index);
+}
+
+
+TEST(TestArena, TestSingleThreaded) {
+  Arena arena(128);
+  AllocateThread(&arena, 0);
+}
+
+
+
+TEST(TestArena, TestMultiThreaded) {
+  CHECK(FLAGS_num_threads < 256);
+
+  ThreadSafeArena arena(1024);
+
+  vector<thread> threads;
+  for (uint8_t i = 0; i < FLAGS_num_threads; i++) {
+    threads.emplace_back(AllocateThreadTSArena, &arena, (uint8_t)i);
+  }
+
+  for (thread& thr : threads) {
+    thr.join();
+  }
+}
+
+TEST(TestArena, TestAlignment) {
+  ThreadSafeArena arena(1024);
+  for (int i = 0; i < 1000; i++) {
+    int alignment = 1 << (i % 5);
+
+    void *ret = arena.AllocateBytesAligned(5, alignment);
+    ASSERT_EQ(0, (uintptr_t)(ret) % alignment) <<
+      "failed to align on " << alignment << "b boundary: " <<
+      ret;
+  }
+}
+
+TEST(TestArena, TestObjectAlignment) {
+  struct MyStruct {
+    int64_t v;
+  };
+  Arena a(256);
+  // Allocate a junk byte to ensure that the next allocation isn't "accidentally" aligned.
+  a.AllocateBytes(1);
+  void* v = a.NewObject<MyStruct>();
+  ASSERT_EQ(reinterpret_cast<uintptr_t>(v) % alignof(MyStruct), 0);
+}
+
+
+// MemTrackers update their ancestors when consuming and releasing memory to compute
+// usage totals. However, the lifetimes of parent and child trackers can be different.
+// Validate that child trackers can still correctly update their parent stats even when
+// the parents go out of scope.
+TEST(TestArena, TestMemoryTrackerParentReferences) {
+  // Set up a parent and child MemTracker.
+  const string parent_id = "parent-id";
+  const string child_id = "child-id";
+  shared_ptr<MemTracker> child_tracker;
+  {
+    shared_ptr<MemTracker> parent_tracker = MemTracker::CreateTracker(1024, parent_id);
+    child_tracker = MemTracker::CreateTracker(-1, child_id, parent_tracker);
+    // Parent falls out of scope here. Should still be owned by the child.
+  }
+  shared_ptr<MemoryTrackingBufferAllocator> allocator(
+      new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), child_tracker));
+  MemoryTrackingArena arena(256, allocator);
+
+  // Try some child operations.
+  ASSERT_EQ(256, child_tracker->consumption());
+  void *allocated = arena.AllocateBytes(256);
+  ASSERT_TRUE(allocated);
+  ASSERT_EQ(256, child_tracker->consumption());
+  allocated = arena.AllocateBytes(256);
+  ASSERT_TRUE(allocated);
+  ASSERT_EQ(768, child_tracker->consumption());
+}
+
+TEST(TestArena, TestMemoryTrackingDontEnforce) {
+  shared_ptr<MemTracker> mem_tracker = MemTracker::CreateTracker(1024, "arena-test-tracker");
+  shared_ptr<MemoryTrackingBufferAllocator> allocator(
+      new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), mem_tracker));
+  MemoryTrackingArena arena(256, allocator);
+  ASSERT_EQ(256, mem_tracker->consumption());
+  void *allocated = arena.AllocateBytes(256);
+  ASSERT_TRUE(allocated);
+  ASSERT_EQ(256, mem_tracker->consumption());
+  allocated = arena.AllocateBytes(256);
+  ASSERT_TRUE(allocated);
+  ASSERT_EQ(768, mem_tracker->consumption());
+
+  // In DEBUG mode after Reset() the last component of an arena is
+  // cleared, but is then created again; in release mode, the last
+  // component is not cleared. In either case, after Reset()
+  // consumption() should equal the size of the last component which
+  // is 512 bytes.
+  arena.Reset();
+  ASSERT_EQ(512, mem_tracker->consumption());
+
+  // Allocate beyond allowed consumption. This should still go
+  // through, since enforce_limit is false.
+  allocated = arena.AllocateBytes(1024);
+  ASSERT_TRUE(allocated);
+
+  ASSERT_EQ(1536, mem_tracker->consumption());
+}
+
+TEST(TestArena, TestMemoryTrackingEnforced) {
+  shared_ptr<MemTracker> mem_tracker = MemTracker::CreateTracker(1024, "arena-test-tracker");
+  shared_ptr<MemoryTrackingBufferAllocator> allocator(
+      new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), mem_tracker,
+                                        // enforce limit
+                                        true));
+  MemoryTrackingArena arena(256, allocator);
+  ASSERT_EQ(256, mem_tracker->consumption());
+  void *allocated = arena.AllocateBytes(256);
+  ASSERT_TRUE(allocated);
+  ASSERT_EQ(256, mem_tracker->consumption());
+  allocated = arena.AllocateBytes(1024);
+  ASSERT_FALSE(allocated);
+  ASSERT_EQ(256, mem_tracker->consumption());
+}
+
+TEST(TestArena, TestSTLAllocator) {
+  Arena a(256);
+  typedef vector<int, ArenaAllocator<int, false> > ArenaVector;
+  ArenaAllocator<int, false> alloc(&a);
+  ArenaVector v(alloc);
+  for (int i = 0; i < 10000; i++) {
+    v.push_back(i);
+  }
+  for (int i = 0; i < 10000; i++) {
+    ASSERT_EQ(i, v[i]);
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/arena.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/arena.cc b/be/src/kudu/util/memory/arena.cc
new file mode 100644
index 0000000..b580dbc
--- /dev/null
+++ b/be/src/kudu/util/memory/arena.cc
@@ -0,0 +1,167 @@
+// Copyright 2010 Google Inc.  All Rights Reserved
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+#include "kudu/util/memory/arena.h"
+
+#include <algorithm>
+#include <memory>
+#include <mutex>
+
+using std::min;
+using std::unique_ptr;
+
+namespace kudu {
+
+template <bool THREADSAFE>
+const size_t ArenaBase<THREADSAFE>::kMinimumChunkSize = 16;
+
+// The max size of our allocations is set to this magic number
+// corresponding to 127 tcmalloc pages (each being 8KB). tcmalloc
+// internally keeps a free-list of spans up to this size. Larger
+// allocations have to go through a linear search through free
+// space, which can get quite slow in a fragmented heap.
+//
+// See the definition of kMaxPages in tcmalloc/src/common.h
+// as well as https://github.com/gperftools/gperftools/issues/535
+// for a description of the performance issue.
+constexpr int kMaxTcmallocFastAllocation = 8192 * 127;
+
+template <bool THREADSAFE>
+ArenaBase<THREADSAFE>::ArenaBase(BufferAllocator* buffer_allocator,
+                                 size_t initial_buffer_size)
+    : buffer_allocator_(buffer_allocator),
+      max_buffer_size_(kMaxTcmallocFastAllocation),
+      arena_footprint_(0) {
+  AddComponent(CHECK_NOTNULL(NewComponent(initial_buffer_size, 0)));
+}
+
+template <bool THREADSAFE>
+ArenaBase<THREADSAFE>::ArenaBase(size_t initial_buffer_size)
+    : ArenaBase<THREADSAFE>(HeapBufferAllocator::Get(),
+                            initial_buffer_size) {
+}
+
+template <bool THREADSAFE>
+void ArenaBase<THREADSAFE>::SetMaxBufferSize(size_t size) {
+  DCHECK_LE(size, kMaxTcmallocFastAllocation);
+  max_buffer_size_ = size;
+}
+
+template <bool THREADSAFE>
+void *ArenaBase<THREADSAFE>::AllocateBytesFallback(const size_t size, const size_t align) {
+  std::lock_guard<mutex_type> lock(component_lock_);
+
+  // It's possible another thread raced with us and already allocated
+  // a new component, in which case we should try the "fast path" again
+  Component* cur = AcquireLoadCurrent();
+  void * result = cur->AllocateBytesAligned(size, align);
+  if (PREDICT_FALSE(result != nullptr)) return result;
+
+  // Really need to allocate more space.
+  size_t next_component_size = min(2 * cur->size(), max_buffer_size_);
+  // But, allocate enough, even if the request is large. In this case, we
+  // might exceed the max_buffer_size_ bound.
+  if (next_component_size < size) {
+    next_component_size = size;
+  }
+  // If soft quota is exhausted we will only get the "minimal" amount of memory
+  // we ask for. In this case if we always use "size" as minimal, we may degrade
+  // to allocating a lot of tiny components, one for each string added to the
+  // arena. This would be very inefficient, so let's first try something between
+  // "size" and "next_component_size". If it fails due to hard quota being
+  // exhausted, we'll fall back to using "size" as minimal.
+  size_t minimal = (size + next_component_size) / 2;
+  CHECK_LE(size, minimal);
+  CHECK_LE(minimal, next_component_size);
+  // Now, just make sure we can actually get the memory.
+  Component* component = NewComponent(next_component_size, minimal);
+  if (component == nullptr) {
+    component = NewComponent(next_component_size, size);
+  }
+  if (!component) return nullptr;
+
+  // Now, must succeed. The component has at least 'size' bytes.
+  result = component->AllocateBytesAligned(size, align);
+  CHECK(result != nullptr);
+
+  // Now add it to the arena.
+  AddComponent(component);
+
+  return result;
+}
+
+template <bool THREADSAFE>
+typename ArenaBase<THREADSAFE>::Component* ArenaBase<THREADSAFE>::NewComponent(
+  size_t requested_size,
+  size_t minimum_size) {
+  Buffer* buffer = buffer_allocator_->BestEffortAllocate(requested_size,
+                                                         minimum_size);
+  if (buffer == nullptr) return nullptr;
+
+  CHECK_EQ(reinterpret_cast<uintptr_t>(buffer->data()) & (16 - 1), 0)
+    << "Components should be 16-byte aligned: " << buffer->data();
+
+  ASAN_POISON_MEMORY_REGION(buffer->data(), buffer->size());
+
+  return new Component(buffer);
+}
+
+// LOCKING: component_lock_ must be held by the current thread.
+template <bool THREADSAFE>
+void ArenaBase<THREADSAFE>::AddComponent(ArenaBase::Component *component) {
+  ReleaseStoreCurrent(component);
+  arena_.push_back(unique_ptr<Component>(component));
+  arena_footprint_ += component->size();
+}
+
+template <bool THREADSAFE>
+void ArenaBase<THREADSAFE>::Reset() {
+  std::lock_guard<mutex_type> lock(component_lock_);
+
+  if (PREDICT_FALSE(arena_.size() > 1)) {
+    unique_ptr<Component> last = std::move(arena_.back());
+    arena_.clear();
+    arena_.emplace_back(std::move(last));
+    ReleaseStoreCurrent(arena_[0].get());
+  }
+  arena_.back()->Reset();
+  arena_footprint_ = arena_.back()->size();
+
+#ifndef NDEBUG
+  // In debug mode release the last component too for (hopefully) better
+  // detection of memory-related bugs (invalid shallow copies, etc.).
+  size_t last_size = arena_.back()->size();
+  arena_.clear();
+  AddComponent(CHECK_NOTNULL(NewComponent(last_size, 0)));
+  arena_footprint_ = 0;
+#endif
+}
+
+template <bool THREADSAFE>
+size_t ArenaBase<THREADSAFE>::memory_footprint() const {
+  std::lock_guard<mutex_type> lock(component_lock_);
+  return arena_footprint_;
+}
+
+// Explicit instantiation.
+template class ArenaBase<true>;
+template class ArenaBase<false>;
+
+
+}  // namespace kudu
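
The component-sizing policy in AllocateBytesFallback() above -- double the current component up to max_buffer_size_, never go below the requested size, and ask the allocator for a "minimal" amount halfway in between -- can be traced with the small standalone sketch below. The names are made up for the example and nothing here is part of the patch.

// growth_sketch.cc: illustrates the component-sizing arithmetic only.
#include <algorithm>
#include <cstddef>
#include <cstdio>

struct SizingDecision {
  size_t requested;  // what AllocateBytesFallback passes as requested_size
  size_t minimal;    // what it passes as minimum_size on the first attempt
};

// cur_size: size of the current component; alloc_size: the allocation that
// overflowed it; max_buffer_size: the arena's cap on component size.
static SizingDecision NextComponentSize(size_t cur_size, size_t alloc_size,
                                        size_t max_buffer_size) {
  size_t next = std::min(2 * cur_size, max_buffer_size);  // normally double
  if (next < alloc_size) next = alloc_size;               // but never too small
  // Ask for something between the bare minimum and the doubled size so that a
  // soft-quota-limited allocator doesn't hand back a long run of tiny buffers.
  size_t minimal = (alloc_size + next) / 2;
  return SizingDecision{next, minimal};
}

int main() {
  // Trace an arena that starts at 256 bytes and keeps servicing 100-byte
  // allocations, with the default ~1MB (8192 * 127) cap.
  size_t cur = 256;
  for (int i = 1; i <= 5; ++i) {
    SizingDecision d = NextComponentSize(cur, 100, 8192 * 127);
    std::printf("component %d: request %zu bytes (minimal %zu)\n",
                i, d.requested, d.minimal);
    cur = d.requested;  // assume the allocator granted the full request
  }
  return 0;
}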

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/arena.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/arena.h b/be/src/kudu/util/memory/arena.h
new file mode 100644
index 0000000..6d9843b
--- /dev/null
+++ b/be/src/kudu/util/memory/arena.h
@@ -0,0 +1,501 @@
+// Copyright 2010 Google Inc.  All Rights Reserved
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+//
+// Memory arena for variable-length datatypes and STL collections.
+
+#ifndef KUDU_UTIL_MEMORY_ARENA_H_
+#define KUDU_UTIL_MEMORY_ARENA_H_
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <new>
+#include <ostream>
+#include <vector>
+
+#include <boost/signals2/dummy_mutex.hpp>
+#include <glog/logging.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/util/alignment.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/memory/memory.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+template<bool THREADSAFE> struct ArenaTraits;
+
+template <> struct ArenaTraits<true> {
+  typedef Atomic32 offset_type;
+  typedef Mutex mutex_type;
+  typedef simple_spinlock spinlock_type;
+};
+
+template <> struct ArenaTraits<false> {
+  typedef uint32_t offset_type;
+  // For non-threadsafe, we don't need any real locking.
+  typedef boost::signals2::dummy_mutex mutex_type;
+  typedef boost::signals2::dummy_mutex spinlock_type;
+};
+
+// A helper class for storing variable-length blobs (e.g. strings). Once a blob
+// is added to the arena, its address stays fixed. No reallocation happens.
+// Instead, the arena keeps a list of buffers. When it needs to grow, it
+// allocates a new buffer. Each subsequent buffer is 2x larger than its
+// predecessor, until the maximum specified buffer size is reached.
+// The buffers are furnished by a designated allocator.
+//
+// This class is thread-safe with the fast path lock-free.
+template <bool THREADSAFE>
+class ArenaBase {
+ public:
+  // Arenas are required to have a minimum size of at least this amount.
+  static const size_t kMinimumChunkSize;
+
+  // Creates a new arena, with a single buffer of size up to initial_buffer_size
+  // and maximum capacity (i.e. total sizes of all buffers)
+  // possibly limited by the buffer allocator. The allocator might cap the
+  // initial allocation request arbitrarily (down to zero). As a consequence,
+  // arena construction never fails due to OOM.
+  //
+  // Calls to AllocateBytes() will then give out bytes from the working buffer
+  // until it is exhausted. Then, a subsequent working buffer will be allocated.
+  // The size of the next buffer is normally 2x the size of the previous buffer.
+  // It might be capped by the allocator, or by the max_buffer_size of the Arena,
+  // settable by SetMaxBufferSize below.
+  //
+  // The default maximum buffer size is ~1MB. See 'SetMaxBufferSize' for details
+  // on when you would want to configure this differently.
+  ArenaBase(BufferAllocator* buffer_allocator,
+            size_t initial_buffer_size);
+
+  // Creates an arena using a default (heap) allocator.
+  explicit ArenaBase(size_t initial_buffer_size);
+
+  // Set the maximum buffer size allocated for this arena.
+  // The maximum buffer size allowed is slightly less than ~1MB (8192 * 127 bytes).
+  //
+  // Consider the following pros/cons of large buffer sizes:
+  //
+  // Pros:
+  //   - Fewer heap allocations if the arena will hold a lot of data.
+  //     (hence better allocation performance out of the arena)
+  //   - Better page locality for objects allocated out of the same arena,
+  //     especially if huge pages are in use.
+  //   - Less internal fragmentation at the "end" of each buffer if the
+  //     size of allocations from the arena is close to the size of the
+  //     buffer. For example, with a 128KB max buffer size and 65KB
+  //     allocations, we will only be able to make one allocation from
+  //     each buffer and waste nearly 50% of memory.
+  // Cons:
+  //   - Larger heap allocations may be more difficult to fulfill if the
+  //     heap is fragmented.
+  //
+  // Overall, if you aren't sure, just leave it at the default.
+  //
+  // NOTE: this method is not thread-safe, even in the thread-safe variant.
+  // It is expected to be called only immediately after constructing the
+  // Arena instance, and before making any allocations.
+  void SetMaxBufferSize(size_t size);
+
+  // Adds content of the specified Slice to the arena, and returns a
+  // pointer to it. The pointer is guaranteed to remain valid during the
+  // lifetime of the arena. The Slice object itself is not copied. The
+  // size information is not stored.
+  // (Normal use case is that the caller already has an array of Slices,
+  // where it keeps these pointers together with size information).
+  // If this request would make the arena grow and the allocator denies that,
+  // returns NULL and leaves the arena unchanged.
+  uint8_t *AddSlice(const Slice& value);
+
+  // Same as above.
+  void * AddBytes(const void *data, size_t len);
+
+  // Handy wrapper for placement-new.
+  //
+  // This ensures that the returned object is properly aligned based on
+  // alignof(T).
+  template<class T, typename ... Args>
+  T *NewObject(Args&&... args);
+
+  // Relocate the given Slice into the arena, setting 'dst' and
+  // returning true if successful.
+  // It is legal for 'dst' to be a pointer to 'src'.
+  // See AddSlice above for detail on memory lifetime.
+  bool RelocateSlice(const Slice &src, Slice *dst);
+
+  // Similar to the above, but for StringPiece.
+  bool RelocateStringPiece(const StringPiece& src, StringPiece* sp);
+
+  // Reserves a blob of the specified size in the arena, and returns a pointer
+  // to it. The caller can then fill the allocated memory. The pointer is
+  // guaranteed to remain valid during the lifetime of the arena.
+  // If this request would make the arena grow and the allocator denies that,
+  // returns NULL and leaves the arena unchanged.
+  void* AllocateBytes(const size_t size) {
+    return AllocateBytesAligned(size, 1);
+  }
+
+  // Allocate bytes, ensuring a specified alignment.
+  // NOTE: alignment MUST be a power of two, or else this will break.
+  void* AllocateBytesAligned(const size_t size, const size_t alignment);
+
+  // Removes all data from the arena. (Invalidates all pointers returned by
+  // AddSlice and AllocateBytes). Does not cause memory allocation.
+  // May reduce memory footprint, as it discards all allocated buffers but
+  // the last one.
+  // Unless allocations exceed max_buffer_size, repeatedly filling and
+  // resetting the arena normally causes its memory footprint to settle quickly
+  // and buffer allocations to cease, as the arena keeps reusing a single,
+  // large buffer.
+  void Reset();
+
+  // Returns the memory footprint of this arena, in bytes, defined as a sum of
+  // all buffer sizes. Always greater than or equal to the total number of
+  // bytes allocated out of the arena.
+  size_t memory_footprint() const;
+
+ private:
+  typedef typename ArenaTraits<THREADSAFE>::mutex_type mutex_type;
+  // Encapsulates a single buffer in the arena.
+  class Component;
+
+  // Fallback for AllocateBytes non-fast-path
+  void* AllocateBytesFallback(const size_t size, const size_t align);
+
+  Component* NewComponent(size_t requested_size, size_t minimum_size);
+  void AddComponent(Component *component);
+
+  // Load the current component, with "Acquire" semantics (see atomicops.h)
+  // if the arena is meant to be thread-safe.
+  inline Component* AcquireLoadCurrent() {
+    if (THREADSAFE) {
+      return reinterpret_cast<Component*>(
+        base::subtle::Acquire_Load(reinterpret_cast<AtomicWord*>(&current_)));
+    } else {
+      return current_;
+    }
+  }
+
+  // Store the current component, with "Release" semantics (see atomicops.h)
+  // if the arena is meant to be thread-safe.
+  inline void ReleaseStoreCurrent(Component* c) {
+    if (THREADSAFE) {
+      base::subtle::Release_Store(reinterpret_cast<AtomicWord*>(&current_),
+                                  reinterpret_cast<AtomicWord>(c));
+    } else {
+      current_ = c;
+    }
+  }
+
+  BufferAllocator* const buffer_allocator_;
+  std::vector<std::unique_ptr<Component> > arena_;
+
+  // The current component to allocate from.
+  // Use AcquireLoadCurrent and ReleaseStoreCurrent to load/store.
+  Component* current_;
+  size_t max_buffer_size_;
+  size_t arena_footprint_;
+
+  // Lock covering 'slow path' allocation, when new components are
+  // allocated and added to the arena's list. Also covers any other
+  // mutation of the component data structure (e.g. Reset).
+  mutable mutex_type component_lock_;
+
+  DISALLOW_COPY_AND_ASSIGN(ArenaBase);
+};
+
+// STL-compliant allocator, for use with hash_maps and other structures
+// which share lifetime with an Arena. Enables memory control and improves
+// performance.
+template<class T, bool THREADSAFE> class ArenaAllocator {
+ public:
+  typedef T value_type;
+  typedef size_t size_type;
+  typedef ptrdiff_t difference_type;
+
+  typedef T* pointer;
+  typedef const T* const_pointer;
+  typedef T& reference;
+  typedef const T& const_reference;
+  pointer index(reference r) const  { return &r; }
+  const_pointer index(const_reference r) const  { return &r; }
+  size_type max_size() const  { return size_t(-1) / sizeof(T); }
+
+  explicit ArenaAllocator(ArenaBase<THREADSAFE>* arena) : arena_(arena) {
+    CHECK_NOTNULL(arena_);
+  }
+
+  ~ArenaAllocator() { }
+
+  pointer allocate(size_type n, std::allocator<void>::const_pointer /*hint*/ = 0) {
+    return reinterpret_cast<T*>(arena_->AllocateBytes(n * sizeof(T)));
+  }
+
+  void deallocate(pointer p, size_type n) {}
+
+  void construct(pointer p, const T& val) {
+    new(reinterpret_cast<void*>(p)) T(val);
+  }
+
+  void destroy(pointer p) { p->~T(); }
+
+  template<class U> struct rebind {
+    typedef ArenaAllocator<U, THREADSAFE> other;
+  };
+
+  template<class U, bool TS> ArenaAllocator(const ArenaAllocator<U, TS>& other)
+      : arena_(other.arena()) { }
+
+  template<class U, bool TS> bool operator==(const ArenaAllocator<U, TS>& other) const {
+    return arena_ == other.arena();
+  }
+
+  template<class U, bool TS> bool operator!=(const ArenaAllocator<U, TS>& other) const {
+    return arena_ != other.arena();
+  }
+
+  ArenaBase<THREADSAFE> *arena() const {
+    return arena_;
+  }
+
+ private:
+
+  ArenaBase<THREADSAFE>* arena_;
+};
+
+
+class Arena : public ArenaBase<false> {
+ public:
+  explicit Arena(size_t initial_buffer_size) :
+    ArenaBase<false>(initial_buffer_size)
+  {}
+};
+
+class ThreadSafeArena : public ArenaBase<true> {
+ public:
+  explicit ThreadSafeArena(size_t initial_buffer_size) :
+    ArenaBase<true>(initial_buffer_size)
+  {}
+};
+
+// Arena implementation that is integrated with MemTracker in order to
+// track heap-allocated space consumed by the arena.
+
+class MemoryTrackingArena : public ArenaBase<false> {
+ public:
+
+  MemoryTrackingArena(
+      size_t initial_buffer_size,
+      const std::shared_ptr<MemoryTrackingBufferAllocator>& tracking_allocator)
+      : ArenaBase<false>(tracking_allocator.get(), initial_buffer_size),
+        tracking_allocator_(tracking_allocator) {}
+
+  ~MemoryTrackingArena() {
+  }
+
+ private:
+
+  // This is required in order for the Arena to survive even after the tablet
+  // is shut down, e.g., in the case of Scanners that are still running
+  // (see tablet_server-test.cc).
+  std::shared_ptr<MemoryTrackingBufferAllocator> tracking_allocator_;
+};
+
+class ThreadSafeMemoryTrackingArena : public ArenaBase<true> {
+ public:
+
+  ThreadSafeMemoryTrackingArena(
+      size_t initial_buffer_size,
+      const std::shared_ptr<MemoryTrackingBufferAllocator>& tracking_allocator)
+      : ArenaBase<true>(tracking_allocator.get(), initial_buffer_size),
+        tracking_allocator_(tracking_allocator) {}
+
+  ~ThreadSafeMemoryTrackingArena() {
+  }
+
+ private:
+
+  // See comment in MemoryTrackingArena above.
+  std::shared_ptr<MemoryTrackingBufferAllocator> tracking_allocator_;
+};
+
+// Implementation of inline and template methods
+
+template<bool THREADSAFE>
+class ArenaBase<THREADSAFE>::Component {
+ public:
+  explicit Component(Buffer* buffer)
+      : buffer_(buffer),
+        data_(static_cast<uint8_t*>(buffer->data())),
+        offset_(0),
+        size_(buffer->size()) {}
+
+  // Tries to reserve space in this component. Returns the pointer to the
+  // reserved space if successful; NULL on failure (if there's no more room).
+  uint8_t* AllocateBytes(const size_t size) {
+    return AllocateBytesAligned(size, 1);
+  }
+
+  uint8_t *AllocateBytesAligned(const size_t size, const size_t alignment);
+
+  size_t size() const { return size_; }
+  void Reset() {
+    ASAN_POISON_MEMORY_REGION(data_, size_);
+    offset_ = 0;
+  }
+
+ private:
+  // Mark the given range unpoisoned in ASAN.
+  // This is a no-op in a non-ASAN build.
+  void AsanUnpoison(const void* addr, size_t size);
+
+  gscoped_ptr<Buffer> buffer_;
+  uint8_t* const data_;
+  typename ArenaTraits<THREADSAFE>::offset_type offset_;
+  const size_t size_;
+
+#ifdef ADDRESS_SANITIZER
+  // Lock used around unpoisoning memory when ASAN is enabled.
+  // ASAN does not support concurrent unpoison calls that may overlap a particular
+  // memory word (8 bytes).
+  typedef typename ArenaTraits<THREADSAFE>::spinlock_type spinlock_type;
+  spinlock_type asan_lock_;
+#endif
+  DISALLOW_COPY_AND_ASSIGN(Component);
+};
+
+
+// Thread-safe implementation
+template <>
+inline uint8_t *ArenaBase<true>::Component::AllocateBytesAligned(
+  const size_t size, const size_t alignment) {
+  // Special-case check of the allowed alignments. Currently, we only ensure
+  // the allocated buffer components are 16-byte aligned, and the code path
+  // doesn't support larger alignment.
+  DCHECK(alignment == 1 || alignment == 2 || alignment == 4 ||
+         alignment == 8 || alignment == 16)
+    << "bad alignment: " << alignment;
+  retry:
+  Atomic32 offset = Acquire_Load(&offset_);
+
+  Atomic32 aligned = KUDU_ALIGN_UP(offset, alignment);
+  Atomic32 new_offset = aligned + size;
+
+  if (PREDICT_TRUE(new_offset <= size_)) {
+    bool success = Acquire_CompareAndSwap(&offset_, offset, new_offset) == offset;
+    if (PREDICT_TRUE(success)) {
+      AsanUnpoison(data_ + aligned, size);
+      return data_ + aligned;
+    } else {
+      // Raced with another allocator
+      goto retry;
+    }
+  } else {
+    return NULL;
+  }
+}
+
+// Non-Threadsafe implementation
+template <>
+inline uint8_t *ArenaBase<false>::Component::AllocateBytesAligned(
+  const size_t size, const size_t alignment) {
+  DCHECK(alignment == 1 || alignment == 2 || alignment == 4 ||
+         alignment == 8 || alignment == 16)
+    << "bad alignment: " << alignment;
+  size_t aligned = KUDU_ALIGN_UP(offset_, alignment);
+  uint8_t* destination = data_ + aligned;
+  size_t save_offset = offset_;
+  offset_ = aligned + size;
+  if (PREDICT_TRUE(offset_ <= size_)) {
+    AsanUnpoison(data_ + aligned, size);
+    return destination;
+  } else {
+    offset_ = save_offset;
+    return NULL;
+  }
+}
+
+template <bool THREADSAFE>
+inline void ArenaBase<THREADSAFE>::Component::AsanUnpoison(const void* addr, size_t size) {
+#ifdef ADDRESS_SANITIZER
+  std::lock_guard<spinlock_type> l(asan_lock_);
+  ASAN_UNPOISON_MEMORY_REGION(addr, size);
+#endif
+}
+
+// Fast-path allocation should get inlined, falling back
+// to a non-inlined function call on allocation failure.
+template <bool THREADSAFE>
+inline void *ArenaBase<THREADSAFE>::AllocateBytesAligned(const size_t size, const size_t align) {
+  void* result = AcquireLoadCurrent()->AllocateBytesAligned(size, align);
+  if (PREDICT_TRUE(result != NULL)) return result;
+  return AllocateBytesFallback(size, align);
+}
+
+template <bool THREADSAFE>
+inline uint8_t* ArenaBase<THREADSAFE>::AddSlice(const Slice& value) {
+  return reinterpret_cast<uint8_t *>(AddBytes(value.data(), value.size()));
+}
+
+template <bool THREADSAFE>
+inline void *ArenaBase<THREADSAFE>::AddBytes(const void *data, size_t len) {
+  void* destination = AllocateBytes(len);
+  if (destination == NULL) return NULL;
+  memcpy(destination, data, len);
+  return destination;
+}
+
+template <bool THREADSAFE>
+inline bool ArenaBase<THREADSAFE>::RelocateSlice(const Slice &src, Slice *dst) {
+  void* destination = AllocateBytes(src.size());
+  if (destination == NULL) return false;
+  memcpy(destination, src.data(), src.size());
+  *dst = Slice(reinterpret_cast<uint8_t *>(destination), src.size());
+  return true;
+}
+
+
+template <bool THREADSAFE>
+inline bool ArenaBase<THREADSAFE>::RelocateStringPiece(const StringPiece& src, StringPiece* sp) {
+  Slice slice(src.data(), src.size());
+  if (!RelocateSlice(slice, &slice)) return false;
+  *sp = StringPiece(reinterpret_cast<const char*>(slice.data()), slice.size());
+  return true;
+}
+
+template<bool THREADSAFE>
+template<class T, class ... Args>
+inline T *ArenaBase<THREADSAFE>::NewObject(Args&&... args) {
+  void *mem = AllocateBytesAligned(sizeof(T), alignof(T));
+  if (mem == NULL) throw std::bad_alloc();
+  return new (mem) T(std::forward<Args>(args)...);
+}
+
+}  // namespace kudu
+
+#endif  // KUDU_UTIL_MEMORY_ARENA_H_
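
A short usage sketch of the public API declared above (not part of the patch; it assumes the kudu util headers and library from this import are available to the build, and Point is just an example type):

// arena_usage_sketch.cc: hypothetical use of Arena, RelocateSlice, NewObject
// and ArenaAllocator.
#include <cstdint>
#include <cstring>
#include <vector>
#include <glog/logging.h>

#include "kudu/util/memory/arena.h"
#include "kudu/util/slice.h"

struct Point { int32_t x; int32_t y; };

int main() {
  kudu::Arena arena(1024);  // single-threaded variant, 1KB initial buffer

  // Raw allocation: the pointer stays valid until Reset() or destruction.
  void* buf = arena.AllocateBytes(16);
  CHECK(buf != nullptr);

  // Copy a string into the arena; 'copied' ends up pointing at arena memory.
  kudu::Slice copied;
  CHECK(arena.RelocateSlice(kudu::Slice("hello"), &copied));
  CHECK_EQ(0, memcmp(copied.data(), "hello", 5));

  // Placement-new of an object, aligned for its type.
  Point* p = arena.NewObject<Point>();
  p->x = 1;
  p->y = 2;

  // STL container whose storage comes from the arena.
  typedef kudu::ArenaAllocator<int, false> IntAlloc;
  IntAlloc alloc(&arena);
  std::vector<int, IntAlloc> v(alloc);
  for (int i = 0; i < 100; ++i) v.push_back(i);

  arena.Reset();  // invalidates buf, copied, p and v's storage
  return 0;
}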

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/memory.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/memory.cc b/be/src/kudu/util/memory/memory.cc
new file mode 100644
index 0000000..b3964df
--- /dev/null
+++ b/be/src/kudu/util/memory/memory.cc
@@ -0,0 +1,339 @@
+// Copyright 2010 Google Inc.  All Rights Reserved
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "kudu/util/memory/memory.h"
+
+#include <mm_malloc.h>
+
+#include <algorithm>
+#include <cstdlib>
+#include <cstring>
+
+#include <gflags/gflags.h>
+
+#include "kudu/util/alignment.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/memory/overwrite.h"
+#include "kudu/util/mem_tracker.h"
+
+using std::copy;
+using std::min;
+
+// TODO(onufry) - test whether the code still tests OK if we set this to true,
+// or remove this code and add a test that the Google allocator does not change its
+// contract - 16-aligned in -c opt and %16 == 8 in debug.
+DEFINE_bool(allocator_aligned_mode, false,
+            "Use 16-byte alignment instead of 8-byte, "
+            "unless explicitly specified otherwise - to boost SIMD");
+TAG_FLAG(allocator_aligned_mode, hidden);
+
+namespace kudu {
+
+namespace {
+static char dummy_buffer[0] = {};
+}
+
+Buffer::~Buffer() {
+#if !defined(NDEBUG) && !defined(ADDRESS_SANITIZER)
+  // "unrolling" the string "BAD" makes for a much more efficient
+  // OverwriteWithPattern call in debug mode, so we can keep this
+  // useful bit of code without tests going slower!
+  //
+  // In ASAN mode, we don't bother with this, because when we free the memory, ASAN will
+  // prevent us from accessing it anyway.
+  OverwriteWithPattern(reinterpret_cast<char*>(data_), size_,
+                       "BADBADBADBADBADBADBADBADBADBADBAD"
+                       "BADBADBADBADBADBADBADBADBADBADBAD"
+                       "BADBADBADBADBADBADBADBADBADBADBAD");
+#endif
+  if (allocator_ != nullptr) allocator_->FreeInternal(this);
+}
+
+void BufferAllocator::LogAllocation(size_t requested,
+                                    size_t minimal,
+                                    Buffer* buffer) {
+  if (buffer == nullptr) {
+    LOG(WARNING) << "Memory allocation failed. "
+                 << "Number of bytes requested: " << requested
+                 << ", minimal: " << minimal;
+    return;
+  }
+  if (buffer->size() < requested) {
+    LOG(WARNING) << "Memory allocation was shorter than requested. "
+                 << "Number of bytes requested to allocate: " << requested
+                 << ", minimal: " << minimal
+                 << ", and actually allocated: " << buffer->size();
+  }
+}
+
+HeapBufferAllocator::HeapBufferAllocator()
+  : aligned_mode_(FLAGS_allocator_aligned_mode) {
+}
+
+Buffer* HeapBufferAllocator::AllocateInternal(
+    const size_t requested,
+    const size_t minimal,
+    BufferAllocator* const originator) {
+  DCHECK_LE(minimal, requested);
+  void* data;
+  size_t attempted = requested;
+  while (true) {
+    data = (attempted == 0) ? &dummy_buffer[0] : Malloc(attempted);
+    if (data != nullptr) {
+      return CreateBuffer(data, attempted, originator);
+    }
+    if (attempted == minimal) return nullptr;
+    attempted = minimal + (attempted - minimal - 1) / 2;
+  }
+}
+
+bool HeapBufferAllocator::ReallocateInternal(
+    const size_t requested,
+    const size_t minimal,
+    Buffer* const buffer,
+    BufferAllocator* const originator) {
+  DCHECK_LE(minimal, requested);
+  void* data;
+  size_t attempted = requested;
+  while (true) {
+    if (attempted == 0) {
+      if (buffer->size() > 0) free(buffer->data());
+      data = &dummy_buffer[0];
+    } else {
+      if (buffer->size() > 0) {
+        data = Realloc(buffer->data(), buffer->size(), attempted);
+      } else {
+        data = Malloc(attempted);
+      }
+    }
+    if (data != nullptr) {
+      UpdateBuffer(data, attempted, buffer);
+      return true;
+    }
+    if (attempted == minimal) return false;
+    attempted = minimal + (attempted - minimal - 1) / 2;
+  }
+}
+
+void HeapBufferAllocator::FreeInternal(Buffer* buffer) {
+  if (buffer->size() > 0) free(buffer->data());
+}
+
+void* HeapBufferAllocator::Malloc(size_t size) {
+  if (aligned_mode_) {
+    void* data;
+    if (posix_memalign(&data, 16, KUDU_ALIGN_UP(size, 16))) {
+       return nullptr;
+    }
+    return data;
+  } else {
+    return malloc(size);
+  }
+}
+
+void* HeapBufferAllocator::Realloc(void* previous_data, size_t previous_size,
+                                   size_t new_size) {
+  if (aligned_mode_) {
+    void* data = Malloc(new_size);
+    if (data) {
+// NOTE(ptab): We should use realloc here to avoid memory copying,
+// but it doesn't work on memory allocated by posix_memalign(...).
+// realloc reallocates the memory but doesn't preserve the content.
+// TODO(ptab): reiterate after some time to check if it is fixed (tcmalloc ?)
+      memcpy(data, previous_data, min(previous_size, new_size));
+      free(previous_data);
+      return data;
+    } else {
+      return nullptr;
+    }
+  } else {
+    return realloc(previous_data, new_size);
+  }
+}
+
+Buffer* ClearingBufferAllocator::AllocateInternal(size_t requested,
+                                                  size_t minimal,
+                                                  BufferAllocator* originator) {
+  Buffer* buffer = DelegateAllocate(delegate_, requested, minimal,
+                                    originator);
+  if (buffer != nullptr) memset(buffer->data(), 0, buffer->size());
+  return buffer;
+}
+
+bool ClearingBufferAllocator::ReallocateInternal(size_t requested,
+                                                 size_t minimal,
+                                                 Buffer* buffer,
+                                                 BufferAllocator* originator) {
+  size_t offset = (buffer != nullptr ? buffer->size() : 0);
+  bool success = DelegateReallocate(delegate_, requested, minimal, buffer,
+                                    originator);
+  if (success && buffer->size() > offset) {
+    memset(static_cast<char*>(buffer->data()) + offset, 0,
+           buffer->size() - offset);
+  }
+  return success;
+}
+
+void ClearingBufferAllocator::FreeInternal(Buffer* buffer) {
+  DelegateFree(delegate_, buffer);
+}
+
+Buffer* MediatingBufferAllocator::AllocateInternal(
+    const size_t requested,
+    const size_t minimal,
+    BufferAllocator* const originator) {
+  // Allow the mediator to trim the request.
+  size_t granted;
+  if (requested > 0) {
+    granted = mediator_->Allocate(requested, minimal);
+    if (granted < minimal) return nullptr;
+  } else {
+    granted = 0;
+  }
+  Buffer* buffer = DelegateAllocate(delegate_, granted, minimal, originator);
+  if (buffer == nullptr) {
+    mediator_->Free(granted);
+  } else if (buffer->size() < granted) {
+    mediator_->Free(granted - buffer->size());
+  }
+  return buffer;
+}
+
+bool MediatingBufferAllocator::ReallocateInternal(
+    const size_t requested,
+    const size_t minimal,
+    Buffer* const buffer,
+    BufferAllocator* const originator) {
+  // Allow the mediator to trim the request. Be conservative; assume that
+  // realloc may degenerate to malloc-memcpy-free.
+  size_t granted;
+  if (requested > 0) {
+    granted = mediator_->Allocate(requested, minimal);
+    if (granted < minimal) return false;
+  } else {
+    granted = 0;
+  }
+  size_t old_size = buffer->size();
+  if (DelegateReallocate(delegate_, granted, minimal, buffer, originator)) {
+    mediator_->Free(granted - buffer->size() + old_size);
+    return true;
+  } else {
+    mediator_->Free(granted);
+    return false;
+  }
+}
+
+void MediatingBufferAllocator::FreeInternal(Buffer* buffer) {
+  mediator_->Free(buffer->size());
+  DelegateFree(delegate_, buffer);
+}
+
+Buffer* MemoryStatisticsCollectingBufferAllocator::AllocateInternal(
+    const size_t requested,
+    const size_t minimal,
+    BufferAllocator* const originator) {
+  Buffer* buffer = DelegateAllocate(delegate_, requested, minimal, originator);
+  if (buffer != nullptr) {
+    memory_stats_collector_->AllocatedMemoryBytes(buffer->size());
+  } else {
+    memory_stats_collector_->RefusedMemoryBytes(minimal);
+  }
+  return buffer;
+}
+
+bool MemoryStatisticsCollectingBufferAllocator::ReallocateInternal(
+    const size_t requested,
+    const size_t minimal,
+    Buffer* const buffer,
+    BufferAllocator* const originator) {
+  const size_t old_size = buffer->size();
+  bool outcome = DelegateReallocate(delegate_, requested, minimal, buffer,
+                                    originator);
+  if (buffer->size() > old_size) {
+    memory_stats_collector_->AllocatedMemoryBytes(buffer->size() - old_size);
+  } else if (buffer->size() < old_size) {
+    memory_stats_collector_->FreedMemoryBytes(old_size - buffer->size());
+  } else if (!outcome && (minimal > buffer->size())) {
+    memory_stats_collector_->RefusedMemoryBytes(minimal - buffer->size());
+  }
+  return outcome;
+}
+
+void MemoryStatisticsCollectingBufferAllocator::FreeInternal(Buffer* buffer) {
+  DelegateFree(delegate_, buffer);
+  memory_stats_collector_->FreedMemoryBytes(buffer->size());
+}
+
+size_t MemoryTrackingBufferAllocator::Available() const {
+  return enforce_limit_ ? mem_tracker_->SpareCapacity() : std::numeric_limits<int64_t>::max();
+}
+
+bool MemoryTrackingBufferAllocator::TryConsume(int64_t bytes) {
+  // Calls TryConsume first, even if enforce_limit_ is false: this
+  // will cause mem_tracker_ to try to free up more memory by GCing.
+  if (!mem_tracker_->TryConsume(bytes)) {
+    if (enforce_limit_) {
+      return false;
+    } else {
+      // If enforce_limit_ is false, allocate memory anyway.
+      mem_tracker_->Consume(bytes);
+    }
+  }
+  return true;
+}
+
+Buffer* MemoryTrackingBufferAllocator::AllocateInternal(size_t requested,
+                                                        size_t minimal,
+                                                        BufferAllocator* originator) {
+  if (TryConsume(requested)) {
+    Buffer* buffer = DelegateAllocate(delegate_, requested, requested, originator);
+    if (buffer == nullptr) {
+      mem_tracker_->Release(requested);
+    } else {
+      return buffer;
+    }
+  }
+
+  if (TryConsume(minimal)) {
+    Buffer* buffer = DelegateAllocate(delegate_, minimal, minimal, originator);
+    if (buffer == nullptr) {
+      mem_tracker_->Release(minimal);
+    }
+    return buffer;
+  }
+
+  return nullptr;
+}
+
+
+bool MemoryTrackingBufferAllocator::ReallocateInternal(size_t requested,
+                                                       size_t minimal,
+                                                       Buffer* buffer,
+                                                       BufferAllocator* originator) {
+  LOG(FATAL) << "Not implemented";
+  return false;
+}
+
+void MemoryTrackingBufferAllocator::FreeInternal(Buffer* buffer) {
+  DelegateFree(delegate_, buffer);
+  mem_tracker_->Release(buffer->size());
+}
+
+}  // namespace kudu
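
The retry loop in HeapBufferAllocator::AllocateInternal() above backs off toward the minimal acceptable size instead of giving up after the first failed malloc. The standalone sketch below (illustrative only, not part of the patch) prints the sequence of sizes that would be attempted if every allocation failed:

// backoff_sketch.cc: shows the HeapBufferAllocator retry-size sequence.
#include <cstddef>
#include <cstdio>

int main() {
  const size_t requested = 1 << 20;   // 1MB wanted
  const size_t minimal = 64 << 10;    // 64KB would still be acceptable
  size_t attempted = requested;
  while (true) {
    std::printf("try to allocate %zu bytes\n", attempted);
    // Pretend every attempt fails; the real code returns a Buffer on success.
    if (attempted == minimal) {
      std::printf("give up: even the minimal size failed\n");
      break;
    }
    // Same step the allocator uses: move roughly halfway toward 'minimal'.
    attempted = minimal + (attempted - minimal - 1) / 2;
  }
  return 0;
}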