You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:27 UTC
[15/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memcmpable_varint.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memcmpable_varint.cc b/be/src/kudu/util/memcmpable_varint.cc
new file mode 100644
index 0000000..b30eff6
--- /dev/null
+++ b/be/src/kudu/util/memcmpable_varint.cc
@@ -0,0 +1,257 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// This file contains code derived from sqlite4, distributed in the public domain.
+//
+// A variable length integer is an encoding of 64-bit unsigned integers
+// into between 1 and 9 bytes. The encoding is designed so that small
+// (and common) values take much less space than larger values. Additional
+// properties:
+//
+// * The length of the varint can be determined after examining just
+// the first byte of the encoding.
+//
+// * Varints compare in numerical order using memcmp().
+//
+//************************************************************************
+//
+// Treat each byte of the encoding as an unsigned integer between 0 and 255.
+// Let the bytes of the encoding be called A0, A1, A2, ..., A8.
+//
+// DECODE
+//
+// If A0 is between 0 and 240 inclusive, then the result is the value of A0.
+//
+// If A0 is between 241 and 248 inclusive, then the result is
+// 240+256*(A0-241)+A1.
+//
+// If A0 is 249 then the result is 2288+256*A1+A2.
+//
+// If A0 is 250 then the result is A1..A3 as a 3-byte big-endian integer.
+//
+// If A0 is 251 then the result is A1..A4 as a 4-byte big-endian integer.
+//
+// If A0 is 252 then the result is A1..A5 as a 5-byte big-endian integer.
+//
+// If A0 is 253 then the result is A1..A6 as a 6-byte big-endian integer.
+//
+// If A0 is 254 then the result is A1..A7 as a 7-byte big-endian integer.
+//
+// If A0 is 255 then the result is A1..A8 as an 8-byte big-endian integer.
+//
+// ENCODE
+//
+// Let the input value be V.
+//
+// If V<=240 then output a single byte A0 equal to V.
+//
+// If V<=2287 then output A0 as (V-240)/256 + 241 and A1 as (V-240)%256.
+//
+// If V<=67823 then output A0 as 249, A1 as (V-2288)/256, and A2
+// as (V-2288)%256.
+//
+// If V<=16777215 then output A0 as 250 and A1 through A3 as a big-endian
+// 3-byte integer.
+//
+// If V<=4294967295 then output A0 as 251 and A1..A4 as a big-endian
+// 4-byte integer.
+//
+// If V<=1099511627775 then output A0 as 252 and A1..A5 as a big-endian
+// 5-byte integer.
+//
+// If V<=281474976710655 then output A0 as 253 and A1..A6 as a big-endian
+// 6-byte integer.
+//
+// If V<=72057594037927935 then output A0 as 254 and A1..A7 as a
+// big-endian 7-byte integer.
+//
+// Otherwise output A0 as 255 and A1..A8 as a big-endian 8-byte integer.
+//
+// SUMMARY
+//
+// Bytes Max Value Digits
+// ------- --------- ---------
+// 1 240 2.3
+// 2 2287 3.3
+// 3 67823 4.8
+// 4 2**24-1 7.2
+// 5 2**32-1 9.6
+// 6 2**40-1 12.0
+// 7 2**48-1 14.4
+// 8 2**56-1 16.8
+// 9 2**64-1 19.2
+
+#include <cstddef>
+
+#include <glog/logging.h>
+
+#include "kudu/util/faststring.h"
+#include "kudu/util/memcmpable_varint.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+////////////////////////////////////////////////////////////
+// Begin code ripped from sqlite4
+////////////////////////////////////////////////////////////
+
// Stores the 32-bit value 'y' into z[0..3], most significant byte first
// (big-endian), so that the bytes compare with memcmp() like the integer.
//
// This function is borrowed from sqlite4/varint.c
static void varintWrite32(uint8_t *z, uint32_t y) {
  // Fill from the least significant end backwards.
  for (int pos = 3; pos >= 0; --pos) {
    z[pos] = static_cast<uint8_t>(y & 0xFF);
    y >>= 8;
  }
}
+
+
+// Write a varint into z[]. The buffer z[] must be at least 9 characters
+// long to accommodate the largest possible varint. Return the number of
+// bytes of z[] used.
+//
+// This function is borrowed from sqlite4/varint.c
+static size_t sqlite4PutVarint64(uint8_t *z, uint64_t x) {
+ uint64_t w, y;
+ if (x <= 240) {
+ z[0] = (uint8_t)x;
+ return 1;
+ }
+ if (x <= 2287) {
+ y = (uint64_t)(x - 240);
+ z[0] = (uint8_t)(y/256 + 241);
+ z[1] = (uint8_t)(y%256);
+ return 2;
+ }
+ if (x <= 67823) {
+ y = (uint64_t)(x - 2288);
+ z[0] = 249;
+ z[1] = (uint8_t)(y/256);
+ z[2] = (uint8_t)(y%256);
+ return 3;
+ }
+ y = (uint64_t)x;
+ w = (uint64_t)(x>>32);
+ if (w == 0) {
+ if (y <= 16777215) {
+ z[0] = 250;
+ z[1] = (uint8_t)(y>>16);
+ z[2] = (uint8_t)(y>>8);
+ z[3] = (uint8_t)(y);
+ return 4;
+ }
+ z[0] = 251;
+ varintWrite32(z+1, y);
+ return 5;
+ }
+ if (w <= 255) {
+ z[0] = 252;
+ z[1] = (uint8_t)w;
+ varintWrite32(z+2, y);
+ return 6;
+ }
+ if (w <= 65535) {
+ z[0] = 253;
+ z[1] = (uint8_t)(w>>8);
+ z[2] = (uint8_t)w;
+ varintWrite32(z+3, y);
+ return 7;
+ }
+ if (w <= 16777215) {
+ z[0] = 254;
+ z[1] = (uint8_t)(w>>16);
+ z[2] = (uint8_t)(w>>8);
+ z[3] = (uint8_t)w;
+ varintWrite32(z+4, y);
+ return 8;
+ }
+ z[0] = 255;
+ varintWrite32(z+1, w);
+ varintWrite32(z+5, y);
+ return 9;
+}
+
// Decodes the varint in the first 'n' bytes of z[]. On success, writes the
// integer value into *p_result and returns the number of bytes the varint
// occupies (1 to 9).
//
// If the decode fails because there are not enough bytes in z[], returns 0
// and leaves *p_result untouched.
//
// Borrowed from sqlite4 varint.c; the multi-byte big-endian forms are decoded
// with explicit unsigned arithmetic. (The original shifted promoted 'int'
// values left by 24, which is not representable for bytes >= 0x80.)
static int sqlite4GetVarint64(
    const uint8_t *z,
    int n,
    uint64_t *p_result) {
  if (n < 1) return 0;
  const unsigned int a0 = z[0];

  // 1 byte: the value itself.
  if (a0 <= 240) {
    *p_result = a0;
    return 1;
  }

  // 2 bytes: 240 + 256*(A0-241) + A1.
  if (a0 <= 248) {
    if (n < 2) return 0;
    *p_result = (a0 - 241) * 256 + z[1] + 240;
    return 2;
  }

  // For A0 >= 249 the total encoded length is A0 - 246 bytes.
  const int len = static_cast<int>(a0) - 246;
  if (n < len) return 0;

  // 3 bytes: 2288 + 256*A1 + A2.
  if (a0 == 249) {
    *p_result = 2288 + 256 * static_cast<uint64_t>(z[1]) + z[2];
    return 3;
  }

  // 4 to 9 bytes: A1..A(len-1) is a big-endian integer of (len - 1) bytes.
  uint64_t v = 0;
  for (int i = 1; i < len; i++) {
    v = (v << 8) | z[i];
  }
  *p_result = v;
  return len;
}
+
+////////////////////////////////////////////////////////////
+// End code ripped from sqlite4
+////////////////////////////////////////////////////////////
+
+void PutMemcmpableVarint64(faststring *dst, uint64_t value) {
+ uint8_t buf[9];
+ int used = sqlite4PutVarint64(buf, value);
+ DCHECK_LE(used, sizeof(buf));
+ dst->append(buf, used);
+}
+
+bool GetMemcmpableVarint64(Slice *input, uint64_t *value) {
+ size_t size = sqlite4GetVarint64(input->data(), input->size(), value);
+ input->remove_prefix(size);
+ return size > 0;
+}
+
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memcmpable_varint.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memcmpable_varint.h b/be/src/kudu/util/memcmpable_varint.h
new file mode 100644
index 0000000..955f89d
--- /dev/null
+++ b/be/src/kudu/util/memcmpable_varint.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// This is an alternate varint format, borrowed from sqlite4, that differs from the
+// varint in util/coding.h in that its serialized form can be compared with memcmp(),
+// yielding the same result as comparing the original integers.
+//
+// The serialized form also has the property that multiple such varints can be strung
+// together to form a composite key, which itself is memcmpable.
+//
+// See memcmpable_varint.cc for further description.
+
+#ifndef KUDU_UTIL_MEMCMPABLE_VARINT_H
+#define KUDU_UTIL_MEMCMPABLE_VARINT_H
+
+#include <cstdint>
+
+namespace kudu {
+
+class Slice;
+class faststring;
+
// Appends the memcmp-able varint encoding of 'value' (between 1 and 9 bytes)
// to the end of 'dst'.
void PutMemcmpableVarint64(faststring *dst, uint64_t value);

// Standard Get... routines parse a value from the beginning of a Slice
// and advance the slice past the parsed value.
//
// Returns false, leaving 'input' unmodified, if 'input' does not begin with
// a complete varint.
bool GetMemcmpableVarint64(Slice *input, uint64_t *value);
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/arena-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/arena-test.cc b/be/src/kudu/util/memory/arena-test.cc
new file mode 100644
index 0000000..695e305
--- /dev/null
+++ b/be/src/kudu/util/memory/arena-test.cc
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/util/memory/arena.h"
+#include "kudu/util/memory/memory.h"
+#include "kudu/util/mem_tracker.h"
+
// Tunables for the allocation stress tests below. TestMultiThreaded CHECKs
// that num_threads is below 256 because each thread index is used as a
// one-byte fill pattern.
DEFINE_int32(num_threads, 16, "Number of threads to test");
DEFINE_int32(allocs_per_thread, 10000, "Number of allocations each thread should do");
DEFINE_int32(alloc_size, 4, "number of bytes in each allocation");
+
+namespace kudu {
+
+using std::shared_ptr;
+using std::string;
+using std::thread;
+using std::vector;
+
+template<class ArenaType>
+static void AllocateThread(ArenaType *arena, uint8_t thread_index) {
+ std::vector<void *> ptrs;
+ ptrs.reserve(FLAGS_allocs_per_thread);
+
+ char buf[FLAGS_alloc_size];
+ memset(buf, thread_index, FLAGS_alloc_size);
+
+ for (int i = 0; i < FLAGS_allocs_per_thread; i++) {
+ void *alloced = arena->AllocateBytes(FLAGS_alloc_size);
+ CHECK(alloced);
+ memcpy(alloced, buf, FLAGS_alloc_size);
+ ptrs.push_back(alloced);
+ }
+
+ for (void *p : ptrs) {
+ if (memcmp(buf, p, FLAGS_alloc_size) != 0) {
+ FAIL() << StringPrintf("overwritten pointer at %p", p);
+ }
+ }
+}
+
+// Non-templated function to forward to above -- simplifies thread creation
+static void AllocateThreadTSArena(ThreadSafeArena *arena, uint8_t thread_index) {
+ AllocateThread(arena, thread_index);
+}
+
+
+TEST(TestArena, TestSingleThreaded) {
+ Arena arena(128);
+ AllocateThread(&arena, 0);
+}
+
+
+
+TEST(TestArena, TestMultiThreaded) {
+ CHECK(FLAGS_num_threads < 256);
+
+ ThreadSafeArena arena(1024);
+
+ vector<thread> threads;
+ for (uint8_t i = 0; i < FLAGS_num_threads; i++) {
+ threads.emplace_back(AllocateThreadTSArena, &arena, (uint8_t)i);
+ }
+
+ for (thread& thr : threads) {
+ thr.join();
+ }
+}
+
+TEST(TestArena, TestAlignment) {
+ ThreadSafeArena arena(1024);
+ for (int i = 0; i < 1000; i++) {
+ int alignment = 1 << (1 % 5);
+
+ void *ret = arena.AllocateBytesAligned(5, alignment);
+ ASSERT_EQ(0, (uintptr_t)(ret) % alignment) <<
+ "failed to align on " << alignment << "b boundary: " <<
+ ret;
+ }
+}
+
+TEST(TestArena, TestObjectAlignment) {
+ struct MyStruct {
+ int64_t v;
+ };
+ Arena a(256);
+ // Allocate a junk byte to ensure that the next allocation isn't "accidentally" aligned.
+ a.AllocateBytes(1);
+ void* v = a.NewObject<MyStruct>();
+ ASSERT_EQ(reinterpret_cast<uintptr_t>(v) % alignof(MyStruct), 0);
+}
+
+
+// MemTrackers update their ancestors when consuming and releasing memory to compute
+// usage totals. However, the lifetimes of parent and child trackers can be different.
+// Validate that child trackers can still correctly update their parent stats even when
+// the parents go out of scope.
+TEST(TestArena, TestMemoryTrackerParentReferences) {
+ // Set up a parent and child MemTracker.
+ const string parent_id = "parent-id";
+ const string child_id = "child-id";
+ shared_ptr<MemTracker> child_tracker;
+ {
+ shared_ptr<MemTracker> parent_tracker = MemTracker::CreateTracker(1024, parent_id);
+ child_tracker = MemTracker::CreateTracker(-1, child_id, parent_tracker);
+ // Parent falls out of scope here. Should still be owned by the child.
+ }
+ shared_ptr<MemoryTrackingBufferAllocator> allocator(
+ new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), child_tracker));
+ MemoryTrackingArena arena(256, allocator);
+
+ // Try some child operations.
+ ASSERT_EQ(256, child_tracker->consumption());
+ void *allocated = arena.AllocateBytes(256);
+ ASSERT_TRUE(allocated);
+ ASSERT_EQ(256, child_tracker->consumption());
+ allocated = arena.AllocateBytes(256);
+ ASSERT_TRUE(allocated);
+ ASSERT_EQ(768, child_tracker->consumption());
+}
+
+TEST(TestArena, TestMemoryTrackingDontEnforce) {
+ shared_ptr<MemTracker> mem_tracker = MemTracker::CreateTracker(1024, "arena-test-tracker");
+ shared_ptr<MemoryTrackingBufferAllocator> allocator(
+ new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), mem_tracker));
+ MemoryTrackingArena arena(256, allocator);
+ ASSERT_EQ(256, mem_tracker->consumption());
+ void *allocated = arena.AllocateBytes(256);
+ ASSERT_TRUE(allocated);
+ ASSERT_EQ(256, mem_tracker->consumption());
+ allocated = arena.AllocateBytes(256);
+ ASSERT_TRUE(allocated);
+ ASSERT_EQ(768, mem_tracker->consumption());
+
+ // In DEBUG mode after Reset() the last component of an arena is
+ // cleared, but is then created again; in release mode, the last
+ // component is not cleared. In either case, after Reset()
+ // consumption() should equal the size of the last component which
+ // is 512 bytes.
+ arena.Reset();
+ ASSERT_EQ(512, mem_tracker->consumption());
+
+ // Allocate beyond allowed consumption. This should still go
+ // through, since enforce_limit is false.
+ allocated = arena.AllocateBytes(1024);
+ ASSERT_TRUE(allocated);
+
+ ASSERT_EQ(1536, mem_tracker->consumption());
+}
+
+TEST(TestArena, TestMemoryTrackingEnforced) {
+ shared_ptr<MemTracker> mem_tracker = MemTracker::CreateTracker(1024, "arena-test-tracker");
+ shared_ptr<MemoryTrackingBufferAllocator> allocator(
+ new MemoryTrackingBufferAllocator(HeapBufferAllocator::Get(), mem_tracker,
+ // enforce limit
+ true));
+ MemoryTrackingArena arena(256, allocator);
+ ASSERT_EQ(256, mem_tracker->consumption());
+ void *allocated = arena.AllocateBytes(256);
+ ASSERT_TRUE(allocated);
+ ASSERT_EQ(256, mem_tracker->consumption());
+ allocated = arena.AllocateBytes(1024);
+ ASSERT_FALSE(allocated);
+ ASSERT_EQ(256, mem_tracker->consumption());
+}
+
+TEST(TestArena, TestSTLAllocator) {
+ Arena a(256);
+ typedef vector<int, ArenaAllocator<int, false> > ArenaVector;
+ ArenaAllocator<int, false> alloc(&a);
+ ArenaVector v(alloc);
+ for (int i = 0; i < 10000; i++) {
+ v.push_back(i);
+ }
+ for (int i = 0; i < 10000; i++) {
+ ASSERT_EQ(i, v[i]);
+ }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/arena.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/arena.cc b/be/src/kudu/util/memory/arena.cc
new file mode 100644
index 0000000..b580dbc
--- /dev/null
+++ b/be/src/kudu/util/memory/arena.cc
@@ -0,0 +1,167 @@
+// Copyright 2010 Google Inc. All Rights Reserved
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+#include "kudu/util/memory/arena.h"
+
+#include <algorithm>
+#include <memory>
+#include <mutex>
+
+using std::min;
+using std::unique_ptr;
+
+namespace kudu {
+
+template <bool THREADSAFE>
+const size_t ArenaBase<THREADSAFE>::kMinimumChunkSize = 16;
+
// The max size of our allocations is set to this magic number
// corresponding to 127 tcmalloc pages (each being 8KB). tcmalloc
// internally keeps a free-list of spans up to this size. Larger
// allocations have to go through a linear search through free
// space, which can get quite slow in a fragmented heap.
//
// See the definition of kMaxPages in tcmalloc/src/common.h
// as well as https://github.com/gperftools/gperftools/issues/535
// for a description of the performance issue.
//
// Typed as size_t because it is assigned to and compared with size_t buffer
// sizes (max_buffer_size_, SetMaxBufferSize's DCHECK_LE).
constexpr size_t kMaxTcmallocFastAllocation = 8192 * 127;
+
+template <bool THREADSAFE>
+ArenaBase<THREADSAFE>::ArenaBase(BufferAllocator* buffer_allocator,
+ size_t initial_buffer_size)
+ : buffer_allocator_(buffer_allocator),
+ max_buffer_size_(kMaxTcmallocFastAllocation),
+ arena_footprint_(0) {
+ AddComponent(CHECK_NOTNULL(NewComponent(initial_buffer_size, 0)));
+}
+
+template <bool THREADSAFE>
+ArenaBase<THREADSAFE>::ArenaBase(size_t initial_buffer_size)
+ : ArenaBase<THREADSAFE>(HeapBufferAllocator::Get(),
+ initial_buffer_size) {
+}
+
+template <bool THREADSAFE>
+void ArenaBase<THREADSAFE>::SetMaxBufferSize(size_t size) {
+ DCHECK_LE(size, kMaxTcmallocFastAllocation);
+ max_buffer_size_ = size;
+}
+
// Slow-path allocation ("Fallback for AllocateBytes non-fast-path").
// Runs under component_lock_: first retries the current component, and
// otherwise creates a new component large enough for 'size' bytes,
// allocates from it and makes it current. Returns nullptr if the buffer
// allocator cannot supply a new component.
template <bool THREADSAFE>
void *ArenaBase<THREADSAFE>::AllocateBytesFallback(const size_t size, const size_t align) {
  std::lock_guard<mutex_type> lock(component_lock_);

  // It's possible another thread raced with us and already allocated
  // a new component, in which case we should try the "fast path" again
  Component* cur = AcquireLoadCurrent();
  void * result = cur->AllocateBytesAligned(size, align);
  if (PREDICT_FALSE(result != nullptr)) return result;

  // Really need to allocate more space: normally double the current
  // component's size, capped at max_buffer_size_.
  size_t next_component_size = min(2 * cur->size(), max_buffer_size_);
  // But, allocate enough, even if the request is large. In this case,
  // might violate the max_buffer_size_ bound.
  if (next_component_size < size) {
    next_component_size = size;
  }
  // If soft quota is exhausted we will only get the "minimal" amount of memory
  // we ask for. In this case if we always use "size" as minimal, we may degrade
  // to allocating a lot of tiny components, one for each string added to the
  // arena. This would be very inefficient, so let's first try something between
  // "size" and "next_component_size". If it fails due to hard quota being
  // exhausted, we'll fall back to using "size" as minimal.
  size_t minimal = (size + next_component_size) / 2;
  CHECK_LE(size, minimal);
  CHECK_LE(minimal, next_component_size);
  // Now, just make sure we can actually get the memory.
  Component* component = NewComponent(next_component_size, minimal);
  if (component == nullptr) {
    component = NewComponent(next_component_size, size);
  }
  if (!component) return nullptr;

  // Now, must succeed. The component has at least 'size' bytes.
  // NOTE(review): this relies on the fresh component's data being aligned to
  // at least 'align' (NewComponent() CHECKs 16-byte alignment). A request with
  // align > 16 may need extra slack; confirm against
  // Component::AllocateBytesAligned().
  result = component->AllocateBytesAligned(size, align);
  CHECK(result != nullptr);

  // Now add it to the arena.
  AddComponent(component);

  return result;
}
+
+template <bool THREADSAFE>
+typename ArenaBase<THREADSAFE>::Component* ArenaBase<THREADSAFE>::NewComponent(
+ size_t requested_size,
+ size_t minimum_size) {
+ Buffer* buffer = buffer_allocator_->BestEffortAllocate(requested_size,
+ minimum_size);
+ if (buffer == nullptr) return nullptr;
+
+ CHECK_EQ(reinterpret_cast<uintptr_t>(buffer->data()) & (16 - 1), 0)
+ << "Components should be 16-byte aligned: " << buffer->data();
+
+ ASAN_POISON_MEMORY_REGION(buffer->data(), buffer->size());
+
+ return new Component(buffer);
+}
+
+// LOCKING: component_lock_ must be held by the current thread.
+template <bool THREADSAFE>
+void ArenaBase<THREADSAFE>::AddComponent(ArenaBase::Component *component) {
+ ReleaseStoreCurrent(component);
+ arena_.push_back(unique_ptr<Component>(component));
+ arena_footprint_ += component->size();
+}
+
+template <bool THREADSAFE>
+void ArenaBase<THREADSAFE>::Reset() {
+ std::lock_guard<mutex_type> lock(component_lock_);
+
+ if (PREDICT_FALSE(arena_.size() > 1)) {
+ unique_ptr<Component> last = std::move(arena_.back());
+ arena_.clear();
+ arena_.emplace_back(std::move(last));
+ ReleaseStoreCurrent(arena_[0].get());
+ }
+ arena_.back()->Reset();
+ arena_footprint_ = arena_.back()->size();
+
+#ifndef NDEBUG
+ // In debug mode release the last component too for (hopefully) better
+ // detection of memory-related bugs (invalid shallow copies, etc.).
+ size_t last_size = arena_.back()->size();
+ arena_.clear();
+ AddComponent(CHECK_NOTNULL(NewComponent(last_size, 0)));
+ arena_footprint_ = 0;
+#endif
+}
+
+template <bool THREADSAFE>
+size_t ArenaBase<THREADSAFE>::memory_footprint() const {
+ std::lock_guard<mutex_type> lock(component_lock_);
+ return arena_footprint_;
+}
+
+// Explicit instantiation.
+template class ArenaBase<true>;
+template class ArenaBase<false>;
+
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/arena.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/arena.h b/be/src/kudu/util/memory/arena.h
new file mode 100644
index 0000000..6d9843b
--- /dev/null
+++ b/be/src/kudu/util/memory/arena.h
@@ -0,0 +1,501 @@
+// Copyright 2010 Google Inc. All Rights Reserved
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+//
+// Memory arena for variable-length datatypes and STL collections.
+
+#ifndef KUDU_UTIL_MEMORY_ARENA_H_
+#define KUDU_UTIL_MEMORY_ARENA_H_
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <new>
+#include <ostream>
+#include <vector>
+
+#include <boost/signals2/dummy_mutex.hpp>
+#include <glog/logging.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/util/alignment.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/memory/memory.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/slice.h"
+
+namespace kudu {
+
+template<bool THREADSAFE> struct ArenaTraits;
+
+template <> struct ArenaTraits<true> {
+ typedef Atomic32 offset_type;
+ typedef Mutex mutex_type;
+ typedef simple_spinlock spinlock_type;
+};
+
+template <> struct ArenaTraits<false> {
+ typedef uint32_t offset_type;
+ // For non-threadsafe, we don't need any real locking.
+ typedef boost::signals2::dummy_mutex mutex_type;
+ typedef boost::signals2::dummy_mutex spinlock_type;
+};
+
+// A helper class for storing variable-length blobs (e.g. strings). Once a blob
+// is added to the arena, its index stays fixed. No reallocation happens.
+// Instead, the arena keeps a list of buffers. When it needs to grow, it
+// allocates a new buffer. Each subsequent buffer is 2x larger, than its
+// predecessor, until the maximum specified buffer size is reached.
+// The buffers are furnished by a designated allocator.
+//
+// This class is thread-safe with the fast path lock-free.
+template <bool THREADSAFE>
+class ArenaBase {
+ public:
+ // Arenas are required to have a minimum size of at least this amount.
+ static const size_t kMinimumChunkSize;
+
+ // Creates a new arena, with a single buffer of size up-to initial_buffer_size
+ // and maximum capacity (i.e. total sizes of all buffers)
+ // possibly limited by the buffer allocator. The allocator might cap the
+ // initial allocation request arbitrarily (down to zero). As a consequence,
+ // arena construction never fails due to OOM.
+ //
+ // Calls to AllocateBytes() will then give out bytes from the working buffer
+ // until it is exhausted. Then, a subsequent working buffer will be allocated.
+ // The size of the next buffer is normally 2x the size of the previous buffer.
+ // It might be capped by the allocator, or by the max_buffer_size of the Arena,
+ // settable by SetMaxBufferSize below.
+ //
+ // The default maximum buffer size is ~1MB. See 'SetMaxBufferSize' for details
+ // on when you would want to configure this differently.
+ ArenaBase(BufferAllocator* buffer_allocator,
+ size_t initial_buffer_size);
+
+ // Creates an arena using a default (heap) allocator.
+ explicit ArenaBase(size_t initial_buffer_size);
+
+ // Set the maximum buffer size allocated for this arena.
+ // The maximum buffer size allowed is slightly less than ~1MB (8192 * 127 bytes).
+ //
+ // Consider the following pros/cons of large buffer sizes:
+ //
+ // Pros:
+ // - Fewer heap allocations if the arena will hold a lot of data.
+ // (hence better allocation performance out of the arena)
+ // - Better page locality for objects allocated out of the same arena,
+ // especially if huge pages are in use.
+ // - Less internal fragmentation at the "end" of each buffer if the
+ // size of allocations from the arena is close to the size of the
+ // buffer. For example, with a 128KB max buffer size and 65KB
+ // allocations, we will only be able to make one allocation from
+ // each buffer and waste nearly 50% of memory.
+ // Cons:
+ // - Larger heap allocations may be more difficult to fulfill if the
+ // heap is fragmented.
+ //
+ // Overall, if you aren't sure, just leave it at the default.
+ //
+ // NOTE: this method is not thread-safe, even in the thread-safe variant.
+ // It is expected to call this only immediately after constructing the
+ // Arena instance, but before making any allocations.
+ void SetMaxBufferSize(size_t size);
+
+ // Adds content of the specified Slice to the arena, and returns a
+ // pointer to it. The pointer is guaranteed to remain valid during the
+ // lifetime of the arena. The Slice object itself is not copied. The
+ // size information is not stored.
+ // (Normal use case is that the caller already has an array of Slices,
+ // where it keeps these pointers together with size information).
+ // If this request would make the arena grow and the allocator denies that,
+ // returns NULL and leaves the arena unchanged.
+ uint8_t *AddSlice(const Slice& value);
+
+ // Same as above.
+ void * AddBytes(const void *data, size_t len);
+
+ // Handy wrapper for placement-new.
+ //
+ // This ensures that the returned object is properly aligned based on
+ // alignof(T).
+ template<class T, typename ... Args>
+ T *NewObject(Args&&... args);
+
+ // Relocate the given Slice into the arena, setting 'dst' and
+ // returning true if successful.
+ // It is legal for 'dst' to be a pointer to 'src'.
+ // See AddSlice above for detail on memory lifetime.
+ bool RelocateSlice(const Slice &src, Slice *dst);
+
+ // Similar to the above, but for StringPiece.
+ bool RelocateStringPiece(const StringPiece& src, StringPiece* sp);
+
+ // Reserves a blob of the specified size in the arena, and returns a pointer
+ // to it. The caller can then fill the allocated memory. The pointer is
+ // guaranteed to remain valid during the lifetime of the arena.
+ // If this request would make the arena grow and the allocator denies that,
+ // returns NULL and leaves the arena unchanged.
+ void* AllocateBytes(const size_t size) {
+ return AllocateBytesAligned(size, 1);
+ }
+
+ // Allocate bytes, ensuring a specified alignment.
+ // NOTE: alignment MUST be a power of two, or else this will break.
+ void* AllocateBytesAligned(const size_t size, const size_t alignment);
+
+ // Removes all data from the arena. (Invalidates all pointers returned by
+ // AddSlice and AllocateBytes). Does not cause memory allocation.
+ // May reduce memory footprint, as it discards all allocated buffers but
+ // the last one.
+ // Unless allocations exceed max_buffer_size, repetitive filling up and
+ // resetting normally lead to quickly settling memory footprint and ceasing
+ // buffer allocations, as the arena keeps reusing a single, large buffer.
+ void Reset();
+
+ // Returns the memory footprint of this arena, in bytes, defined as a sum of
+ // all buffer sizes. Always greater or equal to the total number of
+ // bytes allocated out of the arena.
+ size_t memory_footprint() const;
+
+ private:
+ typedef typename ArenaTraits<THREADSAFE>::mutex_type mutex_type;
+ // Encapsulates a single buffer in the arena.
+ class Component;
+
+ // Fallback for AllocateBytes non-fast-path
+ void* AllocateBytesFallback(const size_t size, const size_t align);
+
+ Component* NewComponent(size_t requested_size, size_t minimum_size);
+ void AddComponent(Component *component);
+
+ // Load the current component, with "Acquire" semantics (see atomicops.h)
+ // if the arena is meant to be thread-safe.
+ inline Component* AcquireLoadCurrent() {
+ if (THREADSAFE) {
+ return reinterpret_cast<Component*>(
+ base::subtle::Acquire_Load(reinterpret_cast<AtomicWord*>(¤t_)));
+ } else {
+ return current_;
+ }
+ }
+
+ // Store the current component, with "Release" semantics (see atomicops.h)
+ // if the arena is meant to be thread-safe.
+ inline void ReleaseStoreCurrent(Component* c) {
+ if (THREADSAFE) {
+ base::subtle::Release_Store(reinterpret_cast<AtomicWord*>(¤t_),
+ reinterpret_cast<AtomicWord>(c));
+ } else {
+ current_ = c;
+ }
+ }
+
+ BufferAllocator* const buffer_allocator_;
+ std::vector<std::unique_ptr<Component> > arena_;
+
+ // The current component to allocate from.
+ // Use AcquireLoadCurrent and ReleaseStoreCurrent to load/store.
+ Component* current_;
+ size_t max_buffer_size_;
+ size_t arena_footprint_;
+
+ // Lock covering 'slow path' allocation, when new components are
+ // allocated and added to the arena's list. Also covers any other
+ // mutation of the component data structure (eg Reset).
+ mutable mutex_type component_lock_;
+
+ DISALLOW_COPY_AND_ASSIGN(ArenaBase);
+};
+
+// STL-compliant allocator, for use with hash_maps and other structures
+// which share lifetime with an Arena. Enables memory control and improves
+// performance.
+template<class T, bool THREADSAFE> class ArenaAllocator {
+ public:
+ typedef T value_type;
+ typedef size_t size_type;
+ typedef ptrdiff_t difference_type;
+
+ typedef T* pointer;
+ typedef const T* const_pointer;
+ typedef T& reference;
+ typedef const T& const_reference;
+ pointer index(reference r) const { return &r; }
+ const_pointer index(const_reference r) const { return &r; }
+ size_type max_size() const { return size_t(-1) / sizeof(T); }
+
+ explicit ArenaAllocator(ArenaBase<THREADSAFE>* arena) : arena_(arena) {
+ CHECK_NOTNULL(arena_);
+ }
+
+ ~ArenaAllocator() { }
+
+ pointer allocate(size_type n, std::allocator<void>::const_pointer /*hint*/ = 0) {
+ return reinterpret_cast<T*>(arena_->AllocateBytes(n * sizeof(T)));
+ }
+
+ void deallocate(pointer p, size_type n) {}
+
+ void construct(pointer p, const T& val) {
+ new(reinterpret_cast<void*>(p)) T(val);
+ }
+
+ void destroy(pointer p) { p->~T(); }
+
+ template<class U> struct rebind {
+ typedef ArenaAllocator<U, THREADSAFE> other;
+ };
+
+ template<class U, bool TS> ArenaAllocator(const ArenaAllocator<U, TS>& other)
+ : arena_(other.arena()) { }
+
+ template<class U, bool TS> bool operator==(const ArenaAllocator<U, TS>& other) const {
+ return arena_ == other.arena();
+ }
+
+ template<class U, bool TS> bool operator!=(const ArenaAllocator<U, TS>& other) const {
+ return arena_ != other.arena();
+ }
+
+ ArenaBase<THREADSAFE> *arena() const {
+ return arena_;
+ }
+
+ private:
+
+ ArenaBase<THREADSAFE>* arena_;
+};
+
+
+class Arena : public ArenaBase<false> {
+ public:
+ explicit Arena(size_t initial_buffer_size) :
+ ArenaBase<false>(initial_buffer_size)
+ {}
+};
+
+class ThreadSafeArena : public ArenaBase<true> {
+ public:
+ explicit ThreadSafeArena(size_t initial_buffer_size) :
+ ArenaBase<true>(initial_buffer_size)
+ {}
+};
+
+// Arena implementation that is integrated with MemTracker in order to
+// track heap-allocated space consumed by the arena.
+
+class MemoryTrackingArena : public ArenaBase<false> {
+ public:
+
+ MemoryTrackingArena(
+ size_t initial_buffer_size,
+ const std::shared_ptr<MemoryTrackingBufferAllocator>& tracking_allocator)
+ : ArenaBase<false>(tracking_allocator.get(), initial_buffer_size),
+ tracking_allocator_(tracking_allocator) {}
+
+ ~MemoryTrackingArena() {
+ }
+
+ private:
+
+ // This is required in order for the Arena to survive even after tablet is shut down,
+ // e.g., in the case of Scanners running scanners (see tablet_server-test.cc)
+ std::shared_ptr<MemoryTrackingBufferAllocator> tracking_allocator_;
+};
+
+class ThreadSafeMemoryTrackingArena : public ArenaBase<true> {
+ public:
+
+ ThreadSafeMemoryTrackingArena(
+ size_t initial_buffer_size,
+ const std::shared_ptr<MemoryTrackingBufferAllocator>& tracking_allocator)
+ : ArenaBase<true>(tracking_allocator.get(), initial_buffer_size),
+ tracking_allocator_(tracking_allocator) {}
+
+ ~ThreadSafeMemoryTrackingArena() {
+ }
+
+ private:
+
+ // See comment in MemoryTrackingArena above.
+ std::shared_ptr<MemoryTrackingBufferAllocator> tracking_allocator_;
+};
+
+// Implementation of inline and template methods
+
+template<bool THREADSAFE>
+class ArenaBase<THREADSAFE>::Component {
+ public:
+ explicit Component(Buffer* buffer)
+ : buffer_(buffer),
+ data_(static_cast<uint8_t*>(buffer->data())),
+ offset_(0),
+ size_(buffer->size()) {}
+
+ // Tries to reserve space in this component. Returns the pointer to the
+ // reserved space if successful; NULL on failure (if there's no more room).
+ uint8_t* AllocateBytes(const size_t size) {
+ return AllocateBytesAligned(size, 1);
+ }
+
+ uint8_t *AllocateBytesAligned(const size_t size, const size_t alignment);
+
+ size_t size() const { return size_; }
+ void Reset() {
+ ASAN_POISON_MEMORY_REGION(data_, size_);
+ offset_ = 0;
+ }
+
+ private:
+ // Mark the given range unpoisoned in ASAN.
+ // This is a no-op in a non-ASAN build.
+ void AsanUnpoison(const void* addr, size_t size);
+
+ gscoped_ptr<Buffer> buffer_;
+ uint8_t* const data_;
+ typename ArenaTraits<THREADSAFE>::offset_type offset_;
+ const size_t size_;
+
+#ifdef ADDRESS_SANITIZER
+ // Lock used around unpoisoning memory when ASAN is enabled.
+ // ASAN does not support concurrent unpoison calls that may overlap a particular
+ // memory word (8 bytes).
+ typedef typename ArenaTraits<THREADSAFE>::spinlock_type spinlock_type;
+ spinlock_type asan_lock_;
+#endif
+ DISALLOW_COPY_AND_ASSIGN(Component);
+};
+
+
+// Thread-safe implementation
+template <>
+inline uint8_t *ArenaBase<true>::Component::AllocateBytesAligned(
+ const size_t size, const size_t alignment) {
+ // Special case check the allowed alignments. Currently, we only ensure
+ // the allocated buffer components are 16-byte aligned, and the code path
+ // doesn't support larger alignment.
+ DCHECK(alignment == 1 || alignment == 2 || alignment == 4 ||
+ alignment == 8 || alignment == 16)
+ << "bad alignment: " << alignment;
+ retry:
+ Atomic32 offset = Acquire_Load(&offset_);
+
+ Atomic32 aligned = KUDU_ALIGN_UP(offset, alignment);
+ Atomic32 new_offset = aligned + size;
+
+ if (PREDICT_TRUE(new_offset <= size_)) {
+ bool success = Acquire_CompareAndSwap(&offset_, offset, new_offset) == offset;
+ if (PREDICT_TRUE(success)) {
+ AsanUnpoison(data_ + aligned, size);
+ return data_ + aligned;
+ } else {
+ // Raced with another allocator
+ goto retry;
+ }
+ } else {
+ return NULL;
+ }
+}
+
+// Non-Threadsafe implementation
+template <>
+inline uint8_t *ArenaBase<false>::Component::AllocateBytesAligned(
+ const size_t size, const size_t alignment) {
+ DCHECK(alignment == 1 || alignment == 2 || alignment == 4 ||
+ alignment == 8 || alignment == 16)
+ << "bad alignment: " << alignment;
+ size_t aligned = KUDU_ALIGN_UP(offset_, alignment);
+ uint8_t* destination = data_ + aligned;
+ size_t save_offset = offset_;
+ offset_ = aligned + size;
+ if (PREDICT_TRUE(offset_ <= size_)) {
+ AsanUnpoison(data_ + aligned, size);
+ return destination;
+ } else {
+ offset_ = save_offset;
+ return NULL;
+ }
+}
+
+template <bool THREADSAFE>
+inline void ArenaBase<THREADSAFE>::Component::AsanUnpoison(const void* addr, size_t size) {
+#ifdef ADDRESS_SANITIZER
+ std::lock_guard<spinlock_type> l(asan_lock_);
+ ASAN_UNPOISON_MEMORY_REGION(addr, size);
+#endif
+}
+
+// Fast-path allocation should get inlined, and fall-back
+// to non-inline function call for allocation failure
+template <bool THREADSAFE>
+inline void *ArenaBase<THREADSAFE>::AllocateBytesAligned(const size_t size, const size_t align) {
+ void* result = AcquireLoadCurrent()->AllocateBytesAligned(size, align);
+ if (PREDICT_TRUE(result != NULL)) return result;
+ return AllocateBytesFallback(size, align);
+}
+
+template <bool THREADSAFE>
+inline uint8_t* ArenaBase<THREADSAFE>::AddSlice(const Slice& value) {
+ return reinterpret_cast<uint8_t *>(AddBytes(value.data(), value.size()));
+}
+
+template <bool THREADSAFE>
+inline void *ArenaBase<THREADSAFE>::AddBytes(const void *data, size_t len) {
+ void* destination = AllocateBytes(len);
+ if (destination == NULL) return NULL;
+ memcpy(destination, data, len);
+ return destination;
+}
+
+template <bool THREADSAFE>
+inline bool ArenaBase<THREADSAFE>::RelocateSlice(const Slice &src, Slice *dst) {
+ void* destination = AllocateBytes(src.size());
+ if (destination == NULL) return false;
+ memcpy(destination, src.data(), src.size());
+ *dst = Slice(reinterpret_cast<uint8_t *>(destination), src.size());
+ return true;
+}
+
+
+template <bool THREADSAFE>
+inline bool ArenaBase<THREADSAFE>::RelocateStringPiece(const StringPiece& src, StringPiece* sp) {
+ Slice slice(src.data(), src.size());
+ if (!RelocateSlice(slice, &slice)) return false;
+ *sp = StringPiece(reinterpret_cast<const char*>(slice.data()), slice.size());
+ return true;
+}
+
+template<bool THREADSAFE>
+template<class T, class ... Args>
+inline T *ArenaBase<THREADSAFE>::NewObject(Args&&... args) {
+ void *mem = AllocateBytesAligned(sizeof(T), alignof(T));
+ if (mem == NULL) throw std::bad_alloc();
+ return new (mem) T(std::forward<Args>(args)...);
+}
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_MEMORY_ARENA_H_
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memory/memory.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memory/memory.cc b/be/src/kudu/util/memory/memory.cc
new file mode 100644
index 0000000..b3964df
--- /dev/null
+++ b/be/src/kudu/util/memory/memory.cc
@@ -0,0 +1,339 @@
+// Copyright 2010 Google Inc. All Rights Reserved
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "kudu/util/memory/memory.h"
+
+#include <mm_malloc.h>
+
+#include <algorithm>
+#include <cstdlib>
+#include <cstring>
+
+#include <gflags/gflags.h>
+
+#include "kudu/util/alignment.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/memory/overwrite.h"
+#include "kudu/util/mem_tracker.h"
+
+using std::copy;
+using std::min;
+
// TODO(onufry) - test whether the code still tests OK if we set this to true,
// or remove this code and add a test that the Google allocator does not change
// its contract - 16-aligned in -c opt and %16 == 8 in debug.
+DEFINE_bool(allocator_aligned_mode, false,
+ "Use 16-byte alignment instead of 8-byte, "
+ "unless explicitly specified otherwise - to boost SIMD");
+TAG_FLAG(allocator_aligned_mode, hidden);
+
+namespace kudu {
+
namespace {
// Address used as the non-NULL data pointer of zero-sized heap buffers, so
// that a zero-byte allocation is distinguishable from a failed one. It is
// only ever paired with a size of 0.
// A zero-length array ("char x[0]") is a GNU extension that is ill-formed in
// ISO C++, so a single byte is reserved instead. 'static' is unnecessary
// inside an anonymous namespace.
char dummy_buffer[1] = {};
} // anonymous namespace
+
+Buffer::~Buffer() {
+#if !defined(NDEBUG) && !defined(ADDRESS_SANITIZER)
+ // "unrolling" the string "BAD" makes for a much more efficient
+ // OverwriteWithPattern call in debug mode, so we can keep this
+ // useful bit of code without tests going slower!
+ //
+ // In ASAN mode, we don't bother with this, because when we free the memory, ASAN will
+ // prevent us from accessing it anyway.
+ OverwriteWithPattern(reinterpret_cast<char*>(data_), size_,
+ "BADBADBADBADBADBADBADBADBADBADBAD"
+ "BADBADBADBADBADBADBADBADBADBADBAD"
+ "BADBADBADBADBADBADBADBADBADBADBAD");
+#endif
+ if (allocator_ != nullptr) allocator_->FreeInternal(this);
+}
+
+void BufferAllocator::LogAllocation(size_t requested,
+ size_t minimal,
+ Buffer* buffer) {
+ if (buffer == nullptr) {
+ LOG(WARNING) << "Memory allocation failed. "
+ << "Number of bytes requested: " << requested
+ << ", minimal: " << minimal;
+ return;
+ }
+ if (buffer->size() < requested) {
+ LOG(WARNING) << "Memory allocation was shorter than requested. "
+ << "Number of bytes requested to allocate: " << requested
+ << ", minimal: " << minimal
+ << ", and actually allocated: " << buffer->size();
+ }
+}
+
+HeapBufferAllocator::HeapBufferAllocator()
+ : aligned_mode_(FLAGS_allocator_aligned_mode) {
+}
+
+Buffer* HeapBufferAllocator::AllocateInternal(
+ const size_t requested,
+ const size_t minimal,
+ BufferAllocator* const originator) {
+ DCHECK_LE(minimal, requested);
+ void* data;
+ size_t attempted = requested;
+ while (true) {
+ data = (attempted == 0) ? &dummy_buffer[0] : Malloc(attempted);
+ if (data != nullptr) {
+ return CreateBuffer(data, attempted, originator);
+ }
+ if (attempted == minimal) return nullptr;
+ attempted = minimal + (attempted - minimal - 1) / 2;
+ }
+}
+
+bool HeapBufferAllocator::ReallocateInternal(
+ const size_t requested,
+ const size_t minimal,
+ Buffer* const buffer,
+ BufferAllocator* const originator) {
+ DCHECK_LE(minimal, requested);
+ void* data;
+ size_t attempted = requested;
+ while (true) {
+ if (attempted == 0) {
+ if (buffer->size() > 0) free(buffer->data());
+ data = &dummy_buffer[0];
+ } else {
+ if (buffer->size() > 0) {
+ data = Realloc(buffer->data(), buffer->size(), attempted);
+ } else {
+ data = Malloc(attempted);
+ }
+ }
+ if (data != nullptr) {
+ UpdateBuffer(data, attempted, buffer);
+ return true;
+ }
+ if (attempted == minimal) return false;
+ attempted = minimal + (attempted - minimal - 1) / 2;
+ }
+}
+
+void HeapBufferAllocator::FreeInternal(Buffer* buffer) {
+ if (buffer->size() > 0) free(buffer->data());
+}
+
+void* HeapBufferAllocator::Malloc(size_t size) {
+ if (aligned_mode_) {
+ void* data;
+ if (posix_memalign(&data, 16, KUDU_ALIGN_UP(size, 16))) {
+ return nullptr;
+ }
+ return data;
+ } else {
+ return malloc(size);
+ }
+}
+
+void* HeapBufferAllocator::Realloc(void* previous_data, size_t previous_size,
+ size_t new_size) {
+ if (aligned_mode_) {
+ void* data = Malloc(new_size);
+ if (data) {
+// NOTE(ptab): We should use realloc here to avoid memmory coping,
+// but it doesn't work on memory allocated by posix_memalign(...).
+// realloc reallocates the memory but doesn't preserve the content.
+// TODO(ptab): reiterate after some time to check if it is fixed (tcmalloc ?)
+ memcpy(data, previous_data, min(previous_size, new_size));
+ free(previous_data);
+ return data;
+ } else {
+ return nullptr;
+ }
+ } else {
+ return realloc(previous_data, new_size);
+ }
+}
+
+Buffer* ClearingBufferAllocator::AllocateInternal(size_t requested,
+ size_t minimal,
+ BufferAllocator* originator) {
+ Buffer* buffer = DelegateAllocate(delegate_, requested, minimal,
+ originator);
+ if (buffer != nullptr) memset(buffer->data(), 0, buffer->size());
+ return buffer;
+}
+
+bool ClearingBufferAllocator::ReallocateInternal(size_t requested,
+ size_t minimal,
+ Buffer* buffer,
+ BufferAllocator* originator) {
+ size_t offset = (buffer != nullptr ? buffer->size() : 0);
+ bool success = DelegateReallocate(delegate_, requested, minimal, buffer,
+ originator);
+ if (success && buffer->size() > offset) {
+ memset(static_cast<char*>(buffer->data()) + offset, 0,
+ buffer->size() - offset);
+ }
+ return success;
+}
+
+void ClearingBufferAllocator::FreeInternal(Buffer* buffer) {
+ DelegateFree(delegate_, buffer);
+}
+
+Buffer* MediatingBufferAllocator::AllocateInternal(
+ const size_t requested,
+ const size_t minimal,
+ BufferAllocator* const originator) {
+ // Allow the mediator to trim the request.
+ size_t granted;
+ if (requested > 0) {
+ granted = mediator_->Allocate(requested, minimal);
+ if (granted < minimal) return nullptr;
+ } else {
+ granted = 0;
+ }
+ Buffer* buffer = DelegateAllocate(delegate_, granted, minimal, originator);
+ if (buffer == nullptr) {
+ mediator_->Free(granted);
+ } else if (buffer->size() < granted) {
+ mediator_->Free(granted - buffer->size());
+ }
+ return buffer;
+}
+
+bool MediatingBufferAllocator::ReallocateInternal(
+ const size_t requested,
+ const size_t minimal,
+ Buffer* const buffer,
+ BufferAllocator* const originator) {
+ // Allow the mediator to trim the request. Be conservative; assume that
+ // realloc may degenerate to malloc-memcpy-free.
+ size_t granted;
+ if (requested > 0) {
+ granted = mediator_->Allocate(requested, minimal);
+ if (granted < minimal) return false;
+ } else {
+ granted = 0;
+ }
+ size_t old_size = buffer->size();
+ if (DelegateReallocate(delegate_, granted, minimal, buffer, originator)) {
+ mediator_->Free(granted - buffer->size() + old_size);
+ return true;
+ } else {
+ mediator_->Free(granted);
+ return false;
+ }
+}
+
+void MediatingBufferAllocator::FreeInternal(Buffer* buffer) {
+ mediator_->Free(buffer->size());
+ DelegateFree(delegate_, buffer);
+}
+
+Buffer* MemoryStatisticsCollectingBufferAllocator::AllocateInternal(
+ const size_t requested,
+ const size_t minimal,
+ BufferAllocator* const originator) {
+ Buffer* buffer = DelegateAllocate(delegate_, requested, minimal, originator);
+ if (buffer != nullptr) {
+ memory_stats_collector_->AllocatedMemoryBytes(buffer->size());
+ } else {
+ memory_stats_collector_->RefusedMemoryBytes(minimal);
+ }
+ return buffer;
+}
+
+bool MemoryStatisticsCollectingBufferAllocator::ReallocateInternal(
+ const size_t requested,
+ const size_t minimal,
+ Buffer* const buffer,
+ BufferAllocator* const originator) {
+ const size_t old_size = buffer->size();
+ bool outcome = DelegateReallocate(delegate_, requested, minimal, buffer,
+ originator);
+ if (buffer->size() > old_size) {
+ memory_stats_collector_->AllocatedMemoryBytes(buffer->size() - old_size);
+ } else if (buffer->size() < old_size) {
+ memory_stats_collector_->FreedMemoryBytes(old_size - buffer->size());
+ } else if (!outcome && (minimal > buffer->size())) {
+ memory_stats_collector_->RefusedMemoryBytes(minimal - buffer->size());
+ }
+ return outcome;
+}
+
+void MemoryStatisticsCollectingBufferAllocator::FreeInternal(Buffer* buffer) {
+ DelegateFree(delegate_, buffer);
+ memory_stats_collector_->FreedMemoryBytes(buffer->size());
+}
+
+size_t MemoryTrackingBufferAllocator::Available() const {
+ return enforce_limit_ ? mem_tracker_->SpareCapacity() : std::numeric_limits<int64_t>::max();
+}
+
+bool MemoryTrackingBufferAllocator::TryConsume(int64_t bytes) {
+ // Calls TryConsume first, even if enforce_limit_ is false: this
+ // will cause mem_tracker_ to try to free up more memory by GCing.
+ if (!mem_tracker_->TryConsume(bytes)) {
+ if (enforce_limit_) {
+ return false;
+ } else {
+ // If enforce_limit_ is false, allocate memory anyway.
+ mem_tracker_->Consume(bytes);
+ }
+ }
+ return true;
+}
+
+Buffer* MemoryTrackingBufferAllocator::AllocateInternal(size_t requested,
+ size_t minimal,
+ BufferAllocator* originator) {
+ if (TryConsume(requested)) {
+ Buffer* buffer = DelegateAllocate(delegate_, requested, requested, originator);
+ if (buffer == nullptr) {
+ mem_tracker_->Release(requested);
+ } else {
+ return buffer;
+ }
+ }
+
+ if (TryConsume(minimal)) {
+ Buffer* buffer = DelegateAllocate(delegate_, minimal, minimal, originator);
+ if (buffer == nullptr) {
+ mem_tracker_->Release(minimal);
+ }
+ return buffer;
+ }
+
+ return nullptr;
+}
+
+
+bool MemoryTrackingBufferAllocator::ReallocateInternal(size_t requested,
+ size_t minimal,
+ Buffer* buffer,
+ BufferAllocator* originator) {
+ LOG(FATAL) << "Not implemented";
+ return false;
+}
+
+void MemoryTrackingBufferAllocator::FreeInternal(Buffer* buffer) {
+ DelegateFree(delegate_, buffer);
+ mem_tracker_->Release(buffer->size());
+}
+
+} // namespace kudu