You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by uw...@apache.org on 2017/06/26 07:05:29 UTC
[6/6] parquet-cpp git commit: PARQUET-858: Flatten column directory,
minor code consolidation
PARQUET-858: Flatten column directory, minor code consolidation
This strictly moves code around and makes no functional changes; the purpose is to make the codebase easier to navigate. I also ran clang-format, so some hunks below are formatting-only.
Author: Wes McKinney <we...@twosigma.com>
Closes #363 from wesm/flatten-column-directory and squashes the following commits:
7bc099f [Wes McKinney] Build fixes, clang-format
b16be99 [Wes McKinney] Initial cut flattening out column directory
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/84db929e
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/84db929e
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/84db929e
Branch: refs/heads/master
Commit: 84db929ec14175badbbed7083be57eca81a99c09
Parents: 491182c
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Jun 26 09:05:17 2017 +0200
Committer: Uwe L. Korn <uw...@apache.org>
Committed: Mon Jun 26 09:05:17 2017 +0200
----------------------------------------------------------------------
CMakeLists.txt | 11 +-
benchmarks/decode_benchmark.cc | 6 +-
src/parquet/CMakeLists.txt | 13 +
src/parquet/api/reader.h | 4 +-
src/parquet/api/writer.h | 2 +-
.../arrow/arrow-reader-writer-benchmark.cc | 4 +-
src/parquet/arrow/arrow-reader-writer-test.cc | 90 ++-
src/parquet/arrow/reader.cc | 123 ++--
src/parquet/arrow/reader.h | 4 +-
src/parquet/arrow/writer.cc | 3 +-
src/parquet/column-io-benchmark.cc | 187 +++++
src/parquet/column/CMakeLists.txt | 38 -
src/parquet/column/column-io-benchmark.cc | 138 ----
src/parquet/column/column-reader-test.cc | 374 ----------
src/parquet/column/column-writer-test.cc | 516 -------------
src/parquet/column/level-benchmark.cc | 78 --
src/parquet/column/levels-test.cc | 245 -------
src/parquet/column/levels.cc | 144 ----
src/parquet/column/levels.h | 86 ---
src/parquet/column/page.h | 201 -----
src/parquet/column/properties-test.cc | 64 --
src/parquet/column/properties.h | 385 ----------
src/parquet/column/reader.cc | 238 ------
src/parquet/column/reader.h | 453 ------------
src/parquet/column/scan-all.cc | 56 --
src/parquet/column/scan-all.h | 41 --
src/parquet/column/scanner-test.cc | 232 ------
src/parquet/column/scanner.cc | 56 --
src/parquet/column/scanner.h | 232 ------
src/parquet/column/statistics-test.cc | 358 ---------
src/parquet/column/statistics.cc | 244 -------
src/parquet/column/statistics.h | 234 ------
src/parquet/column/test-specialization.h | 172 -----
src/parquet/column/test-util.h | 429 -----------
src/parquet/column/writer.cc | 528 --------------
src/parquet/column/writer.h | 250 -------
src/parquet/column_page.h | 201 +++++
src/parquet/column_reader-test.cc | 371 ++++++++++
src/parquet/column_reader.cc | 289 ++++++++
src/parquet/column_reader.h | 475 ++++++++++++
src/parquet/column_scanner-test.cc | 232 ++++++
src/parquet/column_scanner.cc | 90 +++
src/parquet/column_scanner.h | 246 +++++++
src/parquet/column_writer-test.cc | 729 +++++++++++++++++++
src/parquet/column_writer.cc | 597 +++++++++++++++
src/parquet/column_writer.h | 282 +++++++
src/parquet/encoding-internal.h | 5 +-
src/parquet/file/file-deserialize-test.cc | 2 +-
src/parquet/file/file-metadata-test.cc | 2 +-
src/parquet/file/file-serialize-test.cc | 8 +-
src/parquet/file/metadata.h | 4 +-
src/parquet/file/printer.cc | 2 +-
src/parquet/file/reader-internal.cc | 2 +-
src/parquet/file/reader-internal.h | 4 +-
src/parquet/file/reader.cc | 6 +-
src/parquet/file/reader.h | 6 +-
src/parquet/file/writer-internal.cc | 2 +-
src/parquet/file/writer-internal.h | 2 +-
src/parquet/file/writer.h | 2 +-
src/parquet/properties-test.cc | 64 ++
src/parquet/properties.h | 385 ++++++++++
src/parquet/reader-test.cc | 4 +-
src/parquet/schema-test.cc | 16 +-
src/parquet/statistics-test.cc | 358 +++++++++
src/parquet/statistics.cc | 244 +++++++
src/parquet/statistics.h | 234 ++++++
src/parquet/test-specialization.h | 172 +++++
src/parquet/test-util.h | 430 +++++++++++
src/parquet/util/memory.h | 12 +-
src/parquet/util/schema-util.h | 13 +-
src/parquet/util/visibility.h | 3 +-
71 files changed, 5755 insertions(+), 5978 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index d0c1a53..47984e6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -602,12 +602,9 @@ set(LIBPARQUET_SRCS
src/parquet/arrow/schema.cc
src/parquet/arrow/writer.cc
- src/parquet/column/levels.cc
- src/parquet/column/reader.cc
- src/parquet/column/writer.cc
- src/parquet/column/scanner.cc
- src/parquet/column/scan-all.cc
- src/parquet/column/statistics.cc
+ src/parquet/column_reader.cc
+ src/parquet/column_scanner.cc
+ src/parquet/column_writer.cc
src/parquet/file/metadata.cc
src/parquet/file/printer.cc
@@ -617,6 +614,7 @@ set(LIBPARQUET_SRCS
src/parquet/file/writer-internal.cc
src/parquet/schema.cc
+ src/parquet/statistics.cc
src/parquet/parquet_constants.cpp
src/parquet/parquet_types.cpp
@@ -669,7 +667,6 @@ ADD_LIB(parquet
add_subdirectory(src/parquet)
add_subdirectory(src/parquet/api)
add_subdirectory(src/parquet/arrow)
-add_subdirectory(src/parquet/column)
add_subdirectory(src/parquet/file)
add_subdirectory(src/parquet/util)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/benchmarks/decode_benchmark.cc
----------------------------------------------------------------------
diff --git a/benchmarks/decode_benchmark.cc b/benchmarks/decode_benchmark.cc
index 65a6afd..44776a8 100644
--- a/benchmarks/decode_benchmark.cc
+++ b/benchmarks/decode_benchmark.cc
@@ -288,8 +288,10 @@ void TestPlainIntCompressed(::arrow::Codec* codec, const std::vector<int64_t>& d
int max_compressed_size = codec->MaxCompressedLen(uncompressed_len, raw_data);
uint8_t* compressed_data = new uint8_t[max_compressed_size];
int64_t compressed_len;
- DCHECK(codec->Compress(uncompressed_len, raw_data, max_compressed_size,
- compressed_data, &compressed_len).ok());
+ DCHECK(codec
+ ->Compress(uncompressed_len, raw_data, max_compressed_size, compressed_data,
+ &compressed_len)
+ .ok());
printf("\n%s:\n Uncompressed len: %d\n Compressed len: %d\n", codec->name(),
uncompressed_len, static_cast<int>(compressed_len));
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt
index f0eedcf..09a689e 100644
--- a/src/parquet/CMakeLists.txt
+++ b/src/parquet/CMakeLists.txt
@@ -17,9 +17,15 @@
# Headers: top level
install(FILES
+ column_reader.h
+ column_page.h
+ column_scanner.h
+ column_writer.h
encoding.h
exception.h
+ properties.h
schema.h
+ statistics.h
types.h
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/parquet")
@@ -40,9 +46,16 @@ install(FILES
"${CMAKE_CURRENT_BINARY_DIR}/parquet.pc"
DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
+ADD_PARQUET_TEST(column_reader-test)
+ADD_PARQUET_TEST(column_scanner-test)
+ADD_PARQUET_TEST(column_writer-test)
+ADD_PARQUET_TEST(properties-test)
+ADD_PARQUET_TEST(statistics-test)
ADD_PARQUET_TEST(encoding-test)
ADD_PARQUET_TEST(public-api-test)
ADD_PARQUET_TEST(types-test)
ADD_PARQUET_TEST(reader-test)
ADD_PARQUET_TEST(schema-test)
+
+ADD_PARQUET_BENCHMARK(column-io-benchmark)
ADD_PARQUET_BENCHMARK(encoding-benchmark)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/api/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/api/reader.h b/src/parquet/api/reader.h
index f41a429..ba9717a 100644
--- a/src/parquet/api/reader.h
+++ b/src/parquet/api/reader.h
@@ -19,8 +19,8 @@
#define PARQUET_API_READER_H
// Column reader API
-#include "parquet/column/reader.h"
-#include "parquet/column/scan-all.h"
+#include "parquet/column_reader.h"
+#include "parquet/column_scanner.h"
#include "parquet/exception.h"
#include "parquet/file/printer.h"
#include "parquet/file/reader.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/api/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/api/writer.h b/src/parquet/api/writer.h
index 9c239b2..cc3ae2a 100644
--- a/src/parquet/api/writer.h
+++ b/src/parquet/api/writer.h
@@ -19,7 +19,7 @@
#define PARQUET_API_WRITER_H
// Column reader API
-#include "parquet/column/writer.h"
+#include "parquet/column_writer.h"
#include "parquet/exception.h"
#include "parquet/file/writer.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/arrow/arrow-reader-writer-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index b9aa4a2..677e437 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -19,8 +19,8 @@
#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"
-#include "parquet/column/reader.h"
-#include "parquet/column/writer.h"
+#include "parquet/column_reader.h"
+#include "parquet/column_writer.h"
#include "parquet/file/reader-internal.h"
#include "parquet/file/writer-internal.h"
#include "parquet/util/memory.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 3beca35..f2a9651 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1136,10 +1136,10 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
values_array_ = std::dynamic_pointer_cast<::arrow::Int32Array>(arr);
}
- void WriteColumnData(size_t num_rows, int16_t* def_levels,
- int16_t* rep_levels, int32_t* values) {
- auto typed_writer = static_cast<TypedColumnWriter<Int32Type>*>(
- row_group_writer_->NextColumn());
+ void WriteColumnData(
+ size_t num_rows, int16_t* def_levels, int16_t* rep_levels, int32_t* values) {
+ auto typed_writer =
+ static_cast<TypedColumnWriter<Int32Type>*>(row_group_writer_->NextColumn());
typed_writer->WriteBatch(num_rows, def_levels, rep_levels, values);
}
@@ -1149,22 +1149,17 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
// Also independently count the nulls
auto local_null_count = 0;
for (int i = 0; i < array.length(); i++) {
- if (array.IsNull(i)) {
- local_null_count++;
- }
+ if (array.IsNull(i)) { local_null_count++; }
}
ASSERT_EQ(local_null_count, expected_nulls);
}
- void ValidateColumnArray(const ::arrow::Int32Array& array,
- size_t expected_nulls) {
+ void ValidateColumnArray(const ::arrow::Int32Array& array, size_t expected_nulls) {
ValidateArray(array, expected_nulls);
int j = 0;
for (int i = 0; i < values_array_->length(); i++) {
- if (array.IsNull(i)) {
- continue;
- }
+ if (array.IsNull(i)) { continue; }
ASSERT_EQ(array.Value(i), values_array_->Value(j));
j++;
}
@@ -1196,7 +1191,7 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
parquet_fields.push_back(GroupNode::Make("group1", struct_repetition,
{PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT32),
- PrimitiveNode::Make("leaf2", Repetition::OPTIONAL, ParquetType::INT32)}));
+ PrimitiveNode::Make("leaf2", Repetition::OPTIONAL, ParquetType::INT32)}));
parquet_fields.push_back(
PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT32));
@@ -1209,7 +1204,7 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
std::vector<int16_t> leaf1_def_levels(NUM_SIMPLE_TEST_ROWS);
std::vector<int16_t> leaf2_def_levels(NUM_SIMPLE_TEST_ROWS);
std::vector<int16_t> leaf3_def_levels(NUM_SIMPLE_TEST_ROWS);
- for (int i = 0; i < NUM_SIMPLE_TEST_ROWS; i++) {
+ for (int i = 0; i < NUM_SIMPLE_TEST_ROWS; i++) {
// leaf1 is required within the optional group1, so it is only null
// when the group is null
leaf1_def_levels[i] = (i % 3 == 0) ? 0 : 1;
@@ -1227,18 +1222,18 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
int32_t* values = reinterpret_cast<int32_t*>(values_array_->data()->mutable_data());
// Create the actual parquet file
- InitNewParquetFile(std::static_pointer_cast<GroupNode>(schema_node),
- NUM_SIMPLE_TEST_ROWS);
+ InitNewParquetFile(
+ std::static_pointer_cast<GroupNode>(schema_node), NUM_SIMPLE_TEST_ROWS);
// leaf1 column
- WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf1_def_levels.data(),
- rep_levels.data(), values);
+ WriteColumnData(
+ NUM_SIMPLE_TEST_ROWS, leaf1_def_levels.data(), rep_levels.data(), values);
// leaf2 column
- WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf2_def_levels.data(),
- rep_levels.data(), values);
+ WriteColumnData(
+ NUM_SIMPLE_TEST_ROWS, leaf2_def_levels.data(), rep_levels.data(), values);
// leaf3 column
- WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf3_def_levels.data(),
- rep_levels.data(), values);
+ WriteColumnData(
+ NUM_SIMPLE_TEST_ROWS, leaf3_def_levels.data(), rep_levels.data(), values);
FinalizeParquetFile();
InitReader();
@@ -1250,11 +1245,10 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
for (int i = 0; i < num_children; i++) {
if (depth <= 1) {
- children.push_back(PrimitiveNode::Make("leaf",
- node_repetition, leaf_type));
+ children.push_back(PrimitiveNode::Make("leaf", node_repetition, leaf_type));
} else {
- children.push_back(CreateSingleTypedNestedGroup(i, depth - 1, num_children,
- node_repetition, leaf_type));
+ children.push_back(CreateSingleTypedNestedGroup(
+ i, depth - 1, num_children, node_repetition, leaf_type));
}
}
@@ -1264,13 +1258,13 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
}
// A deeply nested schema
- void CreateMultiLevelNestedParquet(int num_trees, int tree_depth,
- int num_children, int num_rows, Repetition::type node_repetition) {
+ void CreateMultiLevelNestedParquet(int num_trees, int tree_depth, int num_children,
+ int num_rows, Repetition::type node_repetition) {
// Create the schema
std::vector<NodePtr> parquet_fields;
for (int i = 0; i < num_trees; i++) {
- parquet_fields.push_back(CreateSingleTypedNestedGroup(i, tree_depth, num_children,
- node_repetition, ParquetType::INT32));
+ parquet_fields.push_back(CreateSingleTypedNestedGroup(
+ i, tree_depth, num_children, node_repetition, ParquetType::INT32));
}
auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields);
@@ -1280,11 +1274,11 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
std::vector<int16_t> rep_levels(num_rows);
for (int i = 0; i < num_rows; i++) {
if (node_repetition == Repetition::REQUIRED) {
- def_levels[i] = 0; // all is required
+ def_levels[i] = 0; // all is required
} else {
- def_levels[i] = i % tree_depth; // all is optional
+ def_levels[i] = i % tree_depth; // all is optional
}
- rep_levels[i] = 0; // none is repeated
+ rep_levels[i] = 0; // none is repeated
}
// Produce values for the columns
@@ -1303,13 +1297,11 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
class DeepParquetTestVisitor : public ArrayVisitor {
public:
- DeepParquetTestVisitor(Repetition::type node_repetition,
- std::shared_ptr<::arrow::Int32Array> expected) :
- node_repetition_(node_repetition), expected_(expected) {}
+ DeepParquetTestVisitor(
+ Repetition::type node_repetition, std::shared_ptr<::arrow::Int32Array> expected)
+ : node_repetition_(node_repetition), expected_(expected) {}
- Status Validate(std::shared_ptr<Array> tree) {
- return tree->Accept(this);
- }
+ Status Validate(std::shared_ptr<Array> tree) { return tree->Accept(this); }
virtual Status Visit(const ::arrow::Int32Array& array) {
if (node_repetition_ == Repetition::REQUIRED) {
@@ -1367,14 +1359,14 @@ TEST_F(TestNestedSchemaRead, ReadIntoTableFull) {
ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2);
ValidateTableArrayTypes(*table);
- auto struct_field_array = std::static_pointer_cast<::arrow::StructArray>(
- table->column(0)->data()->chunk(0));
- auto leaf1_array = std::static_pointer_cast<::arrow::Int32Array>(
- struct_field_array->field(0));
- auto leaf2_array = std::static_pointer_cast<::arrow::Int32Array>(
- struct_field_array->field(1));
- auto leaf3_array = std::static_pointer_cast<::arrow::Int32Array>(
- table->column(1)->data()->chunk(0));
+ auto struct_field_array =
+ std::static_pointer_cast<::arrow::StructArray>(table->column(0)->data()->chunk(0));
+ auto leaf1_array =
+ std::static_pointer_cast<::arrow::Int32Array>(struct_field_array->field(0));
+ auto leaf2_array =
+ std::static_pointer_cast<::arrow::Int32Array>(struct_field_array->field(1));
+ auto leaf3_array =
+ std::static_pointer_cast<::arrow::Int32Array>(table->column(1)->data()->chunk(0));
// validate struct and leaf arrays
@@ -1383,7 +1375,7 @@ TEST_F(TestNestedSchemaRead, ReadIntoTableFull) {
// validate leaf1
ValidateColumnArray(*leaf1_array, NUM_SIMPLE_TEST_ROWS / 3);
// validate leaf2
- ValidateColumnArray(*leaf2_array, NUM_SIMPLE_TEST_ROWS * 2/ 3);
+ ValidateColumnArray(*leaf2_array, NUM_SIMPLE_TEST_ROWS * 2 / 3);
// validate leaf3
ValidateColumnArray(*leaf3_array, 0);
}
@@ -1452,7 +1444,7 @@ TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
}
INSTANTIATE_TEST_CASE_P(Repetition_type, TestNestedSchemaRead,
- ::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL));
+ ::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL));
TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) {
// PARQUET-995
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index ef9ac34..a3a26c9 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -184,8 +184,8 @@ class FileReader::Impl {
Status GetColumn(int i, std::unique_ptr<ColumnReader>* out);
Status ReadSchemaField(int i, std::shared_ptr<Array>* out);
- Status ReadSchemaField(int i, const std::vector<int>& indices,
- std::shared_ptr<Array>* out);
+ Status ReadSchemaField(
+ int i, const std::vector<int>& indices, std::shared_ptr<Array>* out);
Status GetReaderForNode(int index, const NodePtr& node, const std::vector<int>& indices,
int16_t def_level, std::unique_ptr<ColumnReader::Impl>* out);
Status ReadColumn(int i, std::shared_ptr<Array>* out);
@@ -226,7 +226,7 @@ class ColumnReader::Impl {
};
// Reader implementation for primitive arrays
-class PrimitiveImpl: public ColumnReader::Impl {
+class PrimitiveImpl : public ColumnReader::Impl {
public:
PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input)
: pool_(pool),
@@ -300,11 +300,13 @@ class PrimitiveImpl: public ColumnReader::Impl {
};
// Reader implementation for struct array
-class StructImpl: public ColumnReader::Impl {
+class StructImpl : public ColumnReader::Impl {
public:
explicit StructImpl(const std::vector<std::shared_ptr<Impl>>& children,
int16_t struct_def_level, MemoryPool* pool, const NodePtr& node)
- : children_(children), struct_def_level_(struct_def_level), pool_(pool),
+ : children_(children),
+ struct_def_level_(struct_def_level),
+ pool_(pool),
def_levels_buffer_(pool) {
InitField(node, children);
}
@@ -323,10 +325,9 @@ class StructImpl: public ColumnReader::Impl {
std::shared_ptr<Field> field_;
PoolBuffer def_levels_buffer_;
- Status DefLevelsToNullArray(std::shared_ptr<MutableBuffer>* null_bitmap,
- int64_t* null_count);
- void InitField(const NodePtr& node,
- const std::vector<std::shared_ptr<Impl>>& children);
+ Status DefLevelsToNullArray(
+ std::shared_ptr<MutableBuffer>* null_bitmap, int64_t* null_count);
+ void InitField(const NodePtr& node, const std::vector<std::shared_ptr<Impl>>& children);
};
FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
@@ -337,8 +338,7 @@ FileReader::~FileReader() {}
Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
std::unique_ptr<FileColumnIterator> input(new AllRowGroupsIterator(i, reader_.get()));
- std::unique_ptr<ColumnReader::Impl> impl(
- new PrimitiveImpl(pool_, std::move(input)));
+ std::unique_ptr<ColumnReader::Impl> impl(new PrimitiveImpl(pool_, std::move(input)));
*out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
return Status::OK();
}
@@ -346,7 +346,6 @@ Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
Status FileReader::Impl::GetReaderForNode(int index, const NodePtr& node,
const std::vector<int>& indices, int16_t def_level,
std::unique_ptr<ColumnReader::Impl>* out) {
-
*out = nullptr;
if (IsSimpleStruct(node)) {
@@ -357,16 +356,14 @@ Status FileReader::Impl::GetReaderForNode(int index, const NodePtr& node,
// TODO(itaiin): Remove the -1 index hack when all types of nested reads
// are supported. This currently just signals the lower level reader resolution
// to abort
- RETURN_NOT_OK(GetReaderForNode(index, group->field(i), indices,
- def_level + 1, &child_reader));
- if (child_reader != nullptr) {
- children.push_back(std::move(child_reader));
- }
+ RETURN_NOT_OK(GetReaderForNode(
+ index, group->field(i), indices, def_level + 1, &child_reader));
+ if (child_reader != nullptr) { children.push_back(std::move(child_reader)); }
}
if (children.size() > 0) {
*out = std::unique_ptr<ColumnReader::Impl>(
- new StructImpl(children, def_level, pool_, node));
+ new StructImpl(children, def_level, pool_, node));
}
} else {
// This should be a flat field case - translate the field index to
@@ -376,8 +373,7 @@ Status FileReader::Impl::GetReaderForNode(int index, const NodePtr& node,
DCHECK(walker->is_group());
auto group = static_cast<GroupNode*>(walker.get());
if (group->field_count() != 1) {
- return Status::NotImplemented(
- "lists with structs are not supported.");
+ return Status::NotImplemented("lists with structs are not supported.");
}
walker = group->field(0);
}
@@ -405,8 +401,8 @@ Status FileReader::Impl::ReadSchemaField(int i, std::shared_ptr<Array>* out) {
return ReadSchemaField(i, indices, out);
}
-Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices,
- std::shared_ptr<Array>* out) {
+Status FileReader::Impl::ReadSchemaField(
+ int i, const std::vector<int>& indices, std::shared_ptr<Array>* out) {
auto parquet_schema = reader_->metadata()->schema();
auto node = parquet_schema->group_node()->field(i);
@@ -461,15 +457,14 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
// TODO(wesm): Refactor to share more code with ReadTable
auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, &rg_metadata,
- this](int i) {
+ this](int i) {
int column_index = indices[i];
int64_t batch_size = rg_metadata->ColumnChunk(column_index)->num_values();
std::unique_ptr<FileColumnIterator> input(
new SingleRowGroupIterator(column_index, row_group_index, reader_.get()));
- std::unique_ptr<ColumnReader::Impl> impl(
- new PrimitiveImpl(pool_, std::move(input)));
+ std::unique_ptr<ColumnReader::Impl> impl(new PrimitiveImpl(pool_, std::move(input)));
ColumnReader flat_column_reader(std::move(impl));
std::shared_ptr<Array> array;
@@ -498,8 +493,8 @@ Status FileReader::Impl::ReadTable(
// We only need to read schema fields which have columns indicated
// in the indices vector
std::vector<int> field_indices;
- if (!ColumnIndicesToFieldIndices(*reader_->metadata()->schema(),
- indices, &field_indices)) {
+ if (!ColumnIndicesToFieldIndices(
+ *reader_->metadata()->schema(), indices, &field_indices)) {
return Status::Invalid("Invalid column index");
}
@@ -628,8 +623,7 @@ const ParquetFileReader* FileReader::parquet_reader() const {
}
template <typename ArrowType, typename ParquetType>
-Status PrimitiveImpl::ReadNonNullableBatch(
- TypedColumnReader<ParquetType>* reader,
+Status PrimitiveImpl::ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
int64_t values_to_read, int64_t* levels_read) {
using ArrowCType = typename ArrowType::c_type;
using ParquetCType = typename ParquetType::c_type;
@@ -709,8 +703,8 @@ Status PrimitiveImpl::ReadNonNullableBatch<::arrow::Date64Type, Int32Type>(
}
template <>
-Status PrimitiveImpl::ReadNonNullableBatch<::arrow::BooleanType,
- BooleanType>(TypedColumnReader<BooleanType>* reader, int64_t values_to_read,
+Status PrimitiveImpl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
+ TypedColumnReader<BooleanType>* reader, int64_t values_to_read,
int64_t* levels_read) {
RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
@@ -727,8 +721,7 @@ Status PrimitiveImpl::ReadNonNullableBatch<::arrow::BooleanType,
}
template <typename ArrowType, typename ParquetType>
-Status PrimitiveImpl::ReadNullableBatch(
- TypedColumnReader<ParquetType>* reader,
+Status PrimitiveImpl::ReadNullableBatch(TypedColumnReader<ParquetType>* reader,
int16_t* def_levels, int16_t* rep_levels, int64_t values_to_read,
int64_t* levels_read, int64_t* values_read) {
using ArrowCType = typename ArrowType::c_type;
@@ -998,8 +991,7 @@ Status PrimitiveImpl::WrapIntoListArray(const int16_t* def_levels,
}
template <typename ArrowType, typename ParquetType>
-Status PrimitiveImpl::TypedReadBatch(
- int batch_size, std::shared_ptr<Array>* out) {
+Status PrimitiveImpl::TypedReadBatch(int batch_size, std::shared_ptr<Array>* out) {
using ArrowCType = typename ArrowType::c_type;
int values_to_read = batch_size;
@@ -1127,8 +1119,7 @@ Status PrimitiveImpl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
}
template <typename ArrowType>
-Status PrimitiveImpl::ReadByteArrayBatch(
- int batch_size, std::shared_ptr<Array>* out) {
+Status PrimitiveImpl::ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>* out) {
using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
int total_levels_read = 0;
@@ -1252,8 +1243,7 @@ Status PrimitiveImpl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \
break;
-Status PrimitiveImpl::NextBatch(
- int batch_size, std::shared_ptr<Array>* out) {
+Status PrimitiveImpl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
if (!column_reader_) {
// Exhausted all row groups.
*out = nullptr;
@@ -1265,21 +1255,21 @@ Status PrimitiveImpl::NextBatch(
*out = std::make_shared<::arrow::NullArray>(batch_size);
return Status::OK();
break;
- TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
- TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
- TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
- TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
- TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
- TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
- TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
- TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
- TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
- TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
- TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
- TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
- TYPED_BATCH_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
- TYPED_BATCH_CASE(DATE32, ::arrow::Date32Type, Int32Type)
- TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type)
+ TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
+ TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
+ TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
+ TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
+ TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
+ TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
+ TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
+ TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
+ TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
+ TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
+ TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
+ TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
+ TYPED_BATCH_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
+ TYPED_BATCH_CASE(DATE32, ::arrow::Date32Type, Int32Type)
+ TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type)
case ::arrow::Type::FIXED_SIZE_BINARY: {
int32_t byte_width =
static_cast<::arrow::FixedSizeBinaryType*>(field_->type().get())->byte_width();
@@ -1340,15 +1330,13 @@ Status ColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
// StructImpl methods
Status StructImpl::DefLevelsToNullArray(
- std::shared_ptr<MutableBuffer>* null_bitmap_out,
- int64_t* null_count_out) {
+ std::shared_ptr<MutableBuffer>* null_bitmap_out, int64_t* null_count_out) {
std::shared_ptr<MutableBuffer> null_bitmap;
auto null_count = 0;
ValueLevelsPtr def_levels_data;
size_t def_levels_length;
RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
- RETURN_NOT_OK(GetEmptyBitmap(pool_,
- def_levels_length, &null_bitmap));
+ RETURN_NOT_OK(GetEmptyBitmap(pool_, def_levels_length, &null_bitmap));
uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
for (size_t i = 0; i < def_levels_length; i++) {
if (def_levels_data[i] < struct_def_level_) {
@@ -1397,11 +1385,10 @@ Status StructImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
for (size_t i = 0; i < child_length; i++) {
// Check that value is either uninitialized, or current
// and previous children def levels agree on the struct level
- DCHECK((result_levels[i] == -1) ||
- ((result_levels[i] >= struct_def_level_) ==
- (child_def_levels[i] >= struct_def_level_)));
- result_levels[i] = std::max(result_levels[i],
- std::min(child_def_levels[i], struct_def_level_));
+ DCHECK((result_levels[i] == -1) || ((result_levels[i] >= struct_def_level_) ==
+ (child_def_levels[i] >= struct_def_level_)));
+ result_levels[i] =
+ std::max(result_levels[i], std::min(child_def_levels[i], struct_def_level_));
}
}
*data = reinterpret_cast<ValueLevelsPtr>(def_levels_buffer_.data());
@@ -1409,11 +1396,11 @@ Status StructImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
return Status::OK();
}
-void StructImpl::InitField(const NodePtr& node,
- const std::vector<std::shared_ptr<Impl>>& children) {
+void StructImpl::InitField(
+ const NodePtr& node, const std::vector<std::shared_ptr<Impl>>& children) {
// Make a shallow node to field conversion from the children fields
std::vector<std::shared_ptr<::arrow::Field>> fields(children.size());
- for (size_t i = 0; i < children.size(); i++) {
+ for (size_t i = 0; i < children.size(); i++) {
fields[i] = children[i]->field();
}
auto type = std::make_shared<::arrow::StructType>(fields);
@@ -1440,8 +1427,8 @@ Status StructImpl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
RETURN_NOT_OK(DefLevelsToNullArray(&null_bitmap, &null_count));
- *out = std::make_shared<StructArray>(field()->type(), batch_size, children_arrays,
- null_bitmap, null_count);
+ *out = std::make_shared<StructArray>(
+ field()->type(), batch_size, children_arrays, null_bitmap, null_count);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/arrow/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index 06a64f8..8d9aeb5 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -130,8 +130,8 @@ class PARQUET_EXPORT FileReader {
// i=1 indices={3} will read foo2 column
// i=1 indices={2} will result in out=nullptr
// leaf indices which are unrelated to the schema field are ignored
- ::arrow::Status ReadSchemaField(int i, const std::vector<int>& indices,
- std::shared_ptr<::arrow::Array>* out);
+ ::arrow::Status ReadSchemaField(
+ int i, const std::vector<int>& indices, std::shared_ptr<::arrow::Array>* out);
// Read a table of columns into a Table
::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index af4f754..b8cb45c 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -525,8 +525,7 @@ Status FileWriter::Impl::TypedWriteBatch<Int32Type, ::arrow::NullType>(
const int16_t* def_levels, const int16_t* rep_levels) {
auto writer = reinterpret_cast<TypedColumnWriter<Int32Type>*>(column_writer);
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(num_levels, def_levels, rep_levels, nullptr));
+ PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, nullptr));
PARQUET_CATCH_NOT_OK(writer->Close());
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column-io-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column-io-benchmark.cc b/src/parquet/column-io-benchmark.cc
new file mode 100644
index 0000000..0a60367
--- /dev/null
+++ b/src/parquet/column-io-benchmark.cc
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "benchmark/benchmark.h"
+
+#include "parquet/column_reader.h"
+#include "parquet/column_writer.h"
+#include "parquet/file/reader-internal.h"
+#include "parquet/file/writer-internal.h"
+#include "parquet/util/memory.h"
+
+namespace parquet {
+
+using schema::PrimitiveNode;
+
+namespace benchmark {
+
+std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst,
+ ColumnChunkMetaDataBuilder* metadata, ColumnDescriptor* schema,
+ const WriterProperties* properties) {
+ std::unique_ptr<SerializedPageWriter> pager(
+ new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata));
+ return std::unique_ptr<Int64Writer>(new Int64Writer(
+ metadata, std::move(pager), output_size, Encoding::PLAIN, properties));
+}
+
+std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
+ auto node = PrimitiveNode::Make("int64", repetition, Type::INT64);
+ return std::make_shared<ColumnDescriptor>(
+ node, repetition != Repetition::REQUIRED, repetition == Repetition::REPEATED);
+}
+
+void SetBytesProcessed(::benchmark::State& state, Repetition::type repetition) {
+ int64_t bytes_processed = state.iterations() * state.range(0) * sizeof(int64_t);
+ if (repetition != Repetition::REQUIRED) {
+ bytes_processed += state.iterations() * state.range(0) * sizeof(int16_t);
+ }
+ if (repetition == Repetition::REPEATED) {
+ bytes_processed += state.iterations() * state.range(0) * sizeof(int16_t);
+ }
+ state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int16_t));
+}
+
+template <Repetition::type repetition>
+static void BM_WriteInt64Column(::benchmark::State& state) {
+ format::ColumnChunk thrift_metadata;
+ std::vector<int64_t> values(state.range(0), 128);
+ std::vector<int16_t> definition_levels(state.range(0), 1);
+ std::vector<int16_t> repetition_levels(state.range(0), 0);
+ std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
+ std::shared_ptr<WriterProperties> properties = default_writer_properties();
+ auto metadata = ColumnChunkMetaDataBuilder::Make(
+ properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
+
+ while (state.KeepRunning()) {
+ InMemoryOutputStream stream;
+ std::unique_ptr<Int64Writer> writer = BuildWriter(
+ state.range(0), &stream, metadata.get(), schema.get(), properties.get());
+ writer->WriteBatch(
+ values.size(), definition_levels.data(), repetition_levels.data(), values.data());
+ writer->Close();
+ }
+ SetBytesProcessed(state, repetition);
+}
+
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED)->Range(1024, 65536);
+
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL)->Range(1024, 65536);
+
+BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED)->Range(1024, 65536);
+
+std::unique_ptr<Int64Reader> BuildReader(
+ std::shared_ptr<Buffer>& buffer, int64_t num_values, ColumnDescriptor* schema) {
+ std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
+ std::unique_ptr<SerializedPageReader> page_reader(
+ new SerializedPageReader(std::move(source), num_values, Compression::UNCOMPRESSED));
+ return std::unique_ptr<Int64Reader>(new Int64Reader(schema, std::move(page_reader)));
+}
+
+template <Repetition::type repetition>
+static void BM_ReadInt64Column(::benchmark::State& state) {
+ format::ColumnChunk thrift_metadata;
+ std::vector<int64_t> values(state.range(0), 128);
+ std::vector<int16_t> definition_levels(state.range(0), 1);
+ std::vector<int16_t> repetition_levels(state.range(0), 0);
+ std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
+ std::shared_ptr<WriterProperties> properties = default_writer_properties();
+ auto metadata = ColumnChunkMetaDataBuilder::Make(
+ properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
+
+ InMemoryOutputStream stream;
+ std::unique_ptr<Int64Writer> writer = BuildWriter(
+ state.range(0), &stream, metadata.get(), schema.get(), properties.get());
+ writer->WriteBatch(
+ values.size(), definition_levels.data(), repetition_levels.data(), values.data());
+ writer->Close();
+
+ std::shared_ptr<Buffer> src = stream.GetBuffer();
+ std::vector<int64_t> values_out(state.range(1));
+ std::vector<int16_t> definition_levels_out(state.range(1));
+ std::vector<int16_t> repetition_levels_out(state.range(1));
+ while (state.KeepRunning()) {
+ std::unique_ptr<Int64Reader> reader = BuildReader(src, state.range(1), schema.get());
+ int64_t values_read = 0;
+ for (size_t i = 0; i < values.size(); i += values_read) {
+ reader->ReadBatch(values_out.size(), definition_levels_out.data(),
+ repetition_levels_out.data(), values_out.data(), &values_read);
+ }
+ }
+ SetBytesProcessed(state, repetition);
+}
+
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED)
+ ->RangePair(1024, 65536, 1, 1024);
+
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL)
+ ->RangePair(1024, 65536, 1, 1024);
+
+BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED)
+ ->RangePair(1024, 65536, 1, 1024);
+
+static void BM_RleEncoding(::benchmark::State& state) {
+ std::vector<int16_t> levels(state.range(0), 0);
+ int64_t n = 0;
+ std::generate(
+ levels.begin(), levels.end(), [&state, &n] { return (n++ % state.range(1)) == 0; });
+ int16_t max_level = 1;
+ int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, levels.size());
+ auto buffer_rle = std::make_shared<PoolBuffer>();
+ PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size));
+
+ while (state.KeepRunning()) {
+ LevelEncoder level_encoder;
+ level_encoder.Init(Encoding::RLE, max_level, levels.size(),
+ buffer_rle->mutable_data(), buffer_rle->size());
+ level_encoder.Encode(levels.size(), levels.data());
+ }
+ state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int16_t));
+ state.SetItemsProcessed(state.iterations() * state.range(0));
+}
+
+BENCHMARK(BM_RleEncoding)->RangePair(1024, 65536, 1, 16);
+
+static void BM_RleDecoding(::benchmark::State& state) {
+ LevelEncoder level_encoder;
+ std::vector<int16_t> levels(state.range(0), 0);
+ int64_t n = 0;
+ std::generate(
+ levels.begin(), levels.end(), [&state, &n] { return (n++ % state.range(1)) == 0; });
+ int16_t max_level = 1;
+ int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, levels.size());
+ auto buffer_rle = std::make_shared<PoolBuffer>();
+ PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size + sizeof(int32_t)));
+ level_encoder.Init(Encoding::RLE, max_level, levels.size(),
+ buffer_rle->mutable_data() + sizeof(int32_t), rle_size);
+ level_encoder.Encode(levels.size(), levels.data());
+ reinterpret_cast<int32_t*>(buffer_rle->mutable_data())[0] = level_encoder.len();
+
+ while (state.KeepRunning()) {
+ LevelDecoder level_decoder;
+ level_decoder.SetData(Encoding::RLE, max_level, levels.size(), buffer_rle->data());
+ level_decoder.Decode(state.range(0), levels.data());
+ }
+
+ state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int16_t));
+ state.SetItemsProcessed(state.iterations() * state.range(0));
+}
+
+BENCHMARK(BM_RleDecoding)->RangePair(1024, 65536, 1, 16);
+
+} // namespace benchmark
+
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/column/CMakeLists.txt b/src/parquet/column/CMakeLists.txt
deleted file mode 100644
index 789248d..0000000
--- a/src/parquet/column/CMakeLists.txt
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# Headers: top level
-install(FILES
- levels.h
- page.h
- properties.h
- reader.h
- scan-all.h
- scanner.h
- writer.h
- statistics.h
- DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/parquet/column")
-
-ADD_PARQUET_TEST(column-reader-test)
-ADD_PARQUET_TEST(column-writer-test)
-ADD_PARQUET_TEST(levels-test)
-ADD_PARQUET_TEST(properties-test)
-ADD_PARQUET_TEST(scanner-test)
-ADD_PARQUET_TEST(statistics-test)
-
-ADD_PARQUET_BENCHMARK(column-io-benchmark)
-ADD_PARQUET_BENCHMARK(level-benchmark)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/column-io-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-io-benchmark.cc b/src/parquet/column/column-io-benchmark.cc
deleted file mode 100644
index 24afab2..0000000
--- a/src/parquet/column/column-io-benchmark.cc
+++ /dev/null
@@ -1,138 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "benchmark/benchmark.h"
-
-#include "parquet/column/reader.h"
-#include "parquet/column/writer.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/file/writer-internal.h"
-#include "parquet/util/memory.h"
-
-namespace parquet {
-
-using schema::PrimitiveNode;
-
-namespace benchmark {
-
-std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst,
- ColumnChunkMetaDataBuilder* metadata, ColumnDescriptor* schema,
- const WriterProperties* properties) {
- std::unique_ptr<SerializedPageWriter> pager(
- new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata));
- return std::unique_ptr<Int64Writer>(new Int64Writer(
- metadata, std::move(pager), output_size, Encoding::PLAIN, properties));
-}
-
-std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
- auto node = PrimitiveNode::Make("int64", repetition, Type::INT64);
- return std::make_shared<ColumnDescriptor>(
- node, repetition != Repetition::REQUIRED, repetition == Repetition::REPEATED);
-}
-
-void SetBytesProcessed(::benchmark::State& state, Repetition::type repetition) {
- int64_t bytes_processed = state.iterations() * state.range(0) * sizeof(int64_t);
- if (repetition != Repetition::REQUIRED) {
- bytes_processed += state.iterations() * state.range(0) * sizeof(int16_t);
- }
- if (repetition == Repetition::REPEATED) {
- bytes_processed += state.iterations() * state.range(0) * sizeof(int16_t);
- }
- state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int16_t));
-}
-
-template <Repetition::type repetition>
-static void BM_WriteInt64Column(::benchmark::State& state) {
- format::ColumnChunk thrift_metadata;
- std::vector<int64_t> values(state.range(0), 128);
- std::vector<int16_t> definition_levels(state.range(0), 1);
- std::vector<int16_t> repetition_levels(state.range(0), 0);
- std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
- std::shared_ptr<WriterProperties> properties = default_writer_properties();
- auto metadata = ColumnChunkMetaDataBuilder::Make(
- properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
-
- while (state.KeepRunning()) {
- InMemoryOutputStream stream;
- std::unique_ptr<Int64Writer> writer = BuildWriter(
- state.range(0), &stream, metadata.get(), schema.get(), properties.get());
- writer->WriteBatch(
- values.size(), definition_levels.data(), repetition_levels.data(), values.data());
- writer->Close();
- }
- SetBytesProcessed(state, repetition);
-}
-
-BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REQUIRED)->Range(1024, 65536);
-
-BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::OPTIONAL)->Range(1024, 65536);
-
-BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED)->Range(1024, 65536);
-
-std::unique_ptr<Int64Reader> BuildReader(
- std::shared_ptr<Buffer>& buffer, int64_t num_values, ColumnDescriptor* schema) {
- std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
- std::unique_ptr<SerializedPageReader> page_reader(
- new SerializedPageReader(std::move(source), num_values, Compression::UNCOMPRESSED));
- return std::unique_ptr<Int64Reader>(new Int64Reader(schema, std::move(page_reader)));
-}
-
-template <Repetition::type repetition>
-static void BM_ReadInt64Column(::benchmark::State& state) {
- format::ColumnChunk thrift_metadata;
- std::vector<int64_t> values(state.range(0), 128);
- std::vector<int16_t> definition_levels(state.range(0), 1);
- std::vector<int16_t> repetition_levels(state.range(0), 0);
- std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
- std::shared_ptr<WriterProperties> properties = default_writer_properties();
- auto metadata = ColumnChunkMetaDataBuilder::Make(
- properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
-
- InMemoryOutputStream stream;
- std::unique_ptr<Int64Writer> writer = BuildWriter(
- state.range(0), &stream, metadata.get(), schema.get(), properties.get());
- writer->WriteBatch(
- values.size(), definition_levels.data(), repetition_levels.data(), values.data());
- writer->Close();
-
- std::shared_ptr<Buffer> src = stream.GetBuffer();
- std::vector<int64_t> values_out(state.range(1));
- std::vector<int16_t> definition_levels_out(state.range(1));
- std::vector<int16_t> repetition_levels_out(state.range(1));
- while (state.KeepRunning()) {
- std::unique_ptr<Int64Reader> reader = BuildReader(src, state.range(1), schema.get());
- int64_t values_read = 0;
- for (size_t i = 0; i < values.size(); i += values_read) {
- reader->ReadBatch(values_out.size(), definition_levels_out.data(),
- repetition_levels_out.data(), values_out.data(), &values_read);
- }
- }
- SetBytesProcessed(state, repetition);
-}
-
-BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REQUIRED)
- ->RangePair(1024, 65536, 1, 1024);
-
-BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::OPTIONAL)
- ->RangePair(1024, 65536, 1, 1024);
-
-BENCHMARK_TEMPLATE(BM_ReadInt64Column, Repetition::REPEATED)
- ->RangePair(1024, 65536, 1, 1024);
-
-} // namespace benchmark
-
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
deleted file mode 100644
index e34ac4c..0000000
--- a/src/parquet/column/column-reader-test.cc
+++ /dev/null
@@ -1,374 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include <algorithm>
-#include <cstdint>
-#include <cstdlib>
-#include <limits>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "parquet/column/page.h"
-#include "parquet/column/reader.h"
-#include "parquet/column/test-util.h"
-#include "parquet/schema.h"
-#include "parquet/types.h"
-#include "parquet/util/test-common.h"
-
-using std::string;
-using std::vector;
-using std::shared_ptr;
-
-namespace parquet {
-
-using schema::NodePtr;
-
-namespace test {
-
-template <typename T>
-static inline bool vector_equal_with_def_levels(const vector<T>& left,
- const vector<int16_t>& def_levels, int16_t max_def_levels, int16_t max_rep_levels,
- const vector<T>& right) {
- size_t i_left = 0;
- size_t i_right = 0;
- for (size_t i = 0; i < def_levels.size(); i++) {
- if (def_levels[i] == max_def_levels) {
- // Compare
- if (left[i_left] != right[i_right]) {
- std::cerr << "index " << i << " left was " << left[i_left] << " right was "
- << right[i] << std::endl;
- return false;
- }
- i_left++;
- i_right++;
- } else if (def_levels[i] == (max_def_levels -1)) {
- // Null entry on the lowest nested level
- i_right++;
- } else if (def_levels[i] < (max_def_levels - 1)) {
- // Null entry on a higher nesting level, only supported for non-repeating data
- if (max_rep_levels == 0) {
- i_right++;
- }
- }
- }
-
- return true;
-}
-
-class TestPrimitiveReader : public ::testing::Test {
- public:
- void InitReader(const ColumnDescriptor* d) {
- std::unique_ptr<PageReader> pager_;
- pager_.reset(new test::MockPageReader(pages_));
- reader_ = ColumnReader::Make(d, std::move(pager_));
- }
-
- void CheckResults() {
- vector<int32_t> vresult(num_values_, -1);
- vector<int16_t> dresult(num_levels_, -1);
- vector<int16_t> rresult(num_levels_, -1);
- int64_t values_read = 0;
- int total_values_read = 0;
- int batch_actual = 0;
-
- Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
- int32_t batch_size = 8;
- int batch = 0;
- // This will cover both the cases
- // 1) batch_size < page_size (multiple ReadBatch from a single page)
- // 2) batch_size > page_size (BatchRead limits to a single page)
- do {
- batch = static_cast<int>(reader->ReadBatch(batch_size, &dresult[0] + batch_actual,
- &rresult[0] + batch_actual, &vresult[0] + total_values_read, &values_read));
- total_values_read += static_cast<int>(values_read);
- batch_actual += batch;
- batch_size = std::max(batch_size * 2, 4096);
- } while (batch > 0);
-
- ASSERT_EQ(num_levels_, batch_actual);
- ASSERT_EQ(num_values_, total_values_read);
- ASSERT_TRUE(vector_equal(values_, vresult));
- if (max_def_level_ > 0) { ASSERT_TRUE(vector_equal(def_levels_, dresult)); }
- if (max_rep_level_ > 0) { ASSERT_TRUE(vector_equal(rep_levels_, rresult)); }
- // catch improper writes at EOS
- batch_actual =
- static_cast<int>(reader->ReadBatch(5, nullptr, nullptr, nullptr, &values_read));
- ASSERT_EQ(0, batch_actual);
- ASSERT_EQ(0, values_read);
- }
-
- void CheckResultsSpaced() {
- vector<int32_t> vresult(num_levels_, -1);
- vector<int16_t> dresult(num_levels_, -1);
- vector<int16_t> rresult(num_levels_, -1);
- vector<uint8_t> valid_bits(num_levels_, 255);
- int total_values_read = 0;
- int batch_actual = 0;
- int levels_actual = 0;
- int64_t null_count = -1;
- int64_t levels_read = 0;
- int64_t values_read;
-
- Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
- int32_t batch_size = 8;
- int batch = 0;
- // This will cover both the cases
- // 1) batch_size < page_size (multiple ReadBatch from a single page)
- // 2) batch_size > page_size (BatchRead limits to a single page)
- do {
- batch = static_cast<int>(reader->ReadBatchSpaced(batch_size,
- dresult.data() + levels_actual, rresult.data() + levels_actual,
- vresult.data() + batch_actual, valid_bits.data() + batch_actual, 0,
- &levels_read, &values_read, &null_count));
- total_values_read += batch - static_cast<int>(null_count);
- batch_actual += batch;
- levels_actual += static_cast<int>(levels_read);
- batch_size = std::max(batch_size * 2, 4096);
- } while ((batch > 0) || (levels_read > 0));
-
- ASSERT_EQ(num_levels_, levels_actual);
- ASSERT_EQ(num_values_, total_values_read);
- if (max_def_level_ > 0) {
- ASSERT_TRUE(vector_equal(def_levels_, dresult));
- ASSERT_TRUE(
- vector_equal_with_def_levels(values_, dresult, max_def_level_,
- max_rep_level_, vresult));
- } else {
- ASSERT_TRUE(vector_equal(values_, vresult));
- }
- if (max_rep_level_ > 0) { ASSERT_TRUE(vector_equal(rep_levels_, rresult)); }
- // catch improper writes at EOS
- batch_actual = static_cast<int>(reader->ReadBatchSpaced(5, nullptr, nullptr, nullptr,
- valid_bits.data(), 0, &levels_read, &values_read, &null_count));
- ASSERT_EQ(0, batch_actual);
- ASSERT_EQ(0, null_count);
- }
-
- void Clear() {
- values_.clear();
- def_levels_.clear();
- rep_levels_.clear();
- pages_.clear();
- reader_.reset();
- }
-
- void ExecutePlain(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
- num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
- rep_levels_, values_, data_buffer_, pages_, Encoding::PLAIN);
- num_levels_ = num_pages * levels_per_page;
- InitReader(d);
- CheckResults();
- Clear();
-
- num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
- rep_levels_, values_, data_buffer_, pages_, Encoding::PLAIN);
- num_levels_ = num_pages * levels_per_page;
- InitReader(d);
- CheckResultsSpaced();
- Clear();
- }
-
- void ExecuteDict(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
- num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
- rep_levels_, values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY);
- num_levels_ = num_pages * levels_per_page;
- InitReader(d);
- CheckResults();
- Clear();
-
- num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
- rep_levels_, values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY);
- num_levels_ = num_pages * levels_per_page;
- InitReader(d);
- CheckResultsSpaced();
- Clear();
- }
-
- protected:
- int num_levels_;
- int num_values_;
- int16_t max_def_level_;
- int16_t max_rep_level_;
- vector<shared_ptr<Page>> pages_;
- std::shared_ptr<ColumnReader> reader_;
- vector<int32_t> values_;
- vector<int16_t> def_levels_;
- vector<int16_t> rep_levels_;
- vector<uint8_t> data_buffer_; // For BA and FLBA
-};
-
-TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
- int levels_per_page = 100;
- int num_pages = 50;
- max_def_level_ = 0;
- max_rep_level_ = 0;
- NodePtr type = schema::Int32("a", Repetition::REQUIRED);
- const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
- ExecutePlain(num_pages, levels_per_page, &descr);
- ExecuteDict(num_pages, levels_per_page, &descr);
-}
-
-TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
- int levels_per_page = 100;
- int num_pages = 50;
- max_def_level_ = 4;
- max_rep_level_ = 0;
- NodePtr type = schema::Int32("b", Repetition::OPTIONAL);
- const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
- ExecutePlain(num_pages, levels_per_page, &descr);
- ExecuteDict(num_pages, levels_per_page, &descr);
-}
-
-TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
- int levels_per_page = 100;
- int num_pages = 50;
- max_def_level_ = 4;
- max_rep_level_ = 2;
- NodePtr type = schema::Int32("c", Repetition::REPEATED);
- const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
- ExecutePlain(num_pages, levels_per_page, &descr);
- ExecuteDict(num_pages, levels_per_page, &descr);
-}
-
-TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) {
- int levels_per_page = 100;
- int num_pages = 5;
- max_def_level_ = 0;
- max_rep_level_ = 0;
- NodePtr type = schema::Int32("b", Repetition::REQUIRED);
- const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
- MakePages<Int32Type>(&descr, num_pages, levels_per_page, def_levels_, rep_levels_,
- values_, data_buffer_, pages_, Encoding::PLAIN);
- InitReader(&descr);
- vector<int32_t> vresult(levels_per_page / 2, -1);
- vector<int16_t> dresult(levels_per_page / 2, -1);
- vector<int16_t> rresult(levels_per_page / 2, -1);
-
- Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
- int64_t values_read = 0;
-
- // 1) skip_size > page_size (multiple pages skipped)
- // Skip first 2 pages
- int64_t levels_skipped = reader->Skip(2 * levels_per_page);
- ASSERT_EQ(2 * levels_per_page, levels_skipped);
- // Read half a page
- reader->ReadBatch(
- levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read);
- vector<int32_t> sub_values(values_.begin() + 2 * levels_per_page,
- values_.begin() + static_cast<int>(2.5 * static_cast<double>(levels_per_page)));
- ASSERT_TRUE(vector_equal(sub_values, vresult));
-
- // 2) skip_size == page_size (skip across two pages)
- levels_skipped = reader->Skip(levels_per_page);
- ASSERT_EQ(levels_per_page, levels_skipped);
- // Read half a page
- reader->ReadBatch(
- levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read);
- sub_values.clear();
- sub_values.insert(sub_values.end(),
- values_.begin() + static_cast<int>(3.5 * static_cast<double>(levels_per_page)),
- values_.begin() + 4 * levels_per_page);
- ASSERT_TRUE(vector_equal(sub_values, vresult));
-
- // 3) skip_size < page_size (skip limited to a single page)
- // Skip half a page
- levels_skipped = reader->Skip(levels_per_page / 2);
- ASSERT_EQ(0.5 * levels_per_page, levels_skipped);
- // Read half a page
- reader->ReadBatch(
- levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read);
- sub_values.clear();
- sub_values.insert(sub_values.end(),
- values_.begin() + static_cast<int>(4.5 * static_cast<double>(levels_per_page)),
- values_.end());
- ASSERT_TRUE(vector_equal(sub_values, vresult));
-
- values_.clear();
- def_levels_.clear();
- rep_levels_.clear();
- pages_.clear();
- reader_.reset();
-}
-
-TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
- max_def_level_ = 0;
- max_rep_level_ = 0;
- NodePtr type = schema::Int32("a", Repetition::REQUIRED);
- const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
- shared_ptr<PoolBuffer> dummy = std::make_shared<PoolBuffer>();
-
- shared_ptr<DictionaryPage> dict_page =
- std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
- shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(
- &descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
- pages_.push_back(dict_page);
- pages_.push_back(data_page);
- InitReader(&descr);
- // Tests Dict : PLAIN, Data : RLE_DICTIONARY
- ASSERT_NO_THROW(reader_->HasNext());
- pages_.clear();
-
- dict_page = std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN_DICTIONARY);
- data_page = MakeDataPage<Int32Type>(
- &descr, {}, 0, Encoding::PLAIN_DICTIONARY, {}, 0, {}, 0, {}, 0);
- pages_.push_back(dict_page);
- pages_.push_back(data_page);
- InitReader(&descr);
- // Tests Dict : PLAIN_DICTIONARY, Data : PLAIN_DICTIONARY
- ASSERT_NO_THROW(reader_->HasNext());
- pages_.clear();
-
- data_page = MakeDataPage<Int32Type>(
- &descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
- pages_.push_back(data_page);
- InitReader(&descr);
- // Tests dictionary page must occur before data page
- ASSERT_THROW(reader_->HasNext(), ParquetException);
- pages_.clear();
-
- dict_page = std::make_shared<DictionaryPage>(dummy, 0, Encoding::DELTA_BYTE_ARRAY);
- pages_.push_back(dict_page);
- InitReader(&descr);
- // Tests only RLE_DICTIONARY is supported
- ASSERT_THROW(reader_->HasNext(), ParquetException);
- pages_.clear();
-
- shared_ptr<DictionaryPage> dict_page1 =
- std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN_DICTIONARY);
- shared_ptr<DictionaryPage> dict_page2 =
- std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
- pages_.push_back(dict_page1);
- pages_.push_back(dict_page2);
- InitReader(&descr);
- // Column cannot have more than one dictionary
- ASSERT_THROW(reader_->HasNext(), ParquetException);
- pages_.clear();
-
- data_page = MakeDataPage<Int32Type>(
- &descr, {}, 0, Encoding::DELTA_BYTE_ARRAY, {}, 0, {}, 0, {}, 0);
- pages_.push_back(data_page);
- InitReader(&descr);
- // unsupported encoding
- ASSERT_THROW(reader_->HasNext(), ParquetException);
- pages_.clear();
-}
-
-} // namespace test
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
deleted file mode 100644
index 1e1da4a..0000000
--- a/src/parquet/column/column-writer-test.cc
+++ /dev/null
@@ -1,516 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include "parquet/column/test-specialization.h"
-#include "parquet/column/test-util.h"
-
-#include "parquet/column/reader.h"
-#include "parquet/column/writer.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/file/writer-internal.h"
-#include "parquet/types.h"
-#include "parquet/util/comparison.h"
-#include "parquet/util/memory.h"
-
-namespace parquet {
-
-using schema::NodePtr;
-using schema::PrimitiveNode;
-
-namespace test {
-
-// The default size used in most tests.
-const int SMALL_SIZE = 100;
-// Larger size to test some corner cases, only used in some specific cases.
-const int LARGE_SIZE = 100000;
-// Very large size to test dictionary fallback.
-const int VERY_LARGE_SIZE = 400000;
-
-template <typename TestType>
-class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
- public:
- typedef typename TestType::c_type T;
-
- void SetUp() {
- this->SetupValuesOut(SMALL_SIZE);
- writer_properties_ = default_writer_properties();
- definition_levels_out_.resize(SMALL_SIZE);
- repetition_levels_out_.resize(SMALL_SIZE);
-
- this->SetUpSchema(Repetition::REQUIRED);
-
- descr_ = this->schema_.Column(0);
- }
-
- Type::type type_num() { return TestType::type_num; }
-
- void BuildReader(
- int64_t num_rows, Compression::type compression = Compression::UNCOMPRESSED) {
- auto buffer = sink_->GetBuffer();
- std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
- std::unique_ptr<SerializedPageReader> page_reader(
- new SerializedPageReader(std::move(source), num_rows, compression));
- reader_.reset(new TypedColumnReader<TestType>(this->descr_, std::move(page_reader)));
- }
-
- std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter(
- int64_t output_size = SMALL_SIZE,
- const ColumnProperties& column_properties = ColumnProperties()) {
- sink_.reset(new InMemoryOutputStream());
- metadata_ = ColumnChunkMetaDataBuilder::Make(
- writer_properties_, this->descr_, reinterpret_cast<uint8_t*>(&thrift_metadata_));
- std::unique_ptr<SerializedPageWriter> pager(
- new SerializedPageWriter(sink_.get(), column_properties.codec, metadata_.get()));
- WriterProperties::Builder wp_builder;
- if (column_properties.encoding == Encoding::PLAIN_DICTIONARY ||
- column_properties.encoding == Encoding::RLE_DICTIONARY) {
- wp_builder.enable_dictionary();
- } else {
- wp_builder.disable_dictionary();
- wp_builder.encoding(column_properties.encoding);
- }
- writer_properties_ = wp_builder.build();
- std::shared_ptr<ColumnWriter> writer = ColumnWriter::Make(
- metadata_.get(), std::move(pager), output_size, writer_properties_.get());
- return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
- }
-
- void ReadColumn(Compression::type compression = Compression::UNCOMPRESSED) {
- BuildReader(static_cast<int64_t>(this->values_out_.size()), compression);
- reader_->ReadBatch(static_cast<int>(this->values_out_.size()),
- definition_levels_out_.data(), repetition_levels_out_.data(),
- this->values_out_ptr_, &values_read_);
- this->SyncValuesOut();
- }
-
- void ReadColumnFully(Compression::type compression = Compression::UNCOMPRESSED);
-
- void TestRequiredWithEncoding(Encoding::type encoding) {
- return TestRequiredWithSettings(encoding, Compression::UNCOMPRESSED, false, false);
- }
-
- void TestRequiredWithSettings(Encoding::type encoding, Compression::type compression,
- bool enable_dictionary, bool enable_statistics, int64_t num_rows = SMALL_SIZE) {
- this->GenerateData(num_rows);
-
- this->WriteRequiredWithSettings(
- encoding, compression, enable_dictionary, enable_statistics, num_rows);
- this->ReadAndCompare(compression, num_rows);
-
- this->WriteRequiredWithSettingsSpaced(
- encoding, compression, enable_dictionary, enable_statistics, num_rows);
- this->ReadAndCompare(compression, num_rows);
- }
-
- void WriteRequiredWithSettings(Encoding::type encoding, Compression::type compression,
- bool enable_dictionary, bool enable_statistics, int64_t num_rows) {
- ColumnProperties column_properties(
- encoding, compression, enable_dictionary, enable_statistics);
- std::shared_ptr<TypedColumnWriter<TestType>> writer =
- this->BuildWriter(num_rows, column_properties);
- writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
- // The behaviour should be independent from the number of Close() calls
- writer->Close();
- writer->Close();
- }
-
- void WriteRequiredWithSettingsSpaced(Encoding::type encoding,
- Compression::type compression, bool enable_dictionary, bool enable_statistics,
- int64_t num_rows) {
- std::vector<uint8_t> valid_bits(
- BitUtil::RoundUpNumBytes(static_cast<uint32_t>(this->values_.size())) + 1, 255);
- ColumnProperties column_properties(
- encoding, compression, enable_dictionary, enable_statistics);
- std::shared_ptr<TypedColumnWriter<TestType>> writer =
- this->BuildWriter(num_rows, column_properties);
- writer->WriteBatchSpaced(
- this->values_.size(), nullptr, nullptr, valid_bits.data(), 0, this->values_ptr_);
- // The behaviour should be independent from the number of Close() calls
- writer->Close();
- writer->Close();
- }
-
- void ReadAndCompare(Compression::type compression, int64_t num_rows) {
- this->SetupValuesOut(num_rows);
- this->ReadColumnFully(compression);
- Compare<T> compare(this->descr_);
- for (size_t i = 0; i < this->values_.size(); i++) {
- if (compare(this->values_[i], this->values_out_[i]) ||
- compare(this->values_out_[i], this->values_[i])) {
- std::cout << "Failed at " << i << std::endl;
- }
- ASSERT_FALSE(compare(this->values_[i], this->values_out_[i]));
- ASSERT_FALSE(compare(this->values_out_[i], this->values_[i]));
- }
- ASSERT_EQ(this->values_, this->values_out_);
- }
-
- int64_t metadata_num_values() {
- // Metadata accessor must be created lazily.
- // This is because the ColumnChunkMetaData semantics dictate the metadata object is
- // complete (no changes to the metadata buffer can be made after instantiation)
- auto metadata_accessor = ColumnChunkMetaData::Make(
- reinterpret_cast<const uint8_t*>(&thrift_metadata_), this->descr_);
- return metadata_accessor->num_values();
- }
-
- std::vector<Encoding::type> metadata_encodings() {
- // Metadata accessor must be created lazily.
- // This is because the ColumnChunkMetaData semantics dictate the metadata object is
- // complete (no changes to the metadata buffer can be made after instantiation)
- auto metadata_accessor = ColumnChunkMetaData::Make(
- reinterpret_cast<const uint8_t*>(&thrift_metadata_), this->descr_);
- return metadata_accessor->encodings();
- }
-
- protected:
- int64_t values_read_;
- // Keep the reader alive as for ByteArray the lifetime of the ByteArray
- // content is bound to the reader.
- std::unique_ptr<TypedColumnReader<TestType>> reader_;
-
- std::vector<int16_t> definition_levels_out_;
- std::vector<int16_t> repetition_levels_out_;
-
- const ColumnDescriptor* descr_;
-
- private:
- format::ColumnChunk thrift_metadata_;
- std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
- std::unique_ptr<InMemoryOutputStream> sink_;
- std::shared_ptr<WriterProperties> writer_properties_;
- std::vector<std::vector<uint8_t>> data_buffer_;
-};
-
-template <typename TestType>
-void TestPrimitiveWriter<TestType>::ReadColumnFully(Compression::type compression) {
- int64_t total_values = static_cast<int64_t>(this->values_out_.size());
- BuildReader(total_values, compression);
- values_read_ = 0;
- while (values_read_ < total_values) {
- int64_t values_read_recently = 0;
- reader_->ReadBatch(
- static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_),
- definition_levels_out_.data() + values_read_,
- repetition_levels_out_.data() + values_read_,
- this->values_out_ptr_ + values_read_, &values_read_recently);
- values_read_ += values_read_recently;
- }
- this->SyncValuesOut();
-}
-
-template <>
-void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compression) {
- int64_t total_values = static_cast<int64_t>(this->values_out_.size());
- BuildReader(total_values, compression);
- this->data_buffer_.clear();
-
- values_read_ = 0;
- while (values_read_ < total_values) {
- int64_t values_read_recently = 0;
- reader_->ReadBatch(
- static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_),
- definition_levels_out_.data() + values_read_,
- repetition_levels_out_.data() + values_read_,
- this->values_out_ptr_ + values_read_, &values_read_recently);
-
- // Copy contents of the pointers
- std::vector<uint8_t> data(values_read_recently * this->descr_->type_length());
- uint8_t* data_ptr = data.data();
- for (int64_t i = 0; i < values_read_recently; i++) {
- memcpy(data_ptr + this->descr_->type_length() * i,
- this->values_out_[i + values_read_].ptr, this->descr_->type_length());
- this->values_out_[i + values_read_].ptr =
- data_ptr + this->descr_->type_length() * i;
- }
- data_buffer_.emplace_back(std::move(data));
-
- values_read_ += values_read_recently;
- }
- this->SyncValuesOut();
-}
-
-typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
- BooleanType, ByteArrayType, FLBAType>
- TestTypes;
-
-TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);
-
-using TestNullValuesWriter = TestPrimitiveWriter<Int32Type>;
-
-TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
- this->TestRequiredWithEncoding(Encoding::PLAIN);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredDictionary) {
- this->TestRequiredWithEncoding(Encoding::PLAIN_DICTIONARY);
-}
-
-/*
-TYPED_TEST(TestPrimitiveWriter, RequiredRLE) {
- this->TestRequiredWithEncoding(Encoding::RLE);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) {
- this->TestRequiredWithEncoding(Encoding::BIT_PACKED);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredDeltaBinaryPacked) {
- this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
- this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) {
- this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
- this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY);
-}
-*/
-
-TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithSnappyCompression) {
- this->TestRequiredWithSettings(
- Encoding::PLAIN, Compression::SNAPPY, false, false, LARGE_SIZE);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithBrotliCompression) {
- this->TestRequiredWithSettings(
- Encoding::PLAIN, Compression::BROTLI, false, false, LARGE_SIZE);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) {
- this->TestRequiredWithSettings(
- Encoding::PLAIN, Compression::GZIP, false, false, LARGE_SIZE);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) {
- this->TestRequiredWithSettings(
- Encoding::PLAIN, Compression::UNCOMPRESSED, false, true, LARGE_SIZE);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndSnappyCompression) {
- this->TestRequiredWithSettings(
- Encoding::PLAIN, Compression::SNAPPY, false, true, LARGE_SIZE);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndBrotliCompression) {
- this->TestRequiredWithSettings(
- Encoding::PLAIN, Compression::BROTLI, false, true, LARGE_SIZE);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) {
- this->TestRequiredWithSettings(
- Encoding::PLAIN, Compression::GZIP, false, true, LARGE_SIZE);
-}
-
-TYPED_TEST(TestPrimitiveWriter, Optional) {
- // Optional and non-repeated, with definition levels
- // but no repetition levels
- this->SetUpSchema(Repetition::OPTIONAL);
-
- this->GenerateData(SMALL_SIZE);
- std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
- definition_levels[1] = 0;
-
- auto writer = this->BuildWriter();
- writer->WriteBatch(
- this->values_.size(), definition_levels.data(), nullptr, this->values_ptr_);
- writer->Close();
-
- // PARQUET-703
- ASSERT_EQ(100, this->metadata_num_values());
-
- this->ReadColumn();
- ASSERT_EQ(99, this->values_read_);
- this->values_out_.resize(99);
- this->values_.resize(99);
- ASSERT_EQ(this->values_, this->values_out_);
-}
-
-TYPED_TEST(TestPrimitiveWriter, OptionalSpaced) {
- // Optional and non-repeated, with definition levels
- // but no repetition levels
- this->SetUpSchema(Repetition::OPTIONAL);
-
- this->GenerateData(SMALL_SIZE);
- std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
- std::vector<uint8_t> valid_bits(::arrow::BitUtil::BytesForBits(SMALL_SIZE), 255);
-
- definition_levels[SMALL_SIZE - 1] = 0;
- ::arrow::BitUtil::ClearBit(valid_bits.data(), SMALL_SIZE - 1);
- definition_levels[1] = 0;
- ::arrow::BitUtil::ClearBit(valid_bits.data(), 1);
-
- auto writer = this->BuildWriter();
- writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(), nullptr,
- valid_bits.data(), 0, this->values_ptr_);
- writer->Close();
-
- // PARQUET-703
- ASSERT_EQ(100, this->metadata_num_values());
-
- this->ReadColumn();
- ASSERT_EQ(98, this->values_read_);
- this->values_out_.resize(98);
- this->values_.resize(99);
- this->values_.erase(this->values_.begin() + 1);
- ASSERT_EQ(this->values_, this->values_out_);
-}
-
-TYPED_TEST(TestPrimitiveWriter, Repeated) {
- // Optional and repeated, so definition and repetition levels
- this->SetUpSchema(Repetition::REPEATED);
-
- this->GenerateData(SMALL_SIZE);
- std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
- definition_levels[1] = 0;
- std::vector<int16_t> repetition_levels(SMALL_SIZE, 0);
-
- auto writer = this->BuildWriter();
- writer->WriteBatch(this->values_.size(), definition_levels.data(),
- repetition_levels.data(), this->values_ptr_);
- writer->Close();
-
- this->ReadColumn();
- ASSERT_EQ(SMALL_SIZE - 1, this->values_read_);
- this->values_out_.resize(SMALL_SIZE - 1);
- this->values_.resize(SMALL_SIZE - 1);
- ASSERT_EQ(this->values_, this->values_out_);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredTooFewRows) {
- this->GenerateData(SMALL_SIZE - 1);
-
- auto writer = this->BuildWriter();
- writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
- ASSERT_THROW(writer->Close(), ParquetException);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredTooMany) {
- this->GenerateData(2 * SMALL_SIZE);
-
- auto writer = this->BuildWriter();
- ASSERT_THROW(
- writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_),
- ParquetException);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RepeatedTooFewRows) {
- // Optional and repeated, so definition and repetition levels
- this->SetUpSchema(Repetition::REPEATED);
-
- this->GenerateData(SMALL_SIZE);
- std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
- definition_levels[1] = 0;
- std::vector<int16_t> repetition_levels(SMALL_SIZE, 0);
- repetition_levels[3] = 1;
-
- auto writer = this->BuildWriter();
- writer->WriteBatch(this->values_.size(), definition_levels.data(),
- repetition_levels.data(), this->values_ptr_);
- ASSERT_THROW(writer->Close(), ParquetException);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
- this->GenerateData(LARGE_SIZE);
-
- // Test case 1: required and non-repeated, so no definition or repetition levels
- auto writer = this->BuildWriter(LARGE_SIZE);
- writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
- writer->Close();
-
- // Just read the first SMALL_SIZE rows to ensure we could read it back in
- this->ReadColumn();
- ASSERT_EQ(SMALL_SIZE, this->values_read_);
- this->values_.resize(SMALL_SIZE);
- ASSERT_EQ(this->values_, this->values_out_);
-}
-
-// Test case for dictionary fallback encoding
-TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) {
- this->GenerateData(VERY_LARGE_SIZE);
-
- auto writer = this->BuildWriter(VERY_LARGE_SIZE, Encoding::PLAIN_DICTIONARY);
- writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
- writer->Close();
-
- // Read all rows so we are sure that also the non-dictionary pages are read correctly
- this->SetupValuesOut(VERY_LARGE_SIZE);
- this->ReadColumnFully();
- ASSERT_EQ(VERY_LARGE_SIZE, this->values_read_);
- this->values_.resize(VERY_LARGE_SIZE);
- ASSERT_EQ(this->values_, this->values_out_);
- std::vector<Encoding::type> encodings = this->metadata_encodings();
- // There are 3 encodings (RLE, PLAIN_DICTIONARY, PLAIN) in a fallback case
- // Dictionary encoding is not allowed for boolean type
- // There are 2 encodings (RLE, PLAIN) in a non dictionary encoding case
- if (this->type_num() != Type::BOOLEAN) {
- ASSERT_EQ(Encoding::PLAIN_DICTIONARY, encodings[0]);
- ASSERT_EQ(Encoding::PLAIN, encodings[1]);
- ASSERT_EQ(Encoding::RLE, encodings[2]);
- } else {
- ASSERT_EQ(Encoding::PLAIN, encodings[0]);
- ASSERT_EQ(Encoding::RLE, encodings[1]);
- }
-}
-
-// PARQUET-719
-// Test case for NULL values
-TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
- this->SetUpSchema(Repetition::OPTIONAL);
-
- this->GenerateData(LARGE_SIZE);
-
- std::vector<int16_t> definition_levels(LARGE_SIZE, 0);
- std::vector<int16_t> repetition_levels(LARGE_SIZE, 0);
-
- auto writer = this->BuildWriter(LARGE_SIZE);
- // All values being written are NULL
- writer->WriteBatch(
- this->values_.size(), definition_levels.data(), repetition_levels.data(), NULL);
- writer->Close();
-
- // Just read the first SMALL_SIZE rows to ensure we could read it back in
- this->ReadColumn();
- ASSERT_EQ(0, this->values_read_);
-}
-
-// PARQUET-764
-// Correct bitpacking for boolean write at non-byte boundaries
-using TestBooleanValuesWriter = TestPrimitiveWriter<BooleanType>;
-TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) {
- this->SetUpSchema(Repetition::REQUIRED);
- auto writer = this->BuildWriter();
- for (int i = 0; i < SMALL_SIZE; i++) {
- bool value = (i % 2 == 0) ? true : false;
- writer->WriteBatch(1, nullptr, nullptr, &value);
- }
- writer->Close();
- this->ReadColumn();
- for (int i = 0; i < SMALL_SIZE; i++) {
- ASSERT_EQ((i % 2 == 0) ? true : false, this->values_out_[i]) << i;
- }
-}
-
-} // namespace test
-} // namespace parquet