You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/08/14 21:19:24 UTC
[kudu] 02/03: KUDU-2844 (2/3): move RowBlock memory into a new
RowBlockMemory struct
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit fb0f4bc3bf63614a831fedc6cc29cf860dddaf49
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Apr 23 14:30:49 2020 -0700
KUDU-2844 (2/3): move RowBlock memory into a new RowBlockMemory struct
This takes the Arena* member of RowBlock and moves it into a new
RowBlockMemory structure. The RowBlockMemory structure will later
be extended to include a list of reference-counted block handles.
Change-Id: I17a21f33f44988795ffe064b3ba41055e1a19e90
Reviewed-on: http://gerrit.cloudera.org:8080/15801
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Tested-by: Kudu Jenkins
---
src/kudu/cfile/block_handle.h | 1 +
src/kudu/cfile/cfile-test-base.h | 5 +-
src/kudu/cfile/cfile-test.cc | 57 +++++++++-------
src/kudu/cfile/cfile_util.cc | 9 ++-
src/kudu/cfile/encoding-test.cc | 17 ++---
src/kudu/codegen/codegen-test.cc | 17 ++---
src/kudu/common/column_predicate-test.cc | 2 +-
src/kudu/common/columnblock-test-util.h | 67 ++++++++++++++++++
src/kudu/common/columnblock-test.cc | 3 +-
src/kudu/common/columnblock.cc | 4 +-
src/kudu/common/columnblock.h | 79 ++++++----------------
src/kudu/common/generic_iterators-test.cc | 22 +++---
src/kudu/common/generic_iterators.cc | 29 ++++----
src/kudu/common/rowblock.cc | 4 +-
src/kudu/common/rowblock.h | 12 ++--
src/kudu/common/rowblock_memory.h | 37 ++++++++++
src/kudu/common/wire_protocol-test.cc | 35 +++++-----
src/kudu/integration-tests/linked_list-test-util.h | 5 +-
src/kudu/master/sys_catalog.cc | 6 +-
src/kudu/tablet/cfile_set-test.cc | 16 +++--
src/kudu/tablet/compaction-test.cc | 8 ++-
src/kudu/tablet/compaction.cc | 11 +--
src/kudu/tablet/delta_compaction.cc | 19 +++---
src/kudu/tablet/deltafile-test.cc | 17 +++--
src/kudu/tablet/diskrowset-test-base.h | 12 ++--
src/kudu/tablet/memrowset-test.cc | 6 +-
src/kudu/tablet/mt-rowset_delta_compaction-test.cc | 7 +-
src/kudu/tablet/mt-tablet-test.cc | 19 +++---
src/kudu/tablet/tablet-decoder-eval-test.cc | 8 ++-
src/kudu/tablet/tablet-test-base.h | 10 ++-
src/kudu/tablet/tablet-test-util.h | 9 +--
src/kudu/tablet/tablet-test.cc | 13 ++--
src/kudu/tablet/tablet_random_access-test.cc | 8 +--
src/kudu/tools/tool_action_local_replica.cc | 7 +-
src/kudu/tools/tool_action_perf.cc | 8 +--
src/kudu/transactions/txn_status_tablet.cc | 6 +-
src/kudu/tserver/tablet_server-test-base.cc | 7 +-
src/kudu/tserver/tablet_service.cc | 6 +-
38 files changed, 351 insertions(+), 257 deletions(-)
diff --git a/src/kudu/cfile/block_handle.h b/src/kudu/cfile/block_handle.h
index 2bd3544..d1bb62e 100644
--- a/src/kudu/cfile/block_handle.h
+++ b/src/kudu/cfile/block_handle.h
@@ -25,6 +25,7 @@
#include "kudu/cfile/block_cache.h"
#include "kudu/gutil/ref_counted.h"
+#include "kudu/common/rowblock_memory.h"
namespace kudu {
namespace cfile {
diff --git a/src/kudu/cfile/cfile-test-base.h b/src/kudu/cfile/cfile-test-base.h
index 64552b3..d79e7b5 100644
--- a/src/kudu/cfile/cfile-test-base.h
+++ b/src/kudu/cfile/cfile-test-base.h
@@ -29,6 +29,7 @@
#include "kudu/cfile/cfile_reader.h"
#include "kudu/cfile/cfile_writer.h"
#include "kudu/common/columnblock.h"
+#include "kudu/common/columnblock-test-util.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stringprintf.h"
@@ -447,7 +448,7 @@ void TimeReadFileForDataType(CFileIterator* iter, int* count) {
ASSERT_OK_FAST(iter->CopyNextValues(&n, &ctx));
sum += FastSum<ScopedColumnBlock<Type>, SumType>(cb, n);
*count += n;
- cb.arena()->Reset();
+ cb.memory()->Reset();
}
LOG(INFO)<< "Sum: " << sum;
LOG(INFO)<< "Count: " << *count;
@@ -469,7 +470,7 @@ void ReadBinaryFile(CFileIterator* iter, int* count) {
}
}
*count += n;
- cb.arena()->Reset();
+ cb.memory()->Reset();
}
LOG(INFO) << "Sum of value lengths: " << sum_lens;
LOG(INFO) << "Count: " << *count;
diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc
index 39a89c3..6e64f1f 100644
--- a/src/kudu/cfile/cfile-test.cc
+++ b/src/kudu/cfile/cfile-test.cc
@@ -42,10 +42,12 @@
#include "kudu/cfile/index_btree.h"
#include "kudu/cfile/type_encodings.h"
#include "kudu/common/column_materialization_context.h"
+#include "kudu/common/columnblock-test-util.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/encoded_key.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/rowid.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
@@ -67,7 +69,6 @@
#include "kudu/util/int128.h"
#include "kudu/util/int128_util.h"
#include "kudu/util/mem_tracker.h"
-#include "kudu/util/memory/arena.h"
#include "kudu/util/metrics.h"
#include "kudu/util/nvm_cache.h"
#include "kudu/util/slice.h"
@@ -76,6 +77,10 @@
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
+namespace kudu {
+class Arena;
+} // namespace kudu
+
DECLARE_bool(cfile_write_checksums);
DECLARE_bool(cfile_verify_checksums);
DECLARE_string(block_cache_type);
@@ -163,9 +168,11 @@ class TestCFile : public CFileTestBase {
ASSERT_OK(iter->SeekToOrdinal(0));
size_t fetched = 0;
while (fetched < 10000) {
- ColumnBlock advancing_block(out.type_info(), nullptr,
+ ColumnBlock advancing_block(out.type_info(),
+ nullptr,
out.data() + (fetched * out.stride()),
- out.nrows() - fetched, out.arena());
+ out.nrows() - fetched,
+ out.memory());
ColumnMaterializationContext adv_ctx = CreateNonDecoderEvalContext(&advancing_block, &sel);
ASSERT_TRUE(iter->HasNext());
size_t batch_size = random() % 5 + 1;
@@ -204,7 +211,7 @@ class TestCFile : public CFileTestBase {
unique_ptr<CFileIterator> iter;
ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));
- Arena arena(8192);
+ RowBlockMemory mem;
ScopedColumnBlock<DataGeneratorType::kDataType> cb(10);
SelectionVector sel(10);
@@ -234,7 +241,7 @@ class TestCFile : public CFileTestBase {
ASSERT_EQ((*generator)[j], cb[j]);
}
}
- cb.arena()->Reset();
+ cb.memory()->Reset();
read_offset += n;
}
}
@@ -431,11 +438,9 @@ INSTANTIATE_TEST_CASE_P(CacheMemoryTypes, TestCFileBothCacheMemoryTypes,
::testing::Values(Cache::MemoryType::DRAM,
Cache::MemoryType::NVM));
-template<DataType type>
-void CopyOne(CFileIterator *it,
- typename TypeTraits<type>::cpp_type *ret,
- Arena *arena) {
- ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, arena);
+template <DataType type>
+void CopyOne(CFileIterator* it, typename TypeTraits<type>::cpp_type* ret, RowBlockMemory* mem) {
+ ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, mem);
SelectionVector sel(1);
ColumnMaterializationContext ctx(0, nullptr, &cb, &sel);
ctx.SetDecoderEvalNotSupported();
@@ -634,18 +639,18 @@ void TestCFile::TestReadWriteStrings(EncodingType encoding,
unique_ptr<CFileIterator> iter;
ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));
- Arena arena(1024);
+ RowBlockMemory mem;
ASSERT_OK(iter->SeekToOrdinal(5000));
- ASSERT_EQ(5000u, iter->GetCurrentOrdinal());
+ ASSERT_EQ(5000, iter->GetCurrentOrdinal());
Slice s;
- CopyOne<STRING>(iter.get(), &s, &arena);
+ CopyOne<STRING>(iter.get(), &s, &mem);
ASSERT_EQ(formatter(5000), s.ToString());
// Seek to last key exactly, should succeed
ASSERT_OK(iter->SeekToOrdinal(9999));
- ASSERT_EQ(9999u, iter->GetCurrentOrdinal());
+ ASSERT_EQ(9999, iter->GetCurrentOrdinal());
// Seek to after last key. Should result in not found.
ASSERT_TRUE(iter->SeekToOrdinal(10000).IsNotFound());
@@ -662,30 +667,30 @@ void TestCFile::TestReadWriteStrings(EncodingType encoding,
// (seek to "hello 0000.5" through "hello 9999.5")
string buf;
for (int i = 1; i < 10000; i++) {
- arena.Reset();
+ mem.Reset();
buf = formatter(i - 1);
buf.append(".5");
s = Slice(buf);
- EncodeStringKey(schema, s, &arena, &encoded_key);
+ EncodeStringKey(schema, s, &mem.arena, &encoded_key);
ASSERT_OK(iter->SeekAtOrAfter(*encoded_key, &exact));
ASSERT_FALSE(exact);
ASSERT_EQ(i, iter->GetCurrentOrdinal());
- CopyOne<STRING>(iter.get(), &s, &arena);
+ CopyOne<STRING>(iter.get(), &s, &mem);
ASSERT_EQ(formatter(i), s.ToString());
}
// Seek exactly to each key
// (seek to "hello 0000" through "hello 9999")
for (int i = 0; i < 9999; i++) {
- arena.Reset();
+ mem.Reset();
buf = formatter(i);
s = Slice(buf);
- EncodeStringKey(schema, s, &arena, &encoded_key);
+ EncodeStringKey(schema, s, &mem.arena, &encoded_key);
ASSERT_OK(iter->SeekAtOrAfter(*encoded_key, &exact));
ASSERT_TRUE(exact);
ASSERT_EQ(i, iter->GetCurrentOrdinal());
Slice read_back;
- CopyOne<STRING>(iter.get(), &read_back, &arena);
+ CopyOne<STRING>(iter.get(), &read_back, &mem);
ASSERT_EQ(read_back.ToString(), s.ToString());
}
@@ -693,7 +698,7 @@ void TestCFile::TestReadWriteStrings(EncodingType encoding,
// (seek to "hello 9999.x")
buf = formatter(9999) + ".x";
s = Slice(buf);
- EncodeStringKey(schema, s, &arena, &encoded_key);
+ EncodeStringKey(schema, s, &mem.arena, &encoded_key);
EXPECT_TRUE(iter->SeekAtOrAfter(*encoded_key, &exact).IsNotFound());
// before first entry
@@ -701,17 +706,17 @@ void TestCFile::TestReadWriteStrings(EncodingType encoding,
buf = formatter(0);
buf.resize(buf.size() - 1);
s = Slice(buf);
- EncodeStringKey(schema, s, &arena, &encoded_key);
+ EncodeStringKey(schema, s, &mem.arena, &encoded_key);
ASSERT_OK(iter->SeekAtOrAfter(*encoded_key, &exact));
EXPECT_FALSE(exact);
EXPECT_EQ(0, iter->GetCurrentOrdinal());
- CopyOne<STRING>(iter.get(), &s, &arena);
+ CopyOne<STRING>(iter.get(), &s, &mem);
EXPECT_EQ(formatter(0), s.ToString());
// Seek to start of file by ordinal
ASSERT_OK(iter->SeekToFirst());
ASSERT_EQ(0, iter->GetCurrentOrdinal());
- CopyOne<STRING>(iter.get(), &s, &arena);
+ CopyOne<STRING>(iter.get(), &s, &mem);
ASSERT_EQ(formatter(0), s.ToString());
// Reseek to start and fetch all data.
@@ -850,9 +855,9 @@ TEST_P(TestCFileBothCacheMemoryTypes, TestDefaultColumnIter) {
// Test String Default Value
Slice str_data[kNumItems];
Slice str_value("Hello");
- Arena arena(32*1024);
+ RowBlockMemory mem;
DefaultColumnValueIterator str_iter(GetTypeInfo(STRING), &str_value);
- ColumnBlock str_col(GetTypeInfo(STRING), nullptr, str_data, kNumItems, &arena);
+ ColumnBlock str_col(GetTypeInfo(STRING), nullptr, str_data, kNumItems, &mem);
ColumnMaterializationContext str_ctx = CreateNonDecoderEvalContext(&str_col, &sel);
ASSERT_OK(str_iter.Scan(&str_ctx));
for (size_t i = 0; i < str_col.nrows(); ++i) {
diff --git a/src/kudu/cfile/cfile_util.cc b/src/kudu/cfile/cfile_util.cc
index 3bdb9a7..c700e4e 100644
--- a/src/kudu/cfile/cfile_util.cc
+++ b/src/kudu/cfile/cfile_util.cc
@@ -28,11 +28,11 @@
#include "kudu/common/column_materialization_context.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/types.h"
#include "kudu/gutil/port.h"
#include "kudu/util/bitmap.h"
#include "kudu/util/mem_tracker.h"
-#include "kudu/util/memory/arena.h"
namespace kudu {
namespace cfile {
@@ -55,13 +55,12 @@ Status DumpIterator(const CFileReader& reader,
std::ostream* out,
int num_rows,
int indent) {
-
- Arena arena(8192);
+ RowBlockMemory mem(8192);
uint8_t buf[kBufSize];
const TypeInfo *type = reader.type_info();
size_t max_rows = kBufSize/type->size();
uint8_t nulls[BitmapSize(max_rows)];
- ColumnBlock cb(type, reader.is_nullable() ? nulls : nullptr, buf, max_rows, &arena);
+ ColumnBlock cb(type, reader.is_nullable() ? nulls : nullptr, buf, max_rows, &mem);
SelectionVector sel(max_rows);
ColumnMaterializationContext ctx(0, nullptr, &cb, &sel);
string strbuf;
@@ -93,7 +92,7 @@ Status DumpIterator(const CFileReader& reader,
*out << strbuf;
strbuf.clear();
- arena.Reset();
+ mem.Reset();
count += n;
}
diff --git a/src/kudu/cfile/encoding-test.cc b/src/kudu/cfile/encoding-test.cc
index cf5147c..c889175 100644
--- a/src/kudu/cfile/encoding-test.cc
+++ b/src/kudu/cfile/encoding-test.cc
@@ -40,8 +40,10 @@
#include "kudu/cfile/block_handle.h"
#include "kudu/cfile/cfile_util.h"
#include "kudu/cfile/type_encodings.h"
+#include "kudu/common/columnblock-test-util.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/gutil/port.h"
@@ -73,20 +75,20 @@ namespace cfile {
class TestEncoding : public KuduTest {
public:
TestEncoding()
- : arena_(1024) {
+ : memory_(1024) {
}
protected:
virtual void SetUp() OVERRIDE {
KuduTest::SetUp();
- arena_.Reset();
+ memory_.Reset();
default_write_options_.storage_attributes.cfile_block_size = 256 * 1024;
}
template<DataType type>
void CopyOne(BlockDecoder *decoder,
typename TypeTraits<type>::cpp_type *ret) {
- ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, &arena_);
+ ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, &memory_);
ColumnDataView cdv(&cb);
size_t n = 1;
ASSERT_OK(decoder->CopyNextValues(&n, &cdv));
@@ -461,7 +463,7 @@ class TestEncoding : public KuduTest {
vector<CppType> decoded;
decoded.resize(size);
- ColumnBlock dst_block(GetTypeInfo(Type), nullptr, &decoded[0], size, &arena_);
+ ColumnBlock dst_block(GetTypeInfo(Type), nullptr, &decoded[0], size, &memory_);
ColumnDataView view(&dst_block);
int dec_count = 0;
while (bd->HasNext()) {
@@ -582,7 +584,7 @@ class TestEncoding : public KuduTest {
ColumnBlock dst_block(GetTypeInfo(IntType), nullptr,
&decoded[0],
to_insert.size(),
- &arena_);
+ &memory_);
int dec_count = 0;
while (ibd->HasNext()) {
ASSERT_EQ((uint32_t)(dec_count), ibd->GetCurrentIndex());
@@ -666,7 +668,7 @@ class TestEncoding : public KuduTest {
ColumnBlock dst_block(GetTypeInfo(BOOL), nullptr,
&decoded[0],
to_insert.size(),
- &arena_);
+ &memory_);
int dec_count = 0;
while (bd->HasNext()) {
@@ -704,8 +706,7 @@ class TestEncoding : public KuduTest {
}
}
- Arena arena_;
- faststring contiguous_buf_;
+ RowBlockMemory memory_;
WriterOptions default_write_options_;
};
diff --git a/src/kudu/codegen/codegen-test.cc b/src/kudu/codegen/codegen-test.cc
index 2a91e25..b8ccead 100644
--- a/src/kudu/codegen/codegen-test.cc
+++ b/src/kudu/codegen/codegen-test.cc
@@ -35,6 +35,7 @@
#include "kudu/common/common.pb.h"
#include "kudu/common/row.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/singleton.h"
@@ -66,7 +67,7 @@ class CodegenTest : public KuduTest {
CodegenTest()
: random_(SeedRandom()),
// Set the initial Arena size as small as possible to catch errors during relocation.
- projections_arena_(16) {
+ projections_mem_(16) {
// Create the base schema.
vector<ColumnSchema> cols = { ColumnSchema("key ", UINT64, false),
ColumnSchema("int32 ", INT32, false),
@@ -138,7 +139,7 @@ class CodegenTest : public KuduTest {
private:
// Projects the test rows into parameter rowblock using projector and
- // member projections_arena_ (should be Reset() manually).
+ // member projections_mem_ (should be Reset() manually).
template<bool READ, class RowProjectorType>
void ProjectTestRows(RowProjectorType* rp, RowBlock* rb);
void AddRandomString(RowBuilder* rb);
@@ -153,7 +154,7 @@ class CodegenTest : public KuduTest {
codegen::CodeGenerator generator_;
Random random_;
unique_ptr<ConstContiguousRow> test_rows_[kNumTestRows];
- Arena projections_arena_;
+ RowBlockMemory projections_mem_;
unique_ptr<Arena> test_rows_arena_;
};
@@ -203,9 +204,9 @@ void CodegenTest::ProjectTestRows(RowProjectorType* rp, RowBlock* rb) {
ConstContiguousRow src = *test_rows_[i];
RowBlockRow dst = rb->row(i);
if (READ) {
- CHECK_OK(rp->ProjectRowForRead(src, &dst, &projections_arena_));
+ CHECK_OK(rp->ProjectRowForRead(src, &dst, rb->arena()));
} else {
- CHECK_OK(rp->ProjectRowForWrite(src, &dst, &projections_arena_));
+ CHECK_OK(rp->ProjectRowForWrite(src, &dst, rb->arena()));
}
}
}
@@ -220,10 +221,10 @@ void CodegenTest::TestProjection(const Schema* proj) {
CHECK_EQ(with->base_schema(), &base_);
CHECK_EQ(with->projection(), proj);
- RowBlock rb_with(proj, kNumTestRows, &projections_arena_);
- RowBlock rb_without(proj, kNumTestRows, &projections_arena_);
+ RowBlock rb_with(proj, kNumTestRows, &projections_mem_);
+ RowBlock rb_without(proj, kNumTestRows, &projections_mem_);
- projections_arena_.Reset();
+ projections_mem_.Reset();
ProjectTestRows<READ>(with.get(), &rb_with);
ProjectTestRows<READ>(&without, &rb_without);
CheckRowBlocksEqual(&rb_with, &rb_without, "Codegen", "Expected");
diff --git a/src/kudu/common/column_predicate-test.cc b/src/kudu/common/column_predicate-test.cc
index 8b1a9b1..151ce8a 100644
--- a/src/kudu/common/column_predicate-test.cc
+++ b/src/kudu/common/column_predicate-test.cc
@@ -31,7 +31,7 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
-#include "kudu/common/columnblock.h"
+#include "kudu/common/columnblock-test-util.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/schema.h"
diff --git a/src/kudu/common/columnblock-test-util.h b/src/kudu/common/columnblock-test-util.h
new file mode 100644
index 0000000..db3764b
--- /dev/null
+++ b/src/kudu/common/columnblock-test-util.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/gutil/macros.h"
+
+#include "kudu/common/columnblock.h"
+#include "kudu/common/rowblock.h"
+
+namespace kudu {
+
+// Test helper: a ColumnBlock that heap-allocates its own cell storage,
+// optional non-null bitmap, and RowBlockMemory, and releases all three
+// when it goes out of scope.
+//
+// This is more useful in test code than production code, since the
+// cell storage comes from plain new[] rather than from an arena.
+template<DataType type>
+class ScopedColumnBlock : public ColumnBlock {
+ public:
+ typedef typename TypeTraits<type>::cpp_type cpp_type;
+
+ explicit ScopedColumnBlock(size_t n_rows, bool allow_nulls = true)
+ : ColumnBlock(GetTypeInfo(type),
+ allow_nulls ? new uint8_t[BitmapSize(n_rows)] : nullptr,  // no bitmap when nulls are disallowed
+ new cpp_type[n_rows],
+ n_rows,
+ new RowBlockMemory()),
+ non_null_bitmap_(non_null_bitmap()),  // take ownership of the bitmap passed to the base above
+ data_(reinterpret_cast<cpp_type *>(data())),  // take ownership of the cell array, typed as cpp_type
+ memory_(memory()) {  // take ownership of the RowBlockMemory passed to the base above
+ if (allow_nulls) {
+ // Clear every non-null bit so that all rows begin null.
+ BitmapChangeBits(non_null_bitmap(), /*offset=*/ 0, n_rows, /*value=*/ false);
+ }
+ }
+
+ const cpp_type &operator[](size_t idx) const {  // read-only access to cell 'idx'; no bounds check
+ return data_[idx];
+ }
+
+ cpp_type &operator[](size_t idx) {  // mutable access to cell 'idx'; no bounds check
+ return data_[idx];
+ }
+
+ private:
+ std::unique_ptr<uint8_t[]> non_null_bitmap_;  // null when constructed with allow_nulls == false
+ std::unique_ptr<cpp_type[]> data_;  // the n_rows cells allocated in the constructor
+ std::unique_ptr<RowBlockMemory> memory_;  // the RowBlockMemory handed to the ColumnBlock base
+
+};
+
+} // namespace kudu
diff --git a/src/kudu/common/columnblock-test.cc b/src/kudu/common/columnblock-test.cc
index 5bed5d4..37a49f5 100644
--- a/src/kudu/common/columnblock-test.cc
+++ b/src/kudu/common/columnblock-test.cc
@@ -15,12 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-#include "kudu/common/columnblock.h"
-
#include <string>
#include <gtest/gtest.h>
+#include "kudu/common/columnblock-test-util.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/types.h"
diff --git a/src/kudu/common/columnblock.cc b/src/kudu/common/columnblock.cc
index 63eda18..2f5bf8d 100644
--- a/src/kudu/common/columnblock.cc
+++ b/src/kudu/common/columnblock.cc
@@ -19,8 +19,10 @@
#include <cstring>
+#include "kudu/common/common.pb.h"
#include "kudu/common/row.h"
#include "kudu/common/rowblock.h"
+#include "kudu/util/memory/arena.h"
namespace kudu {
@@ -51,8 +53,8 @@ Status ColumnBlock::CopyTo(const SelectionVector& sel_vec,
BitmapCopy(dst->non_null_bitmap_, dst_cell_off,
non_null_bitmap_, src_cell_off,
num_cells);
+ }
}
-}
return Status::OK();
}
diff --git a/src/kudu/common/columnblock.h b/src/kudu/common/columnblock.h
index a91074c..a5c0461 100644
--- a/src/kudu/common/columnblock.h
+++ b/src/kudu/common/columnblock.h
@@ -18,23 +18,22 @@
#include <cstddef>
#include <cstdint>
-#include <memory>
#include <ostream>
#include <string>
#include <glog/logging.h>
-#include "kudu/common/common.pb.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/types.h"
#include "kudu/gutil/strings/fastmem.h"
#include "kudu/gutil/strings/stringpiece.h"
#include "kudu/util/bitmap.h"
-#include "kudu/util/memory/arena.h"
#include "kudu/util/memory/overwrite.h"
#include "kudu/util/status.h"
namespace kudu {
+class Arena;
class ColumnBlockCell;
class SelectionVector;
@@ -47,15 +46,15 @@ class ColumnBlock {
typedef ColumnBlockCell Cell;
ColumnBlock(const TypeInfo* type,
- uint8_t *non_null_bitmap,
- void *data,
+ uint8_t* non_null_bitmap,
+ void* data,
size_t nrows,
- Arena *arena)
- : type_(type),
- non_null_bitmap_(non_null_bitmap),
- data_(reinterpret_cast<uint8_t *>(data)),
- nrows_(nrows),
- arena_(arena) {
+ RowBlockMemory* memory)
+ : type_(type),
+ non_null_bitmap_(non_null_bitmap),
+ data_(reinterpret_cast<uint8_t*>(data)),
+ nrows_(nrows),
+ memory_(memory) {
DCHECK(data_) << "null data";
}
@@ -106,12 +105,13 @@ class ColumnBlock {
return !BitmapTest(non_null_bitmap_, idx);
}
- const size_t stride() const { return type_->size(); }
- const uint8_t * data() const { return data_; }
- uint8_t *data() { return data_; }
- const size_t nrows() const { return nrows_; }
+ size_t stride() const { return type_->size(); }
+ const uint8_t* data() const { return data_; }
+ uint8_t* data() { return data_; }
+ size_t nrows() const { return nrows_; }
- Arena *arena() { return arena_; }
+ RowBlockMemory* memory() { return memory_; }
+ Arena* arena() { return &memory_->arena; }
const TypeInfo* type_info() const {
return type_;
@@ -164,7 +164,7 @@ class ColumnBlock {
uint8_t *data_;
size_t nrows_;
- Arena *arena_;
+ RowBlockMemory* memory_;
};
inline bool operator==(const ColumnBlock& a, const ColumnBlock& b) {
@@ -261,7 +261,9 @@ class ColumnDataView {
return column_block_->cell_ptr(row_offset_);
}
- Arena *arena() { return column_block_->arena(); }
+ RowBlockMemory* memory() { return column_block_->memory(); }
+
+ Arena* arena() { return &memory()->arena; }
size_t nrows() const {
return column_block_->nrows() - row_offset_;
@@ -280,45 +282,4 @@ class ColumnDataView {
size_t row_offset_;
};
-// Utility class which allocates temporary storage for a
-// dense block of column data, freeing it when it goes
-// out of scope.
-//
-// This is more useful in test code than production code,
-// since it doesn't allocate from an arena, etc.
-template<DataType type>
-class ScopedColumnBlock : public ColumnBlock {
- public:
- typedef typename TypeTraits<type>::cpp_type cpp_type;
-
- explicit ScopedColumnBlock(size_t n_rows, bool allow_nulls = true)
- : ColumnBlock(GetTypeInfo(type),
- allow_nulls ? new uint8_t[BitmapSize(n_rows)] : nullptr,
- new cpp_type[n_rows],
- n_rows,
- new Arena(1024)),
- non_null_bitmap_(non_null_bitmap()),
- data_(reinterpret_cast<cpp_type *>(data())),
- arena_(arena()) {
- if (allow_nulls) {
- // All rows begin null.
- BitmapChangeBits(non_null_bitmap(), /*offset=*/ 0, n_rows, /*value=*/ false);
- }
- }
-
- const cpp_type &operator[](size_t idx) const {
- return data_[idx];
- }
-
- cpp_type &operator[](size_t idx) {
- return data_[idx];
- }
-
- private:
- std::unique_ptr<uint8_t[]> non_null_bitmap_;
- std::unique_ptr<cpp_type[]> data_;
- std::unique_ptr<Arena> arena_;
-
-};
-
} // namespace kudu
diff --git a/src/kudu/common/generic_iterators-test.cc b/src/kudu/common/generic_iterators-test.cc
index 6713a09..08b39ea 100644
--- a/src/kudu/common/generic_iterators-test.cc
+++ b/src/kudu/common/generic_iterators-test.cc
@@ -45,6 +45,7 @@
#include "kudu/common/key_encoder.h"
#include "kudu/common/predicate_effectiveness.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
@@ -54,7 +55,6 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/block_bloom_filter.h"
#include "kudu/util/hash.pb.h"
-#include "kudu/util/memory/arena.h"
#include "kudu/util/random.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
@@ -258,7 +258,8 @@ TEST(TestMergeIterator, TestNotConsumedCleanup) {
ASSERT_OK(merger->Init(nullptr));
ASSERT_TRUE(merger->HasNext());
- RowBlock dst(&kIntSchema, 1, nullptr);
+ RowBlockMemory mem;
+ RowBlock dst(&kIntSchema, 1, &mem);
ASSERT_OK(merger->NextBlock(&dst));
ASSERT_EQ(1, dst.nrows());
ASSERT_TRUE(merger->HasNext());
@@ -435,7 +436,8 @@ void TestMerge(const Schema& schema,
// The RowBlock is sized to a power of 2 to improve BitmapCopy performance
// when copying another RowBlock into it.
- RowBlock dst(&schema, 128, nullptr);
+ RowBlockMemory mem;
+ RowBlock dst(&schema, 128, &mem);
size_t total_idx = 0;
auto expected_iter = expected.cbegin();
while (merger->HasNext()) {
@@ -524,8 +526,8 @@ TEST(TestMaterializingIterator, TestMaterializingPredicatePushdown) {
ASSERT_OK(materializing->Init(&spec));
ASSERT_EQ(0, spec.predicates().size()) << "Iterator should have pushed down predicate";
- Arena arena(1024);
- RowBlock dst(&kIntSchema, 100, &arena);
+ RowBlockMemory mem(1024);
+ RowBlock dst(&kIntSchema, 100, &mem);
ASSERT_OK(materializing->NextBlock(&dst));
ASSERT_EQ(dst.nrows(), 100);
@@ -572,8 +574,8 @@ TEST(TestPredicateEvaluatingIterator, TestPredicateEvaluation) {
ASSERT_EQ(1, GetIteratorPredicatesForTests(outer_iter).size())
<< "Predicate should be evaluated by the outer iterator";
- Arena arena(1024);
- RowBlock dst(&kIntSchema, 100, &arena);
+ RowBlockMemory mem(1024);
+ RowBlock dst(&kIntSchema, 100, &mem);
ASSERT_OK(outer_iter->NextBlock(&dst));
ASSERT_EQ(dst.nrows(), 100);
@@ -726,11 +728,11 @@ class PredicateEffectivenessTest :
ASSERT_TRUE(GetIteratorPredicateEffectivenessCtxForTests(iter)[0].enabled)
<< "Predicate must be enabled to begin with";
- Arena arena(1024);
+ RowBlockMemory mem;
FLAGS_predicate_effectivess_num_skip_blocks = 4;
if (all_values) {
for (int i = 0; i < kNumRows / kBatchSize; i++) {
- RowBlock dst(&kIntSchema, kBatchSize, &arena);
+ RowBlock dst(&kIntSchema, kBatchSize, &mem);
ASSERT_OK(iter->NextBlock(&dst));
ASSERT_EQ(kBatchSize, dst.nrows());
ASSERT_EQ(kBatchSize, dst.selection_vector()->CountSelected());
@@ -741,7 +743,7 @@ class PredicateEffectivenessTest :
}
} else {
for (int i = 0; i < kNumRows / kBatchSize; i++) {
- RowBlock dst(&kIntSchema, kBatchSize, &arena);
+ RowBlock dst(&kIntSchema, kBatchSize, &mem);
ASSERT_OK(iter->NextBlock(&dst));
ASSERT_EQ(kBatchSize, dst.nrows());
// For subset case, the predicate should never be disabled.
diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index d1301aa..390740a 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -49,6 +49,7 @@
#include "kudu/common/predicate_effectiveness.h"
#include "kudu/common/row.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/casts.h"
@@ -115,7 +116,7 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
public:
explicit MergeIterState(IterWithBounds iwb)
: iwb_(std::move(iwb)),
- arena_(1024),
+ memory_(1024),
next_row_idx_(0)
{}
@@ -148,18 +149,18 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
// exist. If not, we have to pull a block immediately: after Init() is
// finished it must be safe to call next_row() and last_row().
//
- // Decoded bound allocations are done against 'decoded_bounds_arena'.
- Status Init(Arena* decoded_bounds_arena) {
+ // Decoded bound allocations are done against the arena in 'decoded_bounds_memory'.
+ Status Init(RowBlockMemory* decoded_bounds_memory) {
DCHECK(!read_block_);
if (iwb_.encoded_bounds) {
- decoded_bounds_.emplace(&schema(), decoded_bounds_arena);
+ decoded_bounds_.emplace(&schema(), decoded_bounds_memory);
decoded_bounds_->lower = decoded_bounds_->block.row(0);
decoded_bounds_->upper = decoded_bounds_->block.row(1);
RETURN_NOT_OK(schema().DecodeRowKey(
- iwb_.encoded_bounds->first, &decoded_bounds_->lower, decoded_bounds_arena));
+ iwb_.encoded_bounds->first, &decoded_bounds_->lower, &decoded_bounds_memory->arena));
RETURN_NOT_OK(schema().DecodeRowKey(
- iwb_.encoded_bounds->second, &decoded_bounds_->upper, decoded_bounds_arena));
+ iwb_.encoded_bounds->second, &decoded_bounds_->upper, &decoded_bounds_memory->arena));
} else {
RETURN_NOT_OK(PullNextBlock());
}
@@ -223,14 +224,14 @@ class MergeIterState : public boost::intrusive::list_base_hook<> {
IterWithBounds iwb_;
// Allocates memory for read_block_.
- Arena arena_;
+ RowBlockMemory memory_;
// Optional rowset bounds, decoded during Init().
struct DecodedBounds {
// 'block' must be constructed immediately; the bounds themselves can be
// initialized later.
- DecodedBounds(const Schema* schema, Arena* arena)
- : block(schema, /*nrows=*/2, arena) {}
+ DecodedBounds(const Schema* schema, RowBlockMemory* mem)
+ : block(schema, /*nrows=*/2, mem) {}
RowBlock block;
RowBlockRow lower;
@@ -274,7 +275,7 @@ Status MergeIterState::Advance(size_t num_rows, bool* pulled_new_block) {
// We either exhausted the block outright, or all subsequent rows were
// deselected. Either way, we need to pull the next block.
next_row_idx_ = read_block_->nrows();
- arena_.Reset();
+ memory_.Reset();
RETURN_NOT_OK(PullNextBlock());
*pulled_new_block = true;
return Status::OK();
@@ -285,7 +286,7 @@ Status MergeIterState::PullNextBlock() {
<< "should not pull next block until current block is exhausted";
if (!read_block_) {
- read_block_.reset(new RowBlock(&schema(), kMergeRowBuffer, &arena_));
+ read_block_.reset(new RowBlock(&schema(), kMergeRowBuffer, &memory_));
}
while (iwb_.iter->HasNext()) {
RETURN_NOT_OK(iwb_.iter->NextBlock(read_block_.get()));
@@ -540,7 +541,7 @@ class MergeIterator : public RowwiseIterator {
// Each MergeIterState has an arena for buffered row data, but it is reset
// every time a new block is pulled. This single arena ensures that a
// MergeIterState's decoded bounds remain allocated for its lifetime.
- Arena decoded_bounds_arena_;
+ RowBlockMemory decoded_bounds_memory_;
// Min-heap that orders rows by their keys. A call to top() will yield the row
// with the smallest key.
@@ -595,7 +596,7 @@ MergeIterator::MergeIterator(MergeIteratorOptions opts,
initted_(false),
orig_iters_(std::move(iters)),
num_orig_iters_(orig_iters_.size()),
- decoded_bounds_arena_(1024) {
+ decoded_bounds_memory_(1024) {
CHECK_GT(orig_iters_.size(), 0);
}
@@ -664,7 +665,7 @@ Status MergeIterator::InitSubIterators(ScanSpec *spec) {
ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr;
RETURN_NOT_OK(InitAndMaybeWrap(&i.iter, spec_copy));
unique_ptr<MergeIterState> state(new MergeIterState(std::move(i)));
- RETURN_NOT_OK(state->Init(&decoded_bounds_arena_));
+ RETURN_NOT_OK(state->Init(&decoded_bounds_memory_));
states_.push_back(*state.release());
}
orig_iters_.clear();
diff --git a/src/kudu/common/rowblock.cc b/src/kudu/common/rowblock.cc
index 85bd4ca..059fbf1 100644
--- a/src/kudu/common/rowblock.cc
+++ b/src/kudu/common/rowblock.cc
@@ -168,13 +168,13 @@ std::vector<uint16_t> SelectedRows::CreateRowIndexes() {
//////////////////////////////
RowBlock::RowBlock(const Schema* schema,
size_t nrows,
- Arena *arena)
+ RowBlockMemory* memory)
: schema_(schema),
columns_data_(schema->num_columns()),
column_non_null_bitmaps_(schema->num_columns()),
row_capacity_(nrows),
nrows_(nrows),
- arena_(arena),
+ memory_(memory),
sel_vec_(nrows) {
CHECK_GT(row_capacity_, 0);
diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h
index 0db69d2..5dd5827 100644
--- a/src/kudu/common/rowblock.h
+++ b/src/kudu/common/rowblock.h
@@ -26,6 +26,7 @@
#include <glog/logging.h>
#include "kudu/common/columnblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/gutil/macros.h"
@@ -294,7 +295,6 @@ class SelectedRows {
std::vector<uint16_t> indexes_;
};
-
// A block of decoded rows.
// Wrapper around a buffer, which keeps the buffer's size, associated arena,
// and schema. Provides convenience accessors for indexing by row, column, etc.
@@ -313,9 +313,7 @@ class RowBlock {
// Constructs a new RowBlock.
//
// The 'schema' and 'arena' objects must outlive this RowBlock.
- RowBlock(const Schema* schema,
- size_t nrows,
- Arena* arena);
+ RowBlock(const Schema* schema, size_t nrows, RowBlockMemory* memory);
~RowBlock();
// Resize the block to the given number of rows.
@@ -331,7 +329,7 @@ class RowBlock {
RowBlockRow row(size_t idx) const;
const Schema* schema() const { return schema_; }
- Arena* arena() const { return arena_; }
+ Arena* arena() const { return &memory_->arena; }
ColumnBlock column_block(size_t col_idx) const {
return column_block(col_idx, nrows_);
@@ -344,7 +342,7 @@ class RowBlock {
uint8_t* col_data = columns_data_[col_idx];
uint8_t* nulls_bitmap = column_non_null_bitmaps_[col_idx];
- return ColumnBlock(col_schema.type_info(), nulls_bitmap, col_data, nrows, arena_);
+ return ColumnBlock(col_schema.type_info(), nulls_bitmap, col_data, nrows, memory_);
}
// Return the base pointer for the given column's data.
@@ -447,7 +445,7 @@ class RowBlock {
// nrows_ <= row_capacity_
size_t nrows_;
- Arena* arena_;
+ RowBlockMemory* memory_;
// The bitmap indicating which rows are valid in this block.
// Deleted rows or rows which have failed to pass predicates will be zeroed
diff --git a/src/kudu/common/rowblock_memory.h b/src/kudu/common/rowblock_memory.h
new file mode 100644
index 0000000..9117ebb
--- /dev/null
+++ b/src/kudu/common/rowblock_memory.h
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/util/memory/arena.h"
+
+namespace kudu {
+
+// Owns the memory allocated alongside a RowBlock for variable-length cells.
+//
+// When scanning rows into a RowBlock, the rows may contain variable-length
+// data (e.g. BINARY columns). Such data cannot be inlined directly into the
+// columnar data arrays that are part of the RowBlock and instead needs to be
+// allocated out of a separate Arena. This struct wraps that Arena; the
+// constructor argument is forwarded to the Arena constructor.
+struct RowBlockMemory {
+ Arena arena;  // Arena that variable-length cell data is allocated from.
+
+ explicit RowBlockMemory(int arena_size = 32 * 1024) : arena(arena_size) {}
+ void Reset() { arena.Reset(); }  // Forwards to Arena::Reset().
+};
+
+} // namespace kudu
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index fb08521..ba391a8 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -35,6 +35,7 @@
#include "kudu/common/common.pb.h"
#include "kudu/common/row.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/common/wire_protocol.pb.h"
@@ -261,8 +262,8 @@ TEST_F(WireProtocolTest, TestBadSchema_DuplicateColumnName) {
// Create a block of rows and ensure that it can be converted to and from protobuf.
TEST_F(WireProtocolTest, TestRowBlockToRowwisePB) {
- Arena arena(1024);
- RowBlock block(&schema_, 30, &arena);
+ RowBlockMemory mem(1024);
+ RowBlock block(&schema_, 30, &mem);
FillRowBlockWithTestRows(&block);
// Convert to PB.
@@ -299,10 +300,10 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
// Generate several blocks of random data.
static constexpr int kNumBlocks = 3;
static constexpr int kBatchSizeBytes = 8192 * 1024;
- Arena arena(1024);
+ RowBlockMemory mem(1024);
std::list<RowBlock> blocks;
for (int i = 0; i < kNumBlocks; i++) {
- blocks.emplace_back(&schema_, 30, &arena);
+ blocks.emplace_back(&schema_, 30, &mem);
FillRowBlockWithTestRows(&blocks.back());
}
@@ -363,7 +364,7 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
// converted to and from protobuf.
TEST_F(WireProtocolTest, TestColumnarRowBlockToPBWithPadding) {
int kNumRows = 10;
- Arena arena(1024);
+ RowBlockMemory mem(1024);
// Create a schema with multiple UNIXTIME_MICROS columns in different
// positions.
Schema tablet_schema({ ColumnSchema("key", UNIXTIME_MICROS),
@@ -371,7 +372,7 @@ TEST_F(WireProtocolTest, TestColumnarRowBlockToPBWithPadding) {
ColumnSchema("col2", UNIXTIME_MICROS),
ColumnSchema("col3", INT32, true /* nullable */),
ColumnSchema("col4", UNIXTIME_MICROS, true /* nullable */)}, 1);
- RowBlock block(&tablet_schema, kNumRows, &arena);
+ RowBlock block(&tablet_schema, kNumRows, &mem);
block.selection_vector()->SetAllTrue();
for (int i = 0; i < block.nrows(); i++) {
@@ -573,8 +574,8 @@ class WireProtocolBenchmark :
double RunBenchmark(const BenchmarkColumnsSpec& spec,
double select_rate) {
ResetBenchmarkSchema(spec);
- Arena arena(1024);
- RowBlock block(&benchmark_schema_, 1000, &arena);
+ RowBlockMemory mem(1024);
+ RowBlock block(&benchmark_schema_, 1000, &mem);
// Regardless of the config, use a constant number of selected cells for the test by
// looping the conversion an appropriate number of times.
const int64_t kNumCellsToConvert = AllowSlowTests() ? 100000000 : 1000000;
@@ -687,8 +688,8 @@ TEST_F(WireProtocolTest, TestInvalidRowBlock) {
// projection (a COUNT(*) query).
TEST_F(WireProtocolTest, TestBlockWithNoColumns) {
Schema empty(std::vector<ColumnSchema>(), 0);
- Arena arena(1024);
- RowBlock block(&empty, 1000, &arena);
+ RowBlockMemory mem(1024);
+ RowBlock block(&empty, 1000, &mem);
block.selection_vector()->SetAllTrue();
// Unselect 100 rows
for (int i = 0; i < 100; i++) {
@@ -792,7 +793,7 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) {
ColumnSchema col1("col1", INT32);
vector<ColumnSchema> cols = { col1 };
Schema schema(cols, 1);
- Arena arena(1024);
+ RowBlockMemory mem(1024);
boost::optional<ColumnPredicate> predicate;
{ // col1 IN (5, 6, 10)
@@ -805,7 +806,7 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) {
ColumnPredicatePB pb;
NO_FATALS(ColumnPredicateToPB(cp, &pb));
- ASSERT_OK(ColumnPredicateFromPB(schema, &arena, pb, &predicate));
+ ASSERT_OK(ColumnPredicateFromPB(schema, &mem.arena, pb, &predicate));
ASSERT_EQ(predicate->predicate_type(), PredicateType::InList);
ASSERT_EQ(3, predicate->raw_values().size());
}
@@ -819,7 +820,7 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) {
*pb.mutable_in_list()->mutable_values()->Add() = string("\0\0\0\0", 4);
*pb.mutable_in_list()->mutable_values()->Add() = string("\0\0\0\0", 4);
- ASSERT_OK(ColumnPredicateFromPB(schema, &arena, pb, &predicate));
+ ASSERT_OK(ColumnPredicateFromPB(schema, &mem.arena, pb, &predicate));
ASSERT_EQ(PredicateType::Equality, predicate->predicate_type());
}
@@ -828,9 +829,9 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) {
pb.set_column("col1");
pb.mutable_in_list();
- Arena arena(1024);
+ RowBlockMemory mem(1024);
boost::optional<ColumnPredicate> predicate;
- ASSERT_OK(ColumnPredicateFromPB(schema, &arena, pb, &predicate));
+ ASSERT_OK(ColumnPredicateFromPB(schema, &mem.arena, pb, &predicate));
ASSERT_EQ(PredicateType::None, predicate->predicate_type());
}
@@ -840,9 +841,9 @@ TEST_F(WireProtocolTest, TestColumnPredicateInList) {
pb.mutable_in_list();
*pb.mutable_in_list()->mutable_values()->Add() = string("\0", 1);
- Arena arena(1024);
+ RowBlockMemory mem(1024);
boost::optional<ColumnPredicate> predicate;
- ASSERT_TRUE(ColumnPredicateFromPB(schema, &arena, pb, &predicate).IsInvalidArgument());
+ ASSERT_TRUE(ColumnPredicateFromPB(schema, &mem.arena, pb, &predicate).IsInvalidArgument());
}
}
diff --git a/src/kudu/integration-tests/linked_list-test-util.h b/src/kudu/integration-tests/linked_list-test-util.h
index 32bf71e..8e395df 100644
--- a/src/kudu/integration-tests/linked_list-test-util.h
+++ b/src/kudu/integration-tests/linked_list-test-util.h
@@ -572,9 +572,10 @@ inline Status LinkedListTester::VerifyLinkedListLocal(
"Cannot create new row iterator");
RETURN_NOT_OK_PREPEND(iter->Init(nullptr), "Cannot initialize row iterator");
- Arena arena(1024);
- RowBlock block(&projection, 100, &arena);
+ RowBlockMemory mem(1024);
+ RowBlock block(&projection, 100, &mem);
while (iter->HasNext()) {
+ mem.Reset();
RETURN_NOT_OK(iter->NextBlock(&block));
for (int i = 0; i < block.nrows(); i++) {
int64_t key;
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index f19e5cf..b7f078a 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -40,6 +40,7 @@
#include "kudu/common/partition.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
@@ -77,7 +78,6 @@
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
-#include "kudu/util/memory/arena.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
@@ -692,8 +692,8 @@ Status SysCatalogTable::ProcessRows(
RETURN_NOT_OK(tablet_replica_->tablet()->NewRowIterator(schema_, &iter));
RETURN_NOT_OK(iter->Init(&spec));
- Arena arena(32 * 1024);
- RowBlock block(&iter->schema(), 512, &arena);
+ RowBlockMemory mem(32 * 1024);
+ RowBlock block(&iter->schema(), 512, &mem);
while (iter->HasNext()) {
RETURN_NOT_OK(iter->NextBlock(&block));
const size_t nrows = block.nrows();
diff --git a/src/kudu/tablet/cfile_set-test.cc b/src/kudu/tablet/cfile_set-test.cc
index 4b78835..d3f66c3 100644
--- a/src/kudu/tablet/cfile_set-test.cc
+++ b/src/kudu/tablet/cfile_set-test.cc
@@ -39,6 +39,7 @@
#include "kudu/common/iterator_stats.h"
#include "kudu/common/row.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/rowid.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
@@ -192,9 +193,10 @@ class TestCFileSet : public KuduRowSetTest {
ASSERT_OK(iter->Init(&spec));
// Check that the range was respected on all the results.
- Arena arena(1024);
- RowBlock block(&schema_, 100, &arena);
+ RowBlockMemory mem(1024);
+ RowBlock block(&schema_, 100, &mem);
while (iter->HasNext()) {
+ mem.Reset();
ASSERT_OK_FAST(iter->NextBlock(&block));
for (size_t i = 0; i < block.nrows(); i++) {
if (block.selection_vector()->IsRowSelected(i)) {
@@ -226,8 +228,8 @@ class TestCFileSet : public KuduRowSetTest {
}
ASSERT_OK(iter->Init(&spec));
// Check that the range was respected on all the results.
- Arena arena(1024);
- RowBlock block(&schema_, 100, &arena);
+ RowBlockMemory mem(1024);
+ RowBlock block(&schema_, 100, &mem);
while (iter->HasNext()) {
ASSERT_OK_FAST(iter->NextBlock(&block));
for (size_t i = 0; i < block.nrows(); i++) {
@@ -288,11 +290,11 @@ TEST_F(TestCFileSet, TestPartiallyMaterialize) {
unique_ptr<CFileSet::Iterator> iter(fileset->NewIterator(&schema_, nullptr));
ASSERT_OK(iter->Init(nullptr));
- Arena arena(4096);
- RowBlock block(&schema_, 100, &arena);
+ RowBlockMemory mem(4096);
+ RowBlock block(&schema_, 100, &mem);
rowid_t row_idx = 0;
while (iter->HasNext()) {
- arena.Reset();
+ mem.Reset();
size_t n = block.nrows();
ASSERT_OK_FAST(iter->PrepareBatch(&n));
diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index 0aff19d..07d4597 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -38,6 +38,7 @@
#include "kudu/common/row.h"
#include "kudu/common/row_changelist.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/rowid.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
@@ -781,7 +782,8 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
}
- RowBlock block(&schema_, kBaseNumRowSets * kNumRowsPerRowSet, &arena_);
+ RowBlockMemory mem;
+ RowBlock block(&schema_, kBaseNumRowSets * kNumRowsPerRowSet, &mem);
// Go through the expected compaction input rows, flip the last undo into a redo and
// build the base. This will give us the final version that we'll expect the result
// of the real compaction to match.
@@ -791,13 +793,13 @@ TEST_F(TestCompaction, TestDuplicatedRowsRandomCompaction) {
row->undo_head = reinsert->next();
row->row = block.row(i);
BuildRow(i, i);
- CopyRow(row_builder_.row(), &row->row, &arena_);
+ CopyRow(row_builder_.row(), &row->row, &mem.arena);
RowChangeListDecoder redo_decoder(reinsert->changelist());
CHECK_OK(redo_decoder.Init());
faststring buf;
RowChangeListEncoder dummy(&buf);
dummy.SetToUpdate();
- redo_decoder.MutateRowAndCaptureChanges(&row->row, &arena_, &dummy);
+ redo_decoder.MutateRowAndCaptureChanges(&row->row, &mem.arena, &dummy);
AddExpectedDelete(&row->redo_head, reinsert->timestamp());
}
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index fbd28e8..23eba03 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -33,6 +33,7 @@
#include "kudu/common/iterator.h"
#include "kudu/common/row.h"
#include "kudu/common/row_changelist.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/rowid.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
@@ -202,8 +203,8 @@ class DiskRowSetCompactionInput : public CompactionInput {
: base_iter_(std::move(base_iter)),
redo_delta_iter_(std::move(redo_delta_iter)),
undo_delta_iter_(std::move(undo_delta_iter)),
- arena_(32 * 1024),
- block_(&base_iter_->schema(), kRowsPerBlock, &arena_),
+ mem_(32 * 1024),
+ block_(&base_iter_->schema(), kRowsPerBlock, &mem_),
redo_mutation_block_(kRowsPerBlock, static_cast<Mutation *>(nullptr)),
undo_mutation_block_(kRowsPerBlock, static_cast<Mutation *>(nullptr)) {}
@@ -248,7 +249,7 @@ class DiskRowSetCompactionInput : public CompactionInput {
return Status::OK();
}
- Arena* PreparedBlockArena() override { return &arena_; }
+ Arena* PreparedBlockArena() override { return &mem_.arena; }
Status FinishBlock() override {
return Status::OK();
@@ -264,7 +265,7 @@ class DiskRowSetCompactionInput : public CompactionInput {
unique_ptr<DeltaIterator> redo_delta_iter_;
unique_ptr<DeltaIterator> undo_delta_iter_;
- Arena arena_;
+ RowBlockMemory mem_;
// The current block of data which has come from the input iterator
RowBlock block_;
@@ -690,7 +691,7 @@ class MergeCompactionInput : public CompactionInput {
num_dup_rows_++;
if (row_idx == 0) {
duplicated_rows_.push_back(std::unique_ptr<RowBlock>(
- new RowBlock(schema_, kDuplicatedRowsPerBlock, static_cast<Arena*>(nullptr))));
+ new RowBlock(schema_, kDuplicatedRowsPerBlock, static_cast<RowBlockMemory*>(nullptr))));
}
return duplicated_rows_.back()->row(row_idx);
}
diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc
index b97af61..66516d4 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -31,6 +31,7 @@
#include "kudu/common/row.h"
#include "kudu/common/row_changelist.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/rowid.h"
#include "kudu/common/scan_spec.h"
#include "kudu/fs/block_manager.h"
@@ -128,8 +129,8 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
RETURN_NOT_OK(delta_iter_->Init(&spec));
RETURN_NOT_OK(delta_iter_->SeekToOrdinal(0));
- Arena arena(32 * 1024);
- RowBlock block(&partial_schema_, kRowsPerBlock, &arena);
+ RowBlockMemory mem(32 * 1024);
+ RowBlock block(&partial_schema_, kRowsPerBlock, &mem);
DVLOG(1) << "Applying deltas and rewriting columns (" << partial_schema_.ToString() << ")";
unique_ptr<DeltaStats> redo_stats(new DeltaStats);
@@ -140,7 +141,7 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
while (old_base_data_rwise->HasNext()) {
// 1) Get the next batch of base data for the columns we're compacting.
- arena.Reset();
+ mem.Reset();
RETURN_NOT_OK(old_base_data_rwise->NextBlock(&block));
size_t n = block.nrows();
@@ -177,12 +178,8 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
// NOTE: This is presently ignored.
bool is_garbage_collected;
- RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(snap,
- *input_row,
- &new_undos_head,
- &new_redos_head,
- &arena,
- &dst_row));
+ RETURN_NOT_OK(ApplyMutationsAndGenerateUndos(
+ snap, *input_row, &new_undos_head, &new_redos_head, &mem.arena, &dst_row));
RemoveAncientUndos(history_gc_opts_,
&new_undos_head,
@@ -210,9 +207,9 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
// 5) Remove the columns that we've done our major REDO delta compaction on
// from this delta flush, except keep all the delete and reinsert
// mutations.
- arena.Reset();
+ mem.Reset();
vector<DeltaKeyAndUpdate> out;
- RETURN_NOT_OK(delta_iter_->FilterColumnIdsAndCollectDeltas(column_ids_, &out, &arena));
+ RETURN_NOT_OK(delta_iter_->FilterColumnIdsAndCollectDeltas(column_ids_, &out, &mem.arena));
// We only create a new redo delta file if we need to.
if (!out.empty() && !new_redo_delta_writer_) {
diff --git a/src/kudu/tablet/deltafile-test.cc b/src/kudu/tablet/deltafile-test.cc
index 5b0ee12..b58b2fa 100644
--- a/src/kudu/tablet/deltafile-test.cc
+++ b/src/kudu/tablet/deltafile-test.cc
@@ -31,10 +31,12 @@
#include <gtest/gtest.h>
#include "kudu/cfile/cfile_util.h"
+#include "kudu/common/columnblock-test-util.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/row_changelist.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/rowid.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
@@ -83,8 +85,7 @@ using fs::WritableBlock;
class TestDeltaFile : public KuduTest {
public:
TestDeltaFile() :
- schema_(CreateSchema()),
- arena_(1024) {
+ schema_(CreateSchema()) {
}
public:
@@ -175,7 +176,8 @@ class TestDeltaFile : public KuduTest {
ASSERT_OK(s);
ASSERT_OK(it->Init(nullptr));
- RowBlock block(&schema_, 100, &arena_);
+ RowBlockMemory mem;
+ RowBlock block(&schema_, 100, &mem);
// Iterate through the faked table, starting with batches that
// come before all of the updates, and extending a bit further
@@ -185,7 +187,7 @@ class TestDeltaFile : public KuduTest {
int start_row = 0;
while (start_row < FLAGS_last_row_to_update + 10000) {
block.ZeroMemory();
- arena_.Reset();
+ mem.Reset();
ASSERT_OK_FAST(it->PrepareBatch(block.nrows(), DeltaIterator::PREPARE_FOR_APPLY));
SelectionVector sv(block.nrows());
@@ -218,7 +220,6 @@ class TestDeltaFile : public KuduTest {
protected:
unique_ptr<FsManager> fs_manager_;
Schema schema_;
- Arena arena_;
BlockId test_block_;
};
@@ -311,13 +312,15 @@ TEST_F(TestDeltaFile, TestCollectMutations) {
vector<Mutation *> mutations;
mutations.resize(100);
+ Arena arena(1024);
+
int start_row = 0;
while (start_row < FLAGS_last_row_to_update + 10000) {
+ arena.Reset();
std::fill(mutations.begin(), mutations.end(), reinterpret_cast<Mutation *>(NULL));
- arena_.Reset();
ASSERT_OK_FAST(it->PrepareBatch(mutations.size(), DeltaIterator::PREPARE_FOR_COLLECT));
- ASSERT_OK(it->CollectMutations(&mutations, &arena_));
+ ASSERT_OK(it->CollectMutations(&mutations, &arena));
for (int i = 0; i < mutations.size(); i++) {
Mutation *mut_head = mutations[i];
diff --git a/src/kudu/tablet/diskrowset-test-base.h b/src/kudu/tablet/diskrowset-test-base.h
index 70215b9..9eef818 100644
--- a/src/kudu/tablet/diskrowset-test-base.h
+++ b/src/kudu/tablet/diskrowset-test-base.h
@@ -221,13 +221,13 @@ class TestRowSet : public KuduRowSetTest {
std::unique_ptr<RowwiseIterator> row_iter;
CHECK_OK(rs.NewRowIterator(opts, &row_iter));
CHECK_OK(row_iter->Init(nullptr));
- Arena arena(1024);
+ RowBlockMemory mem(1024);
int batch_size = 10000;
- RowBlock dst(&proj_val, batch_size, &arena);
+ RowBlock dst(&proj_val, batch_size, &mem);
int i = 0;
while (row_iter->HasNext()) {
- arena.Reset();
+ mem.Reset();
CHECK_OK(row_iter->NextBlock(&dst));
VerifyUpdatedBlock(proj_val.ExtractColumnFromRow<UINT32>(dst.row(0), 0),
i, dst.nrows(), updated);
@@ -285,13 +285,13 @@ class TestRowSet : public KuduRowSetTest {
CHECK_OK(row_iter->Init(nullptr));
int batch_size = 1000;
- Arena arena(1024);
- RowBlock dst(&schema, batch_size, &arena);
+ RowBlockMemory mem(1024);
+ RowBlock dst(&schema, batch_size, &mem);
int i = 0;
int log_interval = expected_rows/20 / batch_size;
while (row_iter->HasNext()) {
- arena.Reset();
+ mem.Reset();
CHECK_OK(row_iter->NextBlock(&dst));
i += dst.nrows();
diff --git a/src/kudu/tablet/memrowset-test.cc b/src/kudu/tablet/memrowset-test.cc
index c81d431..62afeb3 100644
--- a/src/kudu/tablet/memrowset-test.cc
+++ b/src/kudu/tablet/memrowset-test.cc
@@ -36,6 +36,7 @@
#include "kudu/common/row.h"
#include "kudu/common/row_changelist.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/consensus/log_anchor_registry.h"
@@ -212,10 +213,11 @@ class TestMemRowSet : public KuduTest {
unique_ptr<MemRowSet::Iterator> iter(mrs->NewIterator(opts));
CHECK_OK(iter->Init(nullptr));
- Arena arena(1024);
- RowBlock block(&schema_, 100, &arena);
+ RowBlockMemory mem(1024);
+ RowBlock block(&schema_, 100, &mem);
int fetched = 0;
while (iter->HasNext()) {
+ mem.Reset();
CHECK_OK(iter->NextBlock(&block));
fetched += block.selection_vector()->CountSelected();
}
diff --git a/src/kudu/tablet/mt-rowset_delta_compaction-test.cc b/src/kudu/tablet/mt-rowset_delta_compaction-test.cc
index 70201e5..141e396 100644
--- a/src/kudu/tablet/mt-rowset_delta_compaction-test.cc
+++ b/src/kudu/tablet/mt-rowset_delta_compaction-test.cc
@@ -28,13 +28,13 @@
#include "kudu/common/common.pb.h"
#include "kudu/common/iterator.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/atomicops.h"
#include "kudu/tablet/diskrowset-test-base.h"
#include "kudu/tablet/diskrowset.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet.pb.h"
-#include "kudu/util/memory/arena.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
@@ -106,8 +106,8 @@ class TestMultiThreadedRowSetDeltaCompaction : public TestRowSet {
}
void ReadVerify(DiskRowSet *rs) {
- Arena arena(1024);
- RowBlock dst(&schema_, 1000, &arena);
+ RowBlockMemory mem(1024);
+ RowBlock dst(&schema_, 1000, &mem);
RowIteratorOptions opts;
opts.projection = &schema_;
unique_ptr<RowwiseIterator> iter;
@@ -115,6 +115,7 @@ class TestMultiThreadedRowSetDeltaCompaction : public TestRowSet {
uint32_t expected = NoBarrier_Load(&update_counter_);
ASSERT_OK(iter->Init(nullptr));
while (iter->HasNext()) {
+ mem.Reset();
ASSERT_OK_FAST(iter->NextBlock(&dst));
size_t n = dst.nrows();
ASSERT_GT(n, 0);
diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc
index 8cf354b..f85d595 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -34,6 +34,7 @@
#include "kudu/common/iterator.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/rowid.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/basictypes.h"
@@ -46,7 +47,6 @@
#include "kudu/tablet/tablet.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/faststring.h"
-#include "kudu/util/memory/arena.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
@@ -148,8 +148,8 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
- Arena tmp_arena(1024);
- RowBlock block(&schema_, 1, &tmp_arena);
+ RowBlockMemory mem(1024);
+ RowBlock block(&schema_, 1, &mem);
faststring update_buf;
uint64_t updates_since_last_report = 0;
@@ -164,7 +164,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
CHECK_OK(iter->Init(nullptr));
while (iter->HasNext() && running_insert_count_.count() > 0) {
- tmp_arena.Reset();
+ mem.Reset();
CHECK_OK(iter->NextBlock(&block));
CHECK_EQ(block.nrows(), 1);
@@ -218,8 +218,8 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
// This is meant to test that outstanding iterators don't end up
// trying to reference already-freed memrowset memory.
void SlowReaderThread(int /*tid*/) {
- Arena arena(32*1024);
- RowBlock block(&schema_, 1, &arena);
+ RowBlockMemory mem(32 * 1024);
+ RowBlock block(&schema_, 1, &mem);
uint64_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * FLAGS_num_insert_threads)
/ FLAGS_num_insert_threads;
@@ -232,6 +232,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
CHECK_OK(iter->Init(nullptr));
for (int i = 0; i < max_iters && iter->HasNext(); i++) {
+ mem.Reset();
CHECK_OK(iter->NextBlock(&block));
if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) {
@@ -251,10 +252,10 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
}
uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) {
- Arena arena(1024); // unused, just scanning ints
+ RowBlockMemory mem(1024); // unused, just scanning ints
static const int kBufInts = 1024*1024 / 8;
- RowBlock block(&valcol_projection_, kBufInts, &arena);
+ RowBlock block(&valcol_projection_, kBufInts, &mem);
ColumnBlock column = block.column_block(0);
uint64_t count_since_report = 0;
@@ -266,7 +267,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
CHECK_OK(iter->Init(nullptr));
while (iter->HasNext()) {
- arena.Reset();
+ mem.Reset();
CHECK_OK(iter->NextBlock(&block));
for (size_t j = 0; j < block.nrows(); j++) {
diff --git a/src/kudu/tablet/tablet-decoder-eval-test.cc b/src/kudu/tablet/tablet-decoder-eval-test.cc
index bf5fd0f..c811912 100644
--- a/src/kudu/tablet/tablet-decoder-eval-test.cc
+++ b/src/kudu/tablet/tablet-decoder-eval-test.cc
@@ -34,6 +34,7 @@
#include "kudu/common/partial_row.h"
#include "kudu/common/row.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/stringprintf.h"
@@ -255,12 +256,13 @@ public:
ASSERT_OK(iter->Init(&spec));
ASSERT_TRUE(spec.predicates().empty()) << "Should have accepted all predicates";
- Arena ret_arena(1024);
+ RowBlockMemory mem(1024);
size_t expected_count = ExpectedCount(nrows, cardinality, lower, upper);
Schema schema = iter->schema();
- RowBlock block(&schema, 100, &ret_arena);
+ RowBlock block(&schema, 100, &mem);
int fetched = 0;
- string column_str_a, column_str_b;
+ string column_str_a;
+ string column_str_b;
while (iter->HasNext()) {
ASSERT_OK(iter->NextBlock(&block));
for (size_t i = 0; i < block.nrows(); i++) {
diff --git a/src/kudu/tablet/tablet-test-base.h b/src/kudu/tablet/tablet-test-base.h
index b85640a..9f9b406 100644
--- a/src/kudu/tablet/tablet-test-base.h
+++ b/src/kudu/tablet/tablet-test-base.h
@@ -311,8 +311,7 @@ class TabletTestBase : public KuduTabletTest {
TabletHarness::Options::ClockType::LOGICAL_CLOCK) :
KuduTabletTest(TESTSETUP::CreateSchema(), clock_type),
setup_(),
- max_rows_(setup_.GetMaxRows()),
- arena_(1024)
+ max_rows_(setup_.GetMaxRows())
{}
// Inserts "count" rows.
@@ -452,8 +451,8 @@ class TabletTestBase : public KuduTabletTest {
ASSERT_OK(iter->Init(nullptr));
int batch_size = std::max<size_t>(1, std::min<size_t>(expected_row_count / 10,
4L * 1024 * 1024 / schema_.byte_size()));
- Arena arena(32*1024);
- RowBlock block(&schema_, batch_size, &arena);
+ RowBlockMemory mem(32 * 1024);
+ RowBlock block(&schema_, batch_size, &mem);
bool check_for_dups = true;
if (expected_row_count > INT_MAX) {
@@ -469,6 +468,7 @@ class TabletTestBase : public KuduTabletTest {
uint64_t actual_row_count = 0;
while (iter->HasNext()) {
+ mem.Reset();
ASSERT_OK_FAST(iter->NextBlock(&block));
RowBlockRow rb_row = block.row(0);
@@ -537,8 +537,6 @@ class TabletTestBase : public KuduTabletTest {
TESTSETUP setup_;
const uint64_t max_rows_;
-
- Arena arena_;
};
diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h
index bf92857..1ca5a1a 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -38,6 +38,7 @@
#include "kudu/cfile/cfile_util.h"
#include "kudu/common/columnblock.h"
+#include "kudu/common/columnblock-test-util.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/iterator.h"
#include "kudu/common/row.h"
@@ -199,8 +200,8 @@ class KuduRowSetTest : public KuduTabletTest {
static inline Status SilentIterateToStringList(RowwiseIterator* iter,
int* fetched) {
const Schema& schema = iter->schema();
- Arena arena(1024);
- RowBlock block(&schema, 100, &arena);
+ RowBlockMemory memory(1024);
+ RowBlock block(&schema, 100, &memory);
*fetched = 0;
while (iter->HasNext()) {
RETURN_NOT_OK(iter->NextBlock(&block));
@@ -218,8 +219,8 @@ static inline Status IterateToStringList(RowwiseIterator* iter,
int limit = INT_MAX) {
out->clear();
Schema schema = iter->schema();
- Arena arena(1024);
- RowBlock block(&schema, 100, &arena);
+ RowBlockMemory memory(1024);
+ RowBlock block(&schema, 100, &memory);
int fetched = 0;
while (iter->HasNext() && fetched < limit) {
RETURN_NOT_OK(iter->NextBlock(&block));
diff --git a/src/kudu/tablet/tablet-test.cc b/src/kudu/tablet/tablet-test.cc
index aaea8ef..f0e9c92 100644
--- a/src/kudu/tablet/tablet-test.cc
+++ b/src/kudu/tablet/tablet-test.cc
@@ -39,6 +39,7 @@
#include "kudu/common/key_range.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol.pb.h"
@@ -507,7 +508,8 @@ TYPED_TEST(TestTablet, TestRowIteratorSimple) {
ASSERT_TRUE(iter->HasNext());
- RowBlock block(&this->schema_, 100, &this->arena_);
+ RowBlockMemory mem;
+ RowBlock block(&this->schema_, 100, &mem);
// First call to CopyNextRows should fetch the whole memrowset.
ASSERT_OK_FAST(iter->NextBlock(&block));
@@ -579,8 +581,10 @@ TYPED_TEST(TestTablet, TestRowIteratorOrdered) {
// Iterate the tablet collecting rows.
vector<shared_ptr<faststring> > rows;
+ RowBlockMemory mem;
for (int i = 0; i < numBlocks; i++) {
- RowBlock block(&this->schema_, rowsPerBlock, &this->arena_);
+ mem.Reset();
+ RowBlock block(&this->schema_, rowsPerBlock, &mem);
ASSERT_TRUE(iter->HasNext());
ASSERT_OK(iter->NextBlock(&block));
ASSERT_EQ(rowsPerBlock, block.nrows()) << "unexpected number of rows returned";
@@ -683,9 +687,10 @@ TYPED_TEST(TestTablet, TestRowIteratorComplex) {
vector<bool> seen(max_rows, false);
int seen_count = 0;
- RowBlock block(&schema, 100, &this->arena_);
+ RowBlockMemory mem;
+ RowBlock block(&schema, 100, &mem);
while (iter->HasNext()) {
- this->arena_.Reset();
+ mem.Reset();
ASSERT_OK(iter->NextBlock(&block));
LOG(INFO) << "Fetched batch of " << block.nrows();
for (size_t i = 0; i < block.nrows(); i++) {
diff --git a/src/kudu/tablet/tablet_random_access-test.cc b/src/kudu/tablet/tablet_random_access-test.cc
index 443e103..34ddd39 100644
--- a/src/kudu/tablet/tablet_random_access-test.cc
+++ b/src/kudu/tablet/tablet_random_access-test.cc
@@ -34,6 +34,7 @@
#include "kudu/common/iterator.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.pb.h"
@@ -44,7 +45,6 @@
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet.h"
#include "kudu/util/countdown_latch.h"
-#include "kudu/util/memory/arena.h"
#include "kudu/util/monotime.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
@@ -294,10 +294,10 @@ class TestRandomAccess : public KuduTabletTest {
optional<ExpectedKeyValueRow> ret;
int n_results = 0;
- Arena arena(1024);
- RowBlock block(&schema, 100, &arena);
+ RowBlockMemory mem(1024);
+ RowBlock block(&schema, 100, &mem);
while (iter->HasNext()) {
- arena.Reset();
+ mem.Reset();
CHECK_OK(iter->NextBlock(&block));
for (int i = 0; i < block.nrows(); i++) {
if (!block.selection_vector()->IsRowSelected(i)) {
diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc
index 2c313ff..70282e3 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -37,6 +37,7 @@
#include "kudu/common/iterator.h"
#include "kudu/common/partition.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus.pb.h"
@@ -81,7 +82,6 @@
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/faststring.h"
-#include "kudu/util/memory/arena.h"
#include "kudu/util/metrics.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/pb_util.h"
@@ -715,10 +715,11 @@ Status DumpRowSetInternal(const IOContext& ctx,
RETURN_NOT_OK(rs->NewRowIterator(opts, &it));
RETURN_NOT_OK(it->Init(nullptr));
- Arena arena(1024);
- RowBlock block(&key_proj, 100, &arena);
+ RowBlockMemory mem(1024);
+ RowBlock block(&key_proj, 100, &mem);
faststring key;
while (it->HasNext()) {
+ mem.Reset();
RETURN_NOT_OK(it->NextBlock(&block));
for (int i = 0; i < block.nrows(); i++) {
key_proj.EncodeComparableKey(block.row(i), &key);
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index 5815f69..9aa19af 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -191,6 +191,7 @@
#include "kudu/common/iterator.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/types.h"
@@ -219,7 +220,6 @@
#include "kudu/util/flag_validators.h"
#include "kudu/util/int128.h"
#include "kudu/util/logging.h"
-#include "kudu/util/memory/arena.h"
#include "kudu/util/oid_generator.h"
#include "kudu/util/random.h"
#include "kudu/util/status.h"
@@ -884,11 +884,11 @@ Status TabletScan(const RunnerContext& context) {
unique_ptr<RowwiseIterator> iter;
RETURN_NOT_OK(tablet->NewRowIterator(std::move(opts), &iter));
RETURN_NOT_OK(iter->Init(nullptr));
- Arena arena(1024);
- RowBlock block(&projection, 100, &arena);
+ RowBlockMemory mem(1024);
+ RowBlock block(&projection, 100, &mem);
int64_t rows_scanned = 0;
while (iter->HasNext()) {
- arena.Reset();
+ mem.Reset();
RETURN_NOT_OK(iter->NextBlock(&block));
rows_scanned += block.nrows();
KLOG_EVERY_N_SECS(INFO, 10) << "scanned " << rows_scanned << " rows";
diff --git a/src/kudu/transactions/txn_status_tablet.cc b/src/kudu/transactions/txn_status_tablet.cc
index cf3c76e..feb2698 100644
--- a/src/kudu/transactions/txn_status_tablet.cc
+++ b/src/kudu/transactions/txn_status_tablet.cc
@@ -32,6 +32,7 @@
#include "kudu/common/partial_row.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
@@ -47,7 +48,6 @@
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/faststring.h"
-#include "kudu/util/memory/arena.h"
#include "kudu/util/once.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/slice.h"
@@ -234,8 +234,8 @@ Status TxnStatusTablet::VisitTransactions(TransactionsVisitor* visitor) {
boost::optional<int64_t> prev_txn_id = boost::none;
TxnStatusEntryPB prev_status_entry_pb;
vector<ParticipantIdAndPB> prev_participants;
- Arena arena(32 * 1024);
- RowBlock block(&iter->schema(), 512, &arena);
+ RowBlockMemory mem;
+ RowBlock block(&iter->schema(), 512, &mem);
// Iterate over the transaction and participant entries, notifying the
// visitor once a transaction and all its participants have been found.
while (iter->HasNext()) {
diff --git a/src/kudu/tserver/tablet_server-test-base.cc b/src/kudu/tserver/tablet_server-test-base.cc
index 9cbbeeb..ac1c779 100644
--- a/src/kudu/tserver/tablet_server-test-base.cc
+++ b/src/kudu/tserver/tablet_server-test-base.cc
@@ -33,6 +33,7 @@
#include "kudu/common/iterator.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
@@ -51,7 +52,6 @@
#include "kudu/tserver/tablet_server_test_util.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/tserver/tserver_service.proxy.h"
-#include "kudu/util/memory/arena.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
@@ -413,11 +413,12 @@ void TabletServerTestBase::VerifyRows(const Schema& schema,
std::min<int>(expected.size() / 10,
4*1024*1024 / schema.byte_size()));
- Arena arena(32*1024);
- RowBlock block(&schema, batch_size, &arena);
+ RowBlockMemory mem(32 * 1024);
+ RowBlock block(&schema, batch_size, &mem);
int count = 0;
while (iter->HasNext()) {
+ mem.Reset();
ASSERT_OK_FAST(iter->NextBlock(&block));
RowBlockRow rb_row = block.row(0);
for (int i = 0; i < block.nrows(); i++) {
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 9fe419d..f1a6c74 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -44,6 +44,7 @@
#include "kudu/common/key_range.h"
#include "kudu/common/partition.h"
#include "kudu/common/rowblock.h"
+#include "kudu/common/rowblock_memory.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
@@ -2892,9 +2893,8 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
// TODO(todd): could size the RowBlock based on the user's requested batch size?
// If people had really large indirect objects, we would currently overshoot
// their requested batch size by a lot.
- Arena arena(32 * 1024);
- RowBlock block(&iter->schema(),
- FLAGS_scanner_batch_size_rows, &arena);
+ RowBlockMemory mem(32 * 1024);
+ RowBlock block(&iter->schema(), FLAGS_scanner_batch_size_rows, &mem);
// TODO(todd): in the future, use the client timeout to set a budget. For now,
// just use a half second, which should be plenty to amortize call overhead.