You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@parquet.apache.org by uw...@apache.org on 2017/06/26 07:05:27 UTC
[4/6] parquet-cpp git commit: PARQUET-858: Flatten column directory, minor code consolidation
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/scanner.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h
deleted file mode 100644
index a9b83c1..0000000
--- a/src/parquet/column/scanner.h
+++ /dev/null
@@ -1,232 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_COLUMN_SCANNER_H
-#define PARQUET_COLUMN_SCANNER_H
-
-#include <cstdint>
-#include <memory>
-#include <ostream>
-#include <stdio.h>
-#include <string>
-#include <vector>
-
-#include "parquet/column/reader.h"
-#include "parquet/exception.h"
-#include "parquet/schema.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-#include "parquet/util/visibility.h"
-
-namespace parquet {
-
-static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128;
-
-class PARQUET_EXPORT Scanner {
- public:
- explicit Scanner(std::shared_ptr<ColumnReader> reader,
- int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
- ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
- : batch_size_(batch_size),
- level_offset_(0),
- levels_buffered_(0),
- value_buffer_(std::make_shared<PoolBuffer>(pool)),
- value_offset_(0),
- values_buffered_(0),
- reader_(reader) {
- def_levels_.resize(descr()->max_definition_level() > 0 ? batch_size_ : 0);
- rep_levels_.resize(descr()->max_repetition_level() > 0 ? batch_size_ : 0);
- }
-
- virtual ~Scanner() {}
-
- static std::shared_ptr<Scanner> Make(std::shared_ptr<ColumnReader> col_reader,
- int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
- ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
-
- virtual void PrintNext(std::ostream& out, int width) = 0;
-
- bool HasNext() { return level_offset_ < levels_buffered_ || reader_->HasNext(); }
-
- const ColumnDescriptor* descr() const { return reader_->descr(); }
-
- int64_t batch_size() const { return batch_size_; }
-
- void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; }
-
- protected:
- int64_t batch_size_;
-
- std::vector<int16_t> def_levels_;
- std::vector<int16_t> rep_levels_;
- int level_offset_;
- int levels_buffered_;
-
- std::shared_ptr<PoolBuffer> value_buffer_;
- int value_offset_;
- int64_t values_buffered_;
-
- private:
- std::shared_ptr<ColumnReader> reader_;
-};
-
-template <typename DType>
-class PARQUET_EXPORT TypedScanner : public Scanner {
- public:
- typedef typename DType::c_type T;
-
- explicit TypedScanner(std::shared_ptr<ColumnReader> reader,
- int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
- ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
- : Scanner(reader, batch_size, pool) {
- typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader.get());
- int value_byte_size = type_traits<DType::type_num>::value_byte_size;
- PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size));
- values_ = reinterpret_cast<T*>(value_buffer_->mutable_data());
- }
-
- virtual ~TypedScanner() {}
-
- bool NextLevels(int16_t* def_level, int16_t* rep_level) {
- if (level_offset_ == levels_buffered_) {
- levels_buffered_ =
- static_cast<int>(typed_reader_->ReadBatch(static_cast<int>(batch_size_),
- def_levels_.data(), rep_levels_.data(), values_, &values_buffered_));
-
- value_offset_ = 0;
- level_offset_ = 0;
- if (!levels_buffered_) { return false; }
- }
- *def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0;
- *rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0;
- level_offset_++;
- return true;
- }
-
- bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) {
- if (level_offset_ == levels_buffered_) {
- if (!HasNext()) {
- // Out of data pages
- return false;
- }
- }
-
- NextLevels(def_level, rep_level);
- *is_null = *def_level < descr()->max_definition_level();
-
- if (*is_null) { return true; }
-
- if (value_offset_ == values_buffered_) {
- throw ParquetException("Value was non-null, but has not been buffered");
- }
- *val = values_[value_offset_++];
- return true;
- }
-
- // Returns true if there is a next value
- bool NextValue(T* val, bool* is_null) {
- if (level_offset_ == levels_buffered_) {
- if (!HasNext()) {
- // Out of data pages
- return false;
- }
- }
-
- // Out of values
- int16_t def_level = -1;
- int16_t rep_level = -1;
- NextLevels(&def_level, &rep_level);
- *is_null = def_level < descr()->max_definition_level();
-
- if (*is_null) { return true; }
-
- if (value_offset_ == values_buffered_) {
- throw ParquetException("Value was non-null, but has not been buffered");
- }
- *val = values_[value_offset_++];
- return true;
- }
-
- virtual void PrintNext(std::ostream& out, int width) {
- T val;
- bool is_null = false;
- char buffer[25];
-
- if (!NextValue(&val, &is_null)) { throw ParquetException("No more values buffered"); }
-
- if (is_null) {
- std::string null_fmt = format_fwf<ByteArrayType>(width);
- snprintf(buffer, sizeof(buffer), null_fmt.c_str(), "NULL");
- } else {
- FormatValue(&val, buffer, sizeof(buffer), width);
- }
- out << buffer;
- }
-
- private:
- // The ownership of this object is expressed through the reader_ variable in the base
- TypedColumnReader<DType>* typed_reader_;
-
- inline void FormatValue(void* val, char* buffer, int bufsize, int width);
-
- T* values_;
-};
-
-template <typename DType>
-inline void TypedScanner<DType>::FormatValue(
- void* val, char* buffer, int bufsize, int width) {
- std::string fmt = format_fwf<DType>(width);
- snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val));
-}
-
-template <>
-inline void TypedScanner<Int96Type>::FormatValue(
- void* val, char* buffer, int bufsize, int width) {
- std::string fmt = format_fwf<Int96Type>(width);
- std::string result = Int96ToString(*reinterpret_cast<Int96*>(val));
- snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
-}
-
-template <>
-inline void TypedScanner<ByteArrayType>::FormatValue(
- void* val, char* buffer, int bufsize, int width) {
- std::string fmt = format_fwf<ByteArrayType>(width);
- std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val));
- snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
-}
-
-template <>
-inline void TypedScanner<FLBAType>::FormatValue(
- void* val, char* buffer, int bufsize, int width) {
- std::string fmt = format_fwf<FLBAType>(width);
- std::string result = FixedLenByteArrayToString(
- *reinterpret_cast<FixedLenByteArray*>(val), descr()->type_length());
- snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
-}
-
-typedef TypedScanner<BooleanType> BoolScanner;
-typedef TypedScanner<Int32Type> Int32Scanner;
-typedef TypedScanner<Int64Type> Int64Scanner;
-typedef TypedScanner<Int96Type> Int96Scanner;
-typedef TypedScanner<FloatType> FloatScanner;
-typedef TypedScanner<DoubleType> DoubleScanner;
-typedef TypedScanner<ByteArrayType> ByteArrayScanner;
-typedef TypedScanner<FLBAType> FixedLenByteArrayScanner;
-
-} // namespace parquet
-
-#endif // PARQUET_COLUMN_SCANNER_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/statistics-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics-test.cc b/src/parquet/column/statistics-test.cc
deleted file mode 100644
index e656f81..0000000
--- a/src/parquet/column/statistics-test.cc
+++ /dev/null
@@ -1,358 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include <algorithm>
-#include <array>
-#include <cstdint>
-#include <cstring>
-#include <memory>
-#include <vector>
-
-#include "parquet/column/reader.h"
-#include "parquet/column/statistics.h"
-#include "parquet/column/test-specialization.h"
-#include "parquet/column/test-util.h"
-#include "parquet/column/writer.h"
-#include "parquet/file/reader.h"
-#include "parquet/file/writer.h"
-#include "parquet/schema.h"
-#include "parquet/thrift.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-
-using arrow::default_memory_pool;
-using arrow::MemoryPool;
-
-namespace parquet {
-
-using schema::NodePtr;
-using schema::PrimitiveNode;
-using schema::GroupNode;
-
-namespace test {
-
-template <typename TestType>
-class TestRowGroupStatistics : public PrimitiveTypedTest<TestType> {
- public:
- using T = typename TestType::c_type;
- using TypedStats = TypedRowGroupStatistics<TestType>;
-
- std::vector<T> GetDeepCopy(
- const std::vector<T>&); // allocates new memory for FLBA/ByteArray
-
- T* GetValuesPointer(std::vector<T>&);
- void DeepFree(std::vector<T>&);
-
- void TestMinMaxEncode() {
- this->GenerateData(1000);
-
- TypedStats statistics1(this->schema_.Column(0));
- statistics1.Update(this->values_ptr_, this->values_.size(), 0);
- std::string encoded_min = statistics1.EncodeMin();
- std::string encoded_max = statistics1.EncodeMax();
-
- TypedStats statistics2(this->schema_.Column(0), encoded_min, encoded_max,
- this->values_.size(), 0, 0, true);
-
- TypedStats statistics3(this->schema_.Column(0));
- std::vector<uint8_t> valid_bits(
- BitUtil::RoundUpNumBytes(static_cast<uint32_t>(this->values_.size())) + 1, 255);
- statistics3.UpdateSpaced(
- this->values_ptr_, valid_bits.data(), 0, this->values_.size(), 0);
- std::string encoded_min_spaced = statistics3.EncodeMin();
- std::string encoded_max_spaced = statistics3.EncodeMax();
-
- ASSERT_EQ(encoded_min, statistics2.EncodeMin());
- ASSERT_EQ(encoded_max, statistics2.EncodeMax());
- ASSERT_EQ(statistics1.min(), statistics2.min());
- ASSERT_EQ(statistics1.max(), statistics2.max());
- ASSERT_EQ(encoded_min_spaced, statistics2.EncodeMin());
- ASSERT_EQ(encoded_max_spaced, statistics2.EncodeMax());
- ASSERT_EQ(statistics3.min(), statistics2.min());
- ASSERT_EQ(statistics3.max(), statistics2.max());
- }
-
- void TestReset() {
- this->GenerateData(1000);
-
- TypedStats statistics(this->schema_.Column(0));
- statistics.Update(this->values_ptr_, this->values_.size(), 0);
- ASSERT_EQ(this->values_.size(), statistics.num_values());
-
- statistics.Reset();
- ASSERT_EQ(0, statistics.null_count());
- ASSERT_EQ(0, statistics.num_values());
- ASSERT_EQ("", statistics.EncodeMin());
- ASSERT_EQ("", statistics.EncodeMax());
- }
-
- void TestMerge() {
- int num_null[2];
- random_numbers(2, 42, 0, 100, num_null);
-
- TypedStats statistics1(this->schema_.Column(0));
- this->GenerateData(1000);
- statistics1.Update(
- this->values_ptr_, this->values_.size() - num_null[0], num_null[0]);
-
- TypedStats statistics2(this->schema_.Column(0));
- this->GenerateData(1000);
- statistics2.Update(
- this->values_ptr_, this->values_.size() - num_null[1], num_null[1]);
-
- TypedStats total(this->schema_.Column(0));
- total.Merge(statistics1);
- total.Merge(statistics2);
-
- ASSERT_EQ(num_null[0] + num_null[1], total.null_count());
- ASSERT_EQ(this->values_.size() * 2 - num_null[0] - num_null[1], total.num_values());
- ASSERT_EQ(total.min(), std::min(statistics1.min(), statistics2.min()));
- ASSERT_EQ(total.max(), std::max(statistics1.max(), statistics2.max()));
- }
-
- void TestFullRoundtrip(int64_t num_values, int64_t null_count) {
- this->GenerateData(num_values);
-
- // compute statistics for the whole batch
- TypedStats expected_stats(this->schema_.Column(0));
- expected_stats.Update(this->values_ptr_, num_values - null_count, null_count);
-
- auto sink = std::make_shared<InMemoryOutputStream>();
- auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
- std::shared_ptr<WriterProperties> writer_properties =
- WriterProperties::Builder().enable_statistics("column")->build();
- auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
- auto row_group_writer = file_writer->AppendRowGroup(num_values);
- auto column_writer =
- static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
-
- // simulate the case when data comes from multiple buffers,
- // in which case special care is necessary for FLBA/ByteArray types
- for (int i = 0; i < 2; i++) {
- int64_t batch_num_values = i ? num_values - num_values / 2 : num_values / 2;
- int64_t batch_null_count = i ? null_count : 0;
- DCHECK(null_count <= num_values); // avoid too much headache
- std::vector<int16_t> definition_levels(batch_null_count, 0);
- definition_levels.insert(
- definition_levels.end(), batch_num_values - batch_null_count, 1);
- auto beg = this->values_.begin() + i * num_values / 2;
- auto end = beg + batch_num_values;
- std::vector<T> batch = GetDeepCopy(std::vector<T>(beg, end));
- T* batch_values_ptr = GetValuesPointer(batch);
- column_writer->WriteBatch(
- batch_num_values, definition_levels.data(), nullptr, batch_values_ptr);
- DeepFree(batch);
- }
- column_writer->Close();
- row_group_writer->Close();
- file_writer->Close();
-
- auto buffer = sink->GetBuffer();
- auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
- auto file_reader = ParquetFileReader::Open(source);
- auto rg_reader = file_reader->RowGroup(0);
- auto column_chunk = rg_reader->metadata()->ColumnChunk(0);
- if (!column_chunk->is_stats_set()) return;
- std::shared_ptr<RowGroupStatistics> stats = column_chunk->statistics();
- // check values after serialization + deserialization
- ASSERT_EQ(null_count, stats->null_count());
- ASSERT_EQ(num_values - null_count, stats->num_values());
- ASSERT_EQ(expected_stats.EncodeMin(), stats->EncodeMin());
- ASSERT_EQ(expected_stats.EncodeMax(), stats->EncodeMax());
- }
-};
-
-template <typename TestType>
-typename TestType::c_type* TestRowGroupStatistics<TestType>::GetValuesPointer(
- std::vector<typename TestType::c_type>& values) {
- return values.data();
-}
-
-template <>
-bool* TestRowGroupStatistics<BooleanType>::GetValuesPointer(std::vector<bool>& values) {
- static std::vector<uint8_t> bool_buffer;
- bool_buffer.clear();
- bool_buffer.resize(values.size());
- std::copy(values.begin(), values.end(), bool_buffer.begin());
- return reinterpret_cast<bool*>(bool_buffer.data());
-}
-
-template <typename TestType>
-typename std::vector<typename TestType::c_type>
-TestRowGroupStatistics<TestType>::GetDeepCopy(
- const std::vector<typename TestType::c_type>& values) {
- return values;
-}
-
-template <>
-std::vector<FLBA> TestRowGroupStatistics<FLBAType>::GetDeepCopy(
- const std::vector<FLBA>& values) {
- std::vector<FLBA> copy;
- MemoryPool* pool = ::arrow::default_memory_pool();
- for (const FLBA& flba : values) {
- uint8_t* ptr;
- PARQUET_THROW_NOT_OK(pool->Allocate(FLBA_LENGTH, &ptr));
- memcpy(ptr, flba.ptr, FLBA_LENGTH);
- copy.emplace_back(ptr);
- }
- return copy;
-}
-
-template <>
-std::vector<ByteArray> TestRowGroupStatistics<ByteArrayType>::GetDeepCopy(
- const std::vector<ByteArray>& values) {
- std::vector<ByteArray> copy;
- MemoryPool* pool = default_memory_pool();
- for (const ByteArray& ba : values) {
- uint8_t* ptr;
- PARQUET_THROW_NOT_OK(pool->Allocate(ba.len, &ptr));
- memcpy(ptr, ba.ptr, ba.len);
- copy.emplace_back(ba.len, ptr);
- }
- return copy;
-}
-
-template <typename TestType>
-void TestRowGroupStatistics<TestType>::DeepFree(
- std::vector<typename TestType::c_type>& values) {}
-
-template <>
-void TestRowGroupStatistics<FLBAType>::DeepFree(std::vector<FLBA>& values) {
- MemoryPool* pool = default_memory_pool();
- for (FLBA& flba : values) {
- auto ptr = const_cast<uint8_t*>(flba.ptr);
- memset(ptr, 0, FLBA_LENGTH);
- pool->Free(ptr, FLBA_LENGTH);
- }
-}
-
-template <>
-void TestRowGroupStatistics<ByteArrayType>::DeepFree(std::vector<ByteArray>& values) {
- MemoryPool* pool = default_memory_pool();
- for (ByteArray& ba : values) {
- auto ptr = const_cast<uint8_t*>(ba.ptr);
- memset(ptr, 0, ba.len);
- pool->Free(ptr, ba.len);
- }
-}
-
-template <>
-void TestRowGroupStatistics<ByteArrayType>::TestMinMaxEncode() {
- this->GenerateData(1000);
- // Test that we encode min max strings correctly
- TypedRowGroupStatistics<ByteArrayType> statistics1(this->schema_.Column(0));
- statistics1.Update(this->values_ptr_, this->values_.size(), 0);
- std::string encoded_min = statistics1.EncodeMin();
- std::string encoded_max = statistics1.EncodeMax();
-
- // encoded is same as unencoded
- ASSERT_EQ(encoded_min,
- std::string((const char*)statistics1.min().ptr, statistics1.min().len));
- ASSERT_EQ(encoded_max,
- std::string((const char*)statistics1.max().ptr, statistics1.max().len));
-
- TypedRowGroupStatistics<ByteArrayType> statistics2(this->schema_.Column(0), encoded_min,
- encoded_max, this->values_.size(), 0, 0, true);
-
- ASSERT_EQ(encoded_min, statistics2.EncodeMin());
- ASSERT_EQ(encoded_max, statistics2.EncodeMax());
- ASSERT_EQ(statistics1.min(), statistics2.min());
- ASSERT_EQ(statistics1.max(), statistics2.max());
-}
-
-using TestTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
- ByteArrayType, FLBAType, BooleanType>;
-
-TYPED_TEST_CASE(TestRowGroupStatistics, TestTypes);
-
-TYPED_TEST(TestRowGroupStatistics, MinMaxEncode) {
- this->SetUpSchema(Repetition::REQUIRED);
- this->TestMinMaxEncode();
-}
-
-TYPED_TEST(TestRowGroupStatistics, Reset) {
- this->SetUpSchema(Repetition::OPTIONAL);
- this->TestReset();
-}
-
-TYPED_TEST(TestRowGroupStatistics, FullRoundtrip) {
- this->SetUpSchema(Repetition::OPTIONAL);
- this->TestFullRoundtrip(100, 31);
- this->TestFullRoundtrip(1000, 415);
- this->TestFullRoundtrip(10000, 926);
-}
-
-template <typename TestType>
-class TestNumericRowGroupStatistics : public TestRowGroupStatistics<TestType> {};
-
-using NumericTypes = ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType>;
-
-TYPED_TEST_CASE(TestNumericRowGroupStatistics, NumericTypes);
-
-TYPED_TEST(TestNumericRowGroupStatistics, Merge) {
- this->SetUpSchema(Repetition::OPTIONAL);
- this->TestMerge();
-}
-
-TEST(CorruptStatistics, Basics) {
- ApplicationVersion version("parquet-mr version 1.8.0");
- SchemaDescriptor schema;
- schema::NodePtr node;
- std::vector<schema::NodePtr> fields;
- // Test Physical Types
- fields.push_back(schema::PrimitiveNode::Make(
- "col1", Repetition::OPTIONAL, Type::INT32, LogicalType::NONE));
- fields.push_back(schema::PrimitiveNode::Make(
- "col2", Repetition::OPTIONAL, Type::BYTE_ARRAY, LogicalType::NONE));
- // Test Logical Types
- fields.push_back(schema::PrimitiveNode::Make(
- "col3", Repetition::OPTIONAL, Type::INT32, LogicalType::DATE));
- fields.push_back(schema::PrimitiveNode::Make(
- "col4", Repetition::OPTIONAL, Type::INT32, LogicalType::UINT_32));
- fields.push_back(schema::PrimitiveNode::Make("col5", Repetition::OPTIONAL,
- Type::FIXED_LEN_BYTE_ARRAY, LogicalType::INTERVAL, 12));
- fields.push_back(schema::PrimitiveNode::Make(
- "col6", Repetition::OPTIONAL, Type::BYTE_ARRAY, LogicalType::UTF8));
- node = schema::GroupNode::Make("schema", Repetition::REQUIRED, fields);
- schema.Init(node);
-
- format::ColumnChunk col_chunk;
- col_chunk.meta_data.__isset.statistics = true;
- auto column_chunk1 = ColumnChunkMetaData::Make(
- reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(0), &version);
- ASSERT_TRUE(column_chunk1->is_stats_set());
- auto column_chunk2 = ColumnChunkMetaData::Make(
- reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(1), &version);
- ASSERT_FALSE(column_chunk2->is_stats_set());
- auto column_chunk3 = ColumnChunkMetaData::Make(
- reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(2), &version);
- ASSERT_TRUE(column_chunk3->is_stats_set());
- auto column_chunk4 = ColumnChunkMetaData::Make(
- reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(3), &version);
- ASSERT_FALSE(column_chunk4->is_stats_set());
- auto column_chunk5 = ColumnChunkMetaData::Make(
- reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(4), &version);
- ASSERT_FALSE(column_chunk5->is_stats_set());
- auto column_chunk6 = ColumnChunkMetaData::Make(
- reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(5), &version);
- ASSERT_FALSE(column_chunk6->is_stats_set());
-}
-
-} // namespace test
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/statistics.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics.cc b/src/parquet/column/statistics.cc
deleted file mode 100644
index 961a2af..0000000
--- a/src/parquet/column/statistics.cc
+++ /dev/null
@@ -1,244 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <algorithm>
-#include <cstring>
-
-#include "parquet/column/statistics.h"
-#include "parquet/encoding-internal.h"
-#include "parquet/exception.h"
-#include "parquet/util/comparison.h"
-#include "parquet/util/memory.h"
-
-using arrow::default_memory_pool;
-using arrow::MemoryPool;
-
-namespace parquet {
-
-template <typename DType>
-TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(
- const ColumnDescriptor* schema, MemoryPool* pool)
- : pool_(pool),
- min_buffer_(AllocateBuffer(pool_, 0)),
- max_buffer_(AllocateBuffer(pool_, 0)) {
- SetDescr(schema);
- Reset();
-}
-
-template <typename DType>
-TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const typename DType::c_type& min,
- const typename DType::c_type& max, int64_t num_values, int64_t null_count,
- int64_t distinct_count)
- : pool_(default_memory_pool()),
- min_buffer_(AllocateBuffer(pool_, 0)),
- max_buffer_(AllocateBuffer(pool_, 0)) {
- IncrementNumValues(num_values);
- IncrementNullCount(null_count);
- IncrementDistinctCount(distinct_count);
-
- Copy(min, &min_, min_buffer_.get());
- Copy(max, &max_, max_buffer_.get());
- has_min_max_ = true;
-}
-
-template <typename DType>
-TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const ColumnDescriptor* schema,
- const std::string& encoded_min, const std::string& encoded_max, int64_t num_values,
- int64_t null_count, int64_t distinct_count, bool has_min_max, MemoryPool* pool)
- : pool_(pool),
- min_buffer_(AllocateBuffer(pool_, 0)),
- max_buffer_(AllocateBuffer(pool_, 0)) {
- IncrementNumValues(num_values);
- IncrementNullCount(null_count);
- IncrementDistinctCount(distinct_count);
-
- SetDescr(schema);
-
- if (!encoded_min.empty()) { PlainDecode(encoded_min, &min_); }
- if (!encoded_max.empty()) { PlainDecode(encoded_max, &max_); }
- has_min_max_ = has_min_max;
-}
-
-template <typename DType>
-bool TypedRowGroupStatistics<DType>::HasMinMax() const {
- return has_min_max_;
-}
-
-template <typename DType>
-void TypedRowGroupStatistics<DType>::Reset() {
- ResetCounts();
- has_min_max_ = false;
-}
-
-template <typename DType>
-void TypedRowGroupStatistics<DType>::Update(
- const T* values, int64_t num_not_null, int64_t num_null) {
- DCHECK(num_not_null >= 0);
- DCHECK(num_null >= 0);
-
- IncrementNullCount(num_null);
- IncrementNumValues(num_not_null);
- // TODO: support distinct count?
- if (num_not_null == 0) return;
-
- Compare<T> compare(descr_);
- auto batch_minmax = std::minmax_element(values, values + num_not_null, compare);
- if (!has_min_max_) {
- has_min_max_ = true;
- Copy(*batch_minmax.first, &min_, min_buffer_.get());
- Copy(*batch_minmax.second, &max_, max_buffer_.get());
- } else {
- Copy(std::min(min_, *batch_minmax.first, compare), &min_, min_buffer_.get());
- Copy(std::max(max_, *batch_minmax.second, compare), &max_, max_buffer_.get());
- }
-}
-
-template <typename DType>
-void TypedRowGroupStatistics<DType>::UpdateSpaced(const T* values,
- const uint8_t* valid_bits, int64_t valid_bits_offset, int64_t num_not_null,
- int64_t num_null) {
- DCHECK(num_not_null >= 0);
- DCHECK(num_null >= 0);
-
- IncrementNullCount(num_null);
- IncrementNumValues(num_not_null);
- // TODO: support distinct count?
- if (num_not_null == 0) return;
-
- Compare<T> compare(descr_);
- INIT_BITSET(valid_bits, static_cast<int>(valid_bits_offset));
- // Find first valid entry and use that for min/max
- // As (num_not_null != 0) there must be one
- int64_t length = num_null + num_not_null;
- int64_t i = 0;
- for (; i < length; i++) {
- if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { break; }
- READ_NEXT_BITSET(valid_bits);
- }
- T min = values[i];
- T max = values[i];
- for (; i < length; i++) {
- if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
- if (compare(values[i], min)) {
- min = values[i];
- } else if (compare(max, values[i])) {
- max = values[i];
- }
- }
- READ_NEXT_BITSET(valid_bits);
- }
- if (!has_min_max_) {
- has_min_max_ = true;
- Copy(min, &min_, min_buffer_.get());
- Copy(max, &max_, max_buffer_.get());
- } else {
- Copy(std::min(min_, min, compare), &min_, min_buffer_.get());
- Copy(std::max(max_, max, compare), &max_, max_buffer_.get());
- }
-}
-
-template <typename DType>
-const typename DType::c_type& TypedRowGroupStatistics<DType>::min() const {
- return min_;
-}
-
-template <typename DType>
-const typename DType::c_type& TypedRowGroupStatistics<DType>::max() const {
- return max_;
-}
-
-template <typename DType>
-void TypedRowGroupStatistics<DType>::Merge(const TypedRowGroupStatistics<DType>& other) {
- this->MergeCounts(other);
-
- if (!other.HasMinMax()) return;
-
- if (!has_min_max_) {
- Copy(other.min_, &this->min_, min_buffer_.get());
- Copy(other.max_, &this->max_, max_buffer_.get());
- has_min_max_ = true;
- return;
- }
-
- Compare<T> compare(descr_);
- Copy(std::min(this->min_, other.min_, compare), &this->min_, min_buffer_.get());
- Copy(std::max(this->max_, other.max_, compare), &this->max_, max_buffer_.get());
-}
-
-template <typename DType>
-std::string TypedRowGroupStatistics<DType>::EncodeMin() {
- std::string s;
- if (HasMinMax()) this->PlainEncode(min_, &s);
- return s;
-}
-
-template <typename DType>
-std::string TypedRowGroupStatistics<DType>::EncodeMax() {
- std::string s;
- if (HasMinMax()) this->PlainEncode(max_, &s);
- return s;
-}
-
-template <typename DType>
-EncodedStatistics TypedRowGroupStatistics<DType>::Encode() {
- EncodedStatistics s;
- if (HasMinMax()) {
- s.set_min(this->EncodeMin());
- s.set_max(this->EncodeMax());
- }
- s.set_null_count(this->null_count());
- return s;
-}
-
-template <typename DType>
-void TypedRowGroupStatistics<DType>::PlainEncode(const T& src, std::string* dst) {
- PlainEncoder<DType> encoder(descr(), pool_);
- encoder.Put(&src, 1);
- auto buffer = encoder.FlushValues();
- auto ptr = reinterpret_cast<const char*>(buffer->data());
- dst->assign(ptr, buffer->size());
-}
-
-template <typename DType>
-void TypedRowGroupStatistics<DType>::PlainDecode(const std::string& src, T* dst) {
- PlainDecoder<DType> decoder(descr());
- decoder.SetData(
- 1, reinterpret_cast<const uint8_t*>(src.c_str()), static_cast<int>(src.size()));
- decoder.Decode(dst, 1);
-}
-
-template <>
-void TypedRowGroupStatistics<ByteArrayType>::PlainEncode(const T& src, std::string* dst) {
- dst->assign(reinterpret_cast<const char*>(src.ptr), src.len);
-}
-
-template <>
-void TypedRowGroupStatistics<ByteArrayType>::PlainDecode(const std::string& src, T* dst) {
- dst->len = static_cast<uint32_t>(src.size());
- dst->ptr = reinterpret_cast<const uint8_t*>(src.c_str());
-}
-
-template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<BooleanType>;
-template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int32Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int64Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int96Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<FloatType>;
-template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<DoubleType>;
-template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<ByteArrayType>;
-template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<FLBAType>;
-
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/statistics.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics.h b/src/parquet/column/statistics.h
deleted file mode 100644
index c6a2487..0000000
--- a/src/parquet/column/statistics.h
+++ /dev/null
@@ -1,234 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_COLUMN_STATISTICS_H
-#define PARQUET_COLUMN_STATISTICS_H
-
-#include <cstdint>
-#include <memory>
-#include <string>
-
-#include "parquet/schema.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-#include "parquet/util/visibility.h"
-
-namespace parquet {
-
-// Encoded min/max values plus null/distinct counts, with flags recording which
-// of these fields have been populated.
-//
-// min/max are held through shared_ptr so copying this object is cheap.
-// set_min()/set_max() install a fresh string rather than writing through the
-// shared pointer, so updating one copy never changes the min/max observed by
-// other copies.
-// NOTE: a reference obtained from min()/max() is invalidated by a later
-// set_min()/set_max() on the same object (unless another copy still shares it).
-class PARQUET_EXPORT EncodedStatistics {
-  std::shared_ptr<std::string> max_, min_;
-
- public:
-  EncodedStatistics()
-      : max_(std::make_shared<std::string>()), min_(std::make_shared<std::string>()) {}
-
-  const std::string& max() const { return *max_; }
-  const std::string& min() const { return *min_; }
-
-  int64_t null_count = 0;
-  int64_t distinct_count = 0;
-
-  bool has_min = false;
-  bool has_max = false;
-  bool has_null_count = false;
-  bool has_distinct_count = false;
-
-  // True when at least one of the statistics has been set.
-  inline bool is_set() const {
-    return has_min || has_max || has_null_count || has_distinct_count;
-  }
-
-  inline EncodedStatistics& set_max(const std::string& value) {
-    // Copy-on-write: never mutate a string that other copies may share.
-    max_ = std::make_shared<std::string>(value);
-    has_max = true;
-    return *this;
-  }
-
-  inline EncodedStatistics& set_min(const std::string& value) {
-    // Copy-on-write: never mutate a string that other copies may share.
-    min_ = std::make_shared<std::string>(value);
-    has_min = true;
-    return *this;
-  }
-
-  inline EncodedStatistics& set_null_count(int64_t value) {
-    null_count = value;
-    has_null_count = true;
-    return *this;
-  }
-
-  inline EncodedStatistics& set_distinct_count(int64_t value) {
-    distinct_count = value;
-    has_distinct_count = true;
-    return *this;
-  }
-};
-
-template <typename DType>
-class PARQUET_EXPORT TypedRowGroupStatistics;
-
-// Type-erased base for column-chunk statistics. It owns the null/distinct/value
-// counters; everything involving min/max is left to the typed subclass through
-// the pure virtual interface below.
-class PARQUET_EXPORT RowGroupStatistics
-    : public std::enable_shared_from_this<RowGroupStatistics> {
- public:
-  // Counters.
-  int64_t null_count() const { return statistics_.null_count; }
-  int64_t distinct_count() const { return statistics_.distinct_count; }
-  int64_t num_values() const { return num_values_; }
-
-  // Physical type of the described column.
-  Type::type physical_type() const { return descr_->physical_type(); }
-
-  virtual bool HasMinMax() const = 0;
-  virtual void Reset() = 0;
-
-  // Plain-encoded minimum / maximum value.
-  virtual std::string EncodeMin() = 0;
-  virtual std::string EncodeMax() = 0;
-
-  virtual EncodedStatistics Encode() = 0;
-
-  virtual ~RowGroupStatistics() = default;
-
- protected:
-  const ColumnDescriptor* descr() const { return descr_; }
-  void SetDescr(const ColumnDescriptor* column) { descr_ = column; }
-
-  void IncrementNullCount(int64_t n) { statistics_.null_count += n; }
-  void IncrementDistinctCount(int64_t n) { statistics_.distinct_count += n; }
-  void IncrementNumValues(int64_t n) { num_values_ += n; }
-
-  // Adds the counters of `other` into this object.
-  void MergeCounts(const RowGroupStatistics& other) {
-    statistics_.null_count += other.statistics_.null_count;
-    statistics_.distinct_count += other.statistics_.distinct_count;
-    num_values_ += other.num_values_;
-  }
-
-  // Zeroes the counters (the has_* flags in statistics_ are left untouched).
-  void ResetCounts() {
-    statistics_.null_count = 0;
-    statistics_.distinct_count = 0;
-    num_values_ = 0;
-  }
-
-  const ColumnDescriptor* descr_ = nullptr;
-  int64_t num_values_ = 0;
-  EncodedStatistics statistics_;
-};
-
-// Statistics for a single physical type DType: typed min/max values on top of
-// the counters inherited from RowGroupStatistics.
-template <typename DType>
-class TypedRowGroupStatistics : public RowGroupStatistics {
- public:
-  using T = typename DType::c_type;
-
-  // ---- construction ----
-  TypedRowGroupStatistics(const ColumnDescriptor* schema,
-      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
-
-  TypedRowGroupStatistics(const T& min, const T& max, int64_t num_values,
-      int64_t null_count, int64_t distinct_count);
-
-  // From encoded min/max strings.
-  TypedRowGroupStatistics(const ColumnDescriptor* schema, const std::string& encoded_min,
-      const std::string& encoded_max, int64_t num_values, int64_t null_count,
-      int64_t distinct_count, bool has_min_max,
-      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
-
-  // ---- RowGroupStatistics interface ----
-  bool HasMinMax() const override;
-  void Reset() override;
-  std::string EncodeMin() override;
-  std::string EncodeMax() override;
-  EncodedStatistics Encode() override;
-
-  // ---- accumulation ----
-  void Merge(const TypedRowGroupStatistics<DType>& other);
-  void Update(const T* values, int64_t num_not_null, int64_t num_null);
-  void UpdateSpaced(const T* values, const uint8_t* valid_bits, int64_t valid_bits_spaced,
-      int64_t num_not_null, int64_t num_null);
-
-  // ---- accessors ----
-  const T& min() const;
-  const T& max() const;
-
- private:
-  bool has_min_max_ = false;
-  T min_;
-  T max_;
-  ::arrow::MemoryPool* pool_;
-
-  void PlainEncode(const T& src, std::string* dst);
-  void PlainDecode(const std::string& src, T* dst);
-  // Copies src into *dst, using `buffer` as backing storage where a
-  // specialization needs it.
-  void Copy(const T& src, T* dst, PoolBuffer* buffer);
-
-  std::shared_ptr<PoolBuffer> min_buffer_, max_buffer_;
-};
-
-// Default Copy: plain assignment; the scratch buffer is not used.
-template <typename DType>
-inline void TypedRowGroupStatistics<DType>::Copy(
-    const T& src, T* dst, PoolBuffer* /*buffer*/) {
-  *dst = src;
-}
-
-// FLBA Copy: copies type_length() bytes of src into `buffer` so that *dst owns
-// its bytes instead of pointing at the caller's memory. Does nothing when *dst
-// already points at src's bytes.
-template <>
-inline void TypedRowGroupStatistics<FLBAType>::Copy(
-    const FLBA& src, FLBA* dst, PoolBuffer* buffer) {
-  if (dst->ptr != src.ptr) {
-    const uint32_t width = descr_->type_length();
-    PARQUET_THROW_NOT_OK(buffer->Resize(width, false));
-    std::memcpy(buffer->mutable_data(), src.ptr, width);
-    *dst = FLBA(buffer->data());
-  }
-}
-
-// ByteArray Copy: copies src.len bytes into `buffer` so that *dst owns its
-// bytes instead of pointing at the caller's memory. Does nothing when *dst
-// already points at src's bytes.
-template <>
-inline void TypedRowGroupStatistics<ByteArrayType>::Copy(
-    const ByteArray& src, ByteArray* dst, PoolBuffer* buffer) {
-  if (dst->ptr != src.ptr) {
-    PARQUET_THROW_NOT_OK(buffer->Resize(src.len, false));
-    std::memcpy(buffer->mutable_data(), src.ptr, src.len);
-    *dst = ByteArray(src.len, buffer->data());
-  }
-}
-
-template <>
-void TypedRowGroupStatistics<ByteArrayType>::PlainEncode(const T& src, std::string* dst);
-
-template <>
-void TypedRowGroupStatistics<ByteArrayType>::PlainDecode(const std::string& src, T* dst);
-
-// Short names for the statistics class of each physical type.
-using BoolStatistics = TypedRowGroupStatistics<BooleanType>;
-using Int32Statistics = TypedRowGroupStatistics<Int32Type>;
-using Int64Statistics = TypedRowGroupStatistics<Int64Type>;
-using Int96Statistics = TypedRowGroupStatistics<Int96Type>;
-using FloatStatistics = TypedRowGroupStatistics<FloatType>;
-using DoubleStatistics = TypedRowGroupStatistics<DoubleType>;
-using ByteArrayStatistics = TypedRowGroupStatistics<ByteArrayType>;
-using FLBAStatistics = TypedRowGroupStatistics<FLBAType>;
-
-// Declare the per-type instantiations of TypedRowGroupStatistics so includers
-// do not instantiate them implicitly; the matching explicit instantiations live
-// in statistics.cc.
-// NOTE(review): presumably PARQUET_EXTERN_TEMPLATE expands to an
-// `extern template class` declaration carrying export attributes, and GCC's
-// -Wattributes is silenced because it warns about those attributes — confirm.
-#if defined(__GNUC__) && !defined(__clang__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wattributes"
-#endif
-
-PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<BooleanType>;
-PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<Int32Type>;
-PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<Int64Type>;
-PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<Int96Type>;
-PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<FloatType>;
-PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<DoubleType>;
-PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<ByteArrayType>;
-PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<FLBAType>;
-
-// Restore the diagnostic state saved by the push above.
-#if defined(__GNUC__) && !defined(__clang__)
-#pragma GCC diagnostic pop
-#endif
-
-} // namespace parquet
-
-#endif // PARQUET_COLUMN_STATISTICS_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/test-specialization.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-specialization.h b/src/parquet/column/test-specialization.h
deleted file mode 100644
index 07767c0..0000000
--- a/src/parquet/column/test-specialization.h
+++ /dev/null
@@ -1,172 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// This module provides test-only InitValues specializations for the
-// non-numeric value types (bool, ByteArray, FLBA, Int96) and the
-// PrimitiveTypedTest fixture, which depends on those specializations.
-
-#ifndef PARQUET_COLUMN_TEST_SPECIALIZATION_H
-#define PARQUET_COLUMN_TEST_SPECIALIZATION_H
-
-#include <algorithm>
-#include <limits>
-#include <sstream>
-#include <string>
-#include <vector>
-
-#include "parquet/column/test-util.h"
-
-namespace parquet {
-
-namespace test {
-
-// bool values come from flip_coins; the byte buffer is not used.
-// NOTE(review): confirm what flip_coins' second argument means (seed vs.
-// probability) — if it is a probability, 0 yields all-false values.
-template <>
-void InitValues<bool>(int num_values, vector<bool>& values, vector<uint8_t>& /*buffer*/) {
-  values = flip_coins(num_values, 0);
-}
-
-// Random ByteArrays of at most 12 bytes each. Their bytes live in `buffer`,
-// sized at (12 + sizeof(uint32_t)) bytes per value; `values` must already hold
-// num_values elements.
-template <>
-void InitValues<ByteArray>(
-    int num_values, vector<ByteArray>& values, vector<uint8_t>& buffer) {
-  const int kMaxByteArrayLen = 12;
-  const int bytes_per_value = kMaxByteArrayLen + sizeof(uint32_t);
-  buffer.resize(num_values * bytes_per_value);
-  random_byte_array(num_values, 0, buffer.data(), values.data(), kMaxByteArrayLen);
-}
-
-// Random fixed-length byte arrays of FLBA_LENGTH bytes, backed by `buffer`.
-template <>
-void InitValues<FLBA>(int num_values, vector<FLBA>& values, vector<uint8_t>& buffer) {
-  buffer.resize(static_cast<size_t>(num_values * FLBA_LENGTH));
-  random_fixed_byte_array(num_values, 0, buffer.data(), FLBA_LENGTH, values.data());
-}
-
-// Random Int96 values generated over the full int32_t range; `buffer` unused.
-template <>
-void InitValues<Int96>(int num_values, vector<Int96>& values, vector<uint8_t>& /*buffer*/) {
-  const int32_t lo = std::numeric_limits<int32_t>::min();
-  const int32_t hi = std::numeric_limits<int32_t>::max();
-  random_Int96_numbers(num_values, 0, lo, hi, values.data());
-}
-
-// Name of the i-th generated test column: "column_0", "column_1", ...
-inline std::string TestColumnName(int i) {
-  return "column_" + std::to_string(i);
-}
-
-// This class lives here because of its dependency on the InitValues specializations.
-//
-// Typed gtest fixture: builds a flat schema of TestType columns and holds the
-// input and output value buffers used by round-trip tests.
-template <typename TestType>
-class PrimitiveTypedTest : public ::testing::Test {
- public:
-  typedef typename TestType::c_type T;
-
-  // Builds a group node named "schema" with `num_columns` primitive columns
-  // ("column_0", ...) of TestType, all with the given repetition.
-  // FLBA_LENGTH is passed as the last PrimitiveNode::Make argument
-  // (presumably the fixed-length byte array width).
-  void SetUpSchema(Repetition::type repetition, int num_columns = 1) {
-    std::vector<schema::NodePtr> fields;
-
-    for (int i = 0; i < num_columns; ++i) {
-      std::string name = TestColumnName(i);
-      fields.push_back(schema::PrimitiveNode::Make(
-          name, repetition, TestType::type_num, LogicalType::NONE, FLBA_LENGTH));
-    }
-    node_ = schema::GroupNode::Make("schema", Repetition::REQUIRED, fields);
-    schema_.Init(node_);
-  }
-
-  // Fills values_ and def_levels_ with num_values generated entries.
-  void GenerateData(int64_t num_values);
-  // Sizes the output buffers and points values_out_ptr_ at them.
-  void SetupValuesOut(int64_t num_values);
-  // Copies decoded output back into values_out_ where needed (bool only).
-  void SyncValuesOut();
-
- protected:
-  schema::NodePtr node_;
-  SchemaDescriptor schema_;
-
-  // Input buffers
-  std::vector<T> values_;
-
-  std::vector<int16_t> def_levels_;
-
-  std::vector<uint8_t> buffer_;
-  // Pointer to the values, needed as we cannot use vector<bool>::data().
-  // nullptr until GenerateData() sets it, so it is never read indeterminate.
-  T* values_ptr_ = nullptr;
-  std::vector<uint8_t> bool_buffer_;
-
-  // Output buffers
-  std::vector<T> values_out_;
-  std::vector<uint8_t> bool_buffer_out_;
-  // nullptr until SetupValuesOut() sets it.
-  T* values_out_ptr_ = nullptr;
-};
-
-// For non-bool types values_out_ptr_ points straight into values_out_ (see
-// SetupValuesOut), so there is nothing to copy back.
-template <typename TestType>
-void PrimitiveTypedTest<TestType>::SyncValuesOut() {
-  // Intentionally empty.
-}
-
-// bool output is decoded into the byte buffer bool_buffer_out_ (values_out_ptr_
-// points there), so copy it back into values_out_, mapping nonzero to true.
-// `inline` is required: an explicit specialization defined in a header is not
-// implicitly inline, so without it every translation unit including this
-// header would emit its own definition (ODR violation / duplicate symbol).
-template <>
-inline void PrimitiveTypedTest<BooleanType>::SyncValuesOut() {
-  std::transform(bool_buffer_out_.begin(), bool_buffer_out_.end(), values_out_.begin(),
-      [](uint8_t byte) { return byte != 0; });
-}
-
-// Discards any previous output, sizes values_out_ for num_values
-// value-initialized entries, and points values_out_ptr_ at its storage.
-template <typename TestType>
-void PrimitiveTypedTest<TestType>::SetupValuesOut(int64_t num_values) {
-  values_out_.assign(static_cast<size_t>(num_values), T());
-  values_out_ptr_ = values_out_.data();
-}
-
-// bool variant: vector<bool> has no data(), so decoded output goes into the
-// byte buffer bool_buffer_out_ and is copied back later by SyncValuesOut().
-// `inline` is required for an explicit specialization defined in a header
-// (otherwise multiple translation units emit duplicate definitions).
-template <>
-inline void PrimitiveTypedTest<BooleanType>::SetupValuesOut(int64_t num_values) {
-  values_out_.clear();
-  values_out_.resize(num_values);
-
-  bool_buffer_out_.clear();
-  bool_buffer_out_.resize(num_values);
-  // Write once to all values so we can copy it without getting Valgrind errors
-  // about uninitialised values.
-  // NOTE(review): resize() already value-initializes the bytes to 0; the fill
-  // additionally makes the buffer start as all-true (1).
-  std::fill(bool_buffer_out_.begin(), bool_buffer_out_.end(), true);
-  values_out_ptr_ = reinterpret_cast<bool*>(bool_buffer_out_.data());
-}
-
-// Generates num_values inputs with InitValues<T> into values_ (exposed through
-// values_ptr_) and sets all num_values definition levels to 1.
-template <typename TestType>
-void PrimitiveTypedTest<TestType>::GenerateData(int64_t num_values) {
-  const int count = static_cast<int>(num_values);
-  values_.resize(num_values);
-  InitValues<T>(count, values_, buffer_);
-  values_ptr_ = values_.data();
-
-  def_levels_.assign(num_values, 1);
-}
-
-// bool variant: values are generated into values_ (a vector<bool>) and then
-// copied into the byte buffer bool_buffer_ so values_ptr_ can point at
-// contiguous storage. All definition levels are set to 1.
-// `inline` is required for an explicit specialization defined in a header
-// (otherwise multiple translation units emit duplicate definitions).
-template <>
-inline void PrimitiveTypedTest<BooleanType>::GenerateData(int64_t num_values) {
-  def_levels_.resize(num_values);
-  values_.resize(num_values);
-
-  InitValues<T>(static_cast<int>(num_values), values_, buffer_);
-  bool_buffer_.resize(num_values);
-  std::copy(values_.begin(), values_.end(), bool_buffer_.begin());
-  values_ptr_ = reinterpret_cast<bool*>(bool_buffer_.data());
-
-  std::fill(def_levels_.begin(), def_levels_.end(), 1);
-}
-} // namespace test
-
-} // namespace parquet
-
-#endif // PARQUET_COLUMN_TEST_SPECIALIZATION_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
deleted file mode 100644
index c133734..0000000
--- a/src/parquet/column/test-util.h
+++ /dev/null
@@ -1,429 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// This module provides helpers for column reader/writer tests: random value
-// generators, a mock PageReader, and builders that assemble data pages and
-// dictionary pages from in-memory values and levels.
-
-#ifndef PARQUET_COLUMN_TEST_UTIL_H
-#define PARQUET_COLUMN_TEST_UTIL_H
-
-#include <algorithm>
-#include <limits>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include <gtest/gtest.h>
-
-#include "parquet/column/levels.h"
-#include "parquet/column/page.h"
-#include "parquet/encoding-internal.h"
-#include "parquet/util/memory.h"
-#include "parquet/util/test-common.h"
-
-using std::vector;
-using std::shared_ptr;
-
-namespace parquet {
-
-static int FLBA_LENGTH = 12;
-
-// Test-only equality for fixed-length byte arrays: compares the first
-// FLBA_LENGTH bytes of each. `inline` is required for a non-template function
-// defined in a header; otherwise every translation unit including this header
-// emits its own external definition and linking them together fails.
-inline bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) {
-  return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH);
-}
-
-namespace test {
-
-// Fills the first num_values entries of `values` (which must already be that
-// large) with random numbers between numeric_limits<T>::min() and max(), seed 0.
-// `buffer` is unused for numeric types.
-// NOTE(review): for floating-point T, numeric_limits<T>::min() is the smallest
-// positive value, not the most negative one, so no negative values are produced.
-template <typename T>
-static void InitValues(int num_values, vector<T>& values, vector<uint8_t>& /*buffer*/) {
-  const T range_min = std::numeric_limits<T>::min();
-  const T range_max = std::numeric_limits<T>::max();
-  random_numbers(num_values, 0, range_min, range_max, values.data());
-}
-
-// Fills the first num_values entries of `values` (which must already hold at
-// least num_values elements) with a repeating pattern of at most num_dicts
-// distinct random values: the first block comes from InitValues, and every
-// later slot repeats the slot num_dicts positions earlier, i.e.
-// values[k] == values[k % num_dicts].
-template <typename T>
-static void InitDictValues(
-    int num_values, int num_dicts, vector<T>& values, vector<uint8_t>& buffer) {
-  if (num_values <= 0) { return; }
-  // Callers (e.g. MakePages, which passes levels_per_page) may ask for more
-  // dictionary entries than there are values; generating num_dicts values would
-  // then write past the end of `values`. Clamping to [1, num_values] also
-  // avoids a division/modulo by zero when num_dicts == 0.
-  num_dicts = std::max(1, std::min(num_dicts, num_values));
-  InitValues<T>(num_dicts, values, buffer);
-  for (int i = num_dicts; i < num_values; ++i) {
-    values[i] = values[i - num_dicts];
-  }
-}
-
-// PageReader over a fixed, pre-built list of pages: NextPage() returns them in
-// order and then nullptr to signal end of stream.
-class MockPageReader : public PageReader {
- public:
-  explicit MockPageReader(const vector<shared_ptr<Page>>& pages)
-      : pages_(pages), page_index_(0) {}
-
-  // Implement the PageReader interface
-  shared_ptr<Page> NextPage() override {
-    if (page_index_ < static_cast<int>(pages_.size())) {
-      return pages_[page_index_++];
-    }
-    // EOS to consumer
-    return nullptr;
-  }
-
- private:
-  vector<shared_ptr<Page>> pages_;
-  int page_index_;
-};
-
-// TODO(wesm): this is only used for testing for now. Refactor to form part of
-// primary file write path
-//
-// Accumulates the body of one data page in `sink`: each Append* call writes
-// its section to the sink immediately, so callers append sections in page
-// order (MakeDataPage appends repetition levels, then definition levels, then
-// values). The recorded encodings and value count are used to build the page.
-template <typename Type>
-class DataPageBuilder {
- public:
-  typedef typename Type::c_type T;
-
-  // This class writes data and metadata to the passed inputs
-  explicit DataPageBuilder(InMemoryOutputStream* sink)
-      : sink_(sink),
-        num_values_(0),
-        encoding_(Encoding::PLAIN),
-        definition_level_encoding_(Encoding::RLE),
-        repetition_level_encoding_(Encoding::RLE),
-        have_def_levels_(false),
-        have_rep_levels_(false),
-        have_values_(false) {}
-
-  void AppendDefLevels(const vector<int16_t>& levels, int16_t max_level,
-      Encoding::type encoding = Encoding::RLE) {
-    AppendLevels(levels, max_level, encoding);
-
-    num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_);
-    definition_level_encoding_ = encoding;
-    have_def_levels_ = true;
-  }
-
-  void AppendRepLevels(const vector<int16_t>& levels, int16_t max_level,
-      Encoding::type encoding = Encoding::RLE) {
-    AppendLevels(levels, max_level, encoding);
-
-    num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_);
-    repetition_level_encoding_ = encoding;
-    have_rep_levels_ = true;
-  }
-
-  // PLAIN-encodes `values` and appends the bytes to the sink.
-  // NOTE(review): `encoding` is only recorded; the values are always written
-  // PLAIN (MakeDataPage only calls this for Encoding::PLAIN).
-  void AppendValues(const ColumnDescriptor* d, const vector<T>& values,
-      Encoding::type encoding = Encoding::PLAIN) {
-    PlainEncoder<Type> encoder(d);
-    // values.data() instead of &values[0]: a page can hold zero values (e.g.
-    // when no definition level reaches the maximum), and operator[] on an
-    // empty vector is undefined behavior.
-    encoder.Put(values.data(), static_cast<int>(values.size()));
-    std::shared_ptr<Buffer> values_sink = encoder.FlushValues();
-    sink_->Write(values_sink->data(), values_sink->size());
-
-    num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
-    encoding_ = encoding;
-    have_values_ = true;
-  }
-
-  int32_t num_values() const { return num_values_; }
-
-  Encoding::type encoding() const { return encoding_; }
-
-  Encoding::type rep_level_encoding() const { return repetition_level_encoding_; }
-
-  Encoding::type def_level_encoding() const { return definition_level_encoding_; }
-
- private:
-  InMemoryOutputStream* sink_;
-
-  int32_t num_values_;
-  Encoding::type encoding_;
-  Encoding::type definition_level_encoding_;
-  Encoding::type repetition_level_encoding_;
-
-  bool have_def_levels_;
-  bool have_rep_levels_;
-  bool have_values_;
-
-  // Used internally for both repetition and definition levels: RLE-encodes
-  // `levels` and writes a 4-byte length followed by the encoded bytes.
-  void AppendLevels(
-      const vector<int16_t>& levels, int16_t max_level, Encoding::type encoding) {
-    if (encoding != Encoding::RLE) {
-      ParquetException::NYI("only rle encoding currently implemented");
-    }
-
-    // TODO: compute a more precise maximum size for the encoded levels
-    vector<uint8_t> encode_buffer(levels.size() * 2);
-
-    // We encode into separate memory from the output stream because the
-    // RLE-encoded bytes have to be preceded in the stream by their absolute
-    // size.
-    LevelEncoder encoder;
-    encoder.Init(encoding, max_level, static_cast<int>(levels.size()),
-        encode_buffer.data(), static_cast<int>(encode_buffer.size()));
-
-    encoder.Encode(static_cast<int>(levels.size()), levels.data());
-
-    int32_t rle_bytes = encoder.len();
-    sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(int32_t));
-    sink_->Write(encode_buffer.data(), rle_bytes);
-  }
-};
-
-// bool specialization: the whole vector<bool> is handed to
-// PlainEncoder<BooleanType>::Put (vector<bool> has no data()). Only PLAIN is
-// supported. `inline` is required because an explicit specialization defined
-// in a header is not implicitly inline; without it, including this header from
-// two translation units yields duplicate definitions.
-template <>
-inline void DataPageBuilder<BooleanType>::AppendValues(
-    const ColumnDescriptor* d, const vector<bool>& values, Encoding::type encoding) {
-  if (encoding != Encoding::PLAIN) {
-    ParquetException::NYI("only plain encoding currently implemented");
-  }
-  PlainEncoder<BooleanType> encoder(d);
-  encoder.Put(values, static_cast<int>(values.size()));
-  std::shared_ptr<Buffer> buffer = encoder.FlushValues();
-  sink_->Write(buffer->data(), buffer->size());
-
-  num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
-  encoding_ = encoding;
-  have_values_ = true;
-}
-
-// Builds one DataPage whose body is: repetition levels (if any), definition
-// levels (if any), then either PLAIN-encoded `values` (encoding == PLAIN) or
-// the caller-encoded dictionary indices `indices[0, indices_size)`. For the
-// index path the page's value count is the larger of num_vals and the level
-// count.
-template <typename Type>
-static shared_ptr<DataPage> MakeDataPage(const ColumnDescriptor* d,
-    const vector<typename Type::c_type>& values, int num_vals, Encoding::type encoding,
-    const uint8_t* indices, int indices_size, const vector<int16_t>& def_levels,
-    int16_t max_def_level, const vector<int16_t>& rep_levels, int16_t max_rep_level) {
-  InMemoryOutputStream page_stream;
-  test::DataPageBuilder<Type> builder(&page_stream);
-
-  if (!rep_levels.empty()) {
-    builder.AppendRepLevels(rep_levels, max_rep_level);
-  }
-  if (!def_levels.empty()) {
-    builder.AppendDefLevels(def_levels, max_def_level);
-  }
-
-  int page_num_values = 0;
-  if (encoding != Encoding::PLAIN) {
-    // DICTIONARY PAGES: indices were already encoded by the caller.
-    page_stream.Write(indices, indices_size);
-    page_num_values = std::max(builder.num_values(), num_vals);
-  } else {
-    builder.AppendValues(d, values, encoding);
-    page_num_values = builder.num_values();
-  }
-
-  auto body = page_stream.GetBuffer();
-  return std::make_shared<DataPage>(body, page_num_values, encoding,
-      builder.def_level_encoding(), builder.rep_level_encoding());
-}
-
-// Dictionary-encodes batches of values with one DictEncoder: each
-// AppendValues() call returns the encoded indices for that batch, and
-// WriteDict() serializes the dictionary accumulated so far.
-template <typename TYPE>
-class DictionaryPageBuilder {
- public:
-  typedef typename TYPE::c_type TC;
-  static constexpr int TN = TYPE::type_num;
-
-  // This class writes data and metadata to the passed inputs.
-  // pool_ is declared before encoder_, so it is constructed first and its
-  // address can be handed to the encoder here.
-  explicit DictionaryPageBuilder(const ColumnDescriptor* d)
-      : encoder_(std::make_shared<DictEncoder<TYPE>>(d, &pool_)),
-        num_dict_values_(0),
-        have_values_(false) {}
-
-  ~DictionaryPageBuilder() { pool_.FreeAll(); }
-
-  // Dictionary-encodes `values`; returns the encoded indices for this batch.
-  shared_ptr<Buffer> AppendValues(const vector<TC>& values) {
-    encoder_->Put(values.data(), static_cast<int>(values.size()));
-    num_dict_values_ = encoder_->num_entries();
-    have_values_ = true;
-    return encoder_->FlushValues();
-  }
-
-  // Serializes the current dictionary into a newly allocated buffer.
-  shared_ptr<Buffer> WriteDict() {
-    const auto dict_size = encoder_->dict_encoded_size();
-    std::shared_ptr<PoolBuffer> dict_buffer =
-        AllocateBuffer(::arrow::default_memory_pool(), dict_size);
-    encoder_->WriteDict(dict_buffer->mutable_data());
-    return dict_buffer;
-  }
-
-  // Number of dictionary entries after the last AppendValues() call.
-  int32_t num_values() const { return num_dict_values_; }
-
- private:
-  ChunkedAllocator pool_;
-  shared_ptr<DictEncoder<TYPE>> encoder_;
-  int32_t num_dict_values_;
-  bool have_values_;
-};
-
-// Dictionary encoding is not implemented for booleans: every entry point below
-// throws via ParquetException::NYI. They are marked `inline` because explicit
-// specializations defined in a header are not implicitly inline; without it,
-// including this header from two translation units yields duplicate symbols.
-template <>
-inline DictionaryPageBuilder<BooleanType>::DictionaryPageBuilder(const ColumnDescriptor* d) {
-  ParquetException::NYI("only plain encoding currently implemented for boolean");
-}
-
-template <>
-inline shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::WriteDict() {
-  ParquetException::NYI("only plain encoding currently implemented for boolean");
-  return nullptr;
-}
-
-template <>
-inline shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::AppendValues(
-    const vector<TC>& values) {
-  ParquetException::NYI("only plain encoding currently implemented for boolean");
-  return nullptr;
-}
-
-// Dictionary-encodes `values` page by page (page i takes the next
-// values_per_page[i] values), appends each page's encoded indices to
-// `rle_indices`, and returns a dictionary page marked Encoding::PLAIN.
-// `encoding` is currently unused.
-template <typename Type>
-static shared_ptr<DictionaryPage> MakeDictPage(const ColumnDescriptor* d,
-    const vector<typename Type::c_type>& values, const vector<int>& values_per_page,
-    Encoding::type encoding, vector<shared_ptr<Buffer>>& rle_indices) {
-  test::DictionaryPageBuilder<Type> builder(d);
-
-  int offset = 0;
-  for (const int page_size : values_per_page) {
-    rle_indices.push_back(builder.AppendValues(slice(values, offset, offset + page_size)));
-    offset += page_size;
-  }
-
-  auto dict = builder.WriteDict();
-  return std::make_shared<DictionaryPage>(dict, builder.num_values(), Encoding::PLAIN);
-}
-
-// Given def/rep levels and values create multiple dict pages: `pages` receives
-// one dictionary page followed by one data page per entry of values_per_page.
-// Data page i carries the dictionary indices for its values and, when the
-// column has them, levels [i * num_levels_per_page, (i + 1) * num_levels_per_page).
-template <typename Type>
-static void PaginateDict(const ColumnDescriptor* d,
-    const vector<typename Type::c_type>& values, const vector<int16_t>& def_levels,
-    int16_t max_def_level, const vector<int16_t>& rep_levels, int16_t max_rep_level,
-    int num_levels_per_page, const vector<int>& values_per_page,
-    vector<shared_ptr<Page>>& pages, Encoding::type encoding = Encoding::RLE_DICTIONARY) {
-  vector<shared_ptr<Buffer>> rle_indices;
-  pages.push_back(MakeDictPage<Type>(d, values, values_per_page, encoding, rle_indices));
-
-  const bool has_def = max_def_level > 0;
-  const bool has_rep = max_rep_level > 0;
-  const int num_pages = static_cast<int>(values_per_page.size());
-  for (int i = 0; i < num_pages; ++i) {
-    const int level_begin = i * num_levels_per_page;
-    const int level_end = level_begin + num_levels_per_page;
-    // Only the indices are written on this path, never `values`, so the
-    // element type passed to MakeDataPage does not affect the page bytes.
-    pages.push_back(MakeDataPage<Int32Type>(d, {}, values_per_page[i], encoding,
-        rle_indices[i]->data(), static_cast<int>(rle_indices[i]->size()),
-        slice(def_levels, has_def ? level_begin : 0, has_def ? level_end : 0),
-        max_def_level,
-        slice(rep_levels, has_rep ? level_begin : 0, has_rep ? level_end : 0),
-        max_rep_level));
-  }
-}
-
-// Given def/rep levels and values create multiple plain pages: page i takes
-// the next values_per_page[i] values and, when the column has them, levels
-// [i * num_levels_per_page, (i + 1) * num_levels_per_page).
-template <typename Type>
-static void PaginatePlain(const ColumnDescriptor* d,
-    const vector<typename Type::c_type>& values, const vector<int16_t>& def_levels,
-    int16_t max_def_level, const vector<int16_t>& rep_levels, int16_t max_rep_level,
-    int num_levels_per_page, const vector<int>& values_per_page,
-    vector<shared_ptr<Page>>& pages, Encoding::type encoding = Encoding::PLAIN) {
-  const bool has_def = max_def_level > 0;
-  const bool has_rep = max_rep_level > 0;
-  const int num_pages = static_cast<int>(values_per_page.size());
-  int value_begin = 0;
-  for (int i = 0; i < num_pages; ++i) {
-    const int level_begin = i * num_levels_per_page;
-    const int level_end = level_begin + num_levels_per_page;
-    const int page_values = values_per_page[i];
-    pages.push_back(MakeDataPage<Type>(d,
-        slice(values, value_begin, value_begin + page_values), page_values, encoding,
-        nullptr, 0,
-        slice(def_levels, has_def ? level_begin : 0, has_def ? level_end : 0),
-        max_def_level,
-        slice(rep_levels, has_rep ? level_begin : 0, has_rep ? level_end : 0),
-        max_rep_level));
-    value_begin += page_values;
-  }
-}
-
-// Generates pages from randomly generated data.
-//
-// Produces num_pages pages of levels_per_page levels each. Random definition
-// and repetition levels (seed 0) are written to def_levels / rep_levels when
-// the column has them; a value is generated for every slot whose definition
-// level equals the maximum (every slot if the column has no def levels). The
-// values are split into PLAIN data pages, or for RLE_DICTIONARY /
-// PLAIN_DICTIONARY into a dictionary page followed by data pages carrying the
-// requested encoding. Returns the total number of values.
-template <typename Type>
-static int MakePages(const ColumnDescriptor* d, int num_pages, int levels_per_page,
-    vector<int16_t>& def_levels, vector<int16_t>& rep_levels,
-    vector<typename Type::c_type>& values, vector<uint8_t>& buffer,
-    vector<shared_ptr<Page>>& pages, Encoding::type encoding = Encoding::PLAIN) {
-  int num_levels = levels_per_page * num_pages;
-  int num_values = 0;
-  uint32_t seed = 0;
-  int16_t zero = 0;
-  int16_t max_def_level = d->max_definition_level();
-  int16_t max_rep_level = d->max_repetition_level();
-  vector<int> values_per_page(num_pages, levels_per_page);
-  // Create definition levels
-  if (max_def_level > 0) {
-    def_levels.resize(num_levels);
-    random_numbers(num_levels, seed, zero, max_def_level, def_levels.data());
-    for (int p = 0; p < num_pages; p++) {
-      int num_values_per_page = 0;
-      for (int i = 0; i < levels_per_page; i++) {
-        if (def_levels[i + p * levels_per_page] == max_def_level) {
-          num_values_per_page++;
-          num_values++;
-        }
-      }
-      values_per_page[p] = num_values_per_page;
-    }
-  } else {
-    num_values = num_levels;
-  }
-  // Create repetition levels
-  if (max_rep_level > 0) {
-    rep_levels.resize(num_levels);
-    random_numbers(num_levels, seed, zero, max_rep_level, rep_levels.data());
-  }
-  // Create values
-  values.resize(num_values);
-  if (encoding == Encoding::PLAIN) {
-    InitValues<typename Type::c_type>(num_values, values, buffer);
-    PaginatePlain<Type>(d, values, def_levels, max_def_level, rep_levels, max_rep_level,
-        levels_per_page, values_per_page, pages);
-  } else if (encoding == Encoding::RLE_DICTIONARY ||
-             encoding == Encoding::PLAIN_DICTIONARY) {
-    // Calls InitValues and repeats the data
-    InitDictValues<typename Type::c_type>(num_values, levels_per_page, values, buffer);
-    // Forward the caller's encoding so PLAIN_DICTIONARY requests are not
-    // relabeled with PaginateDict's RLE_DICTIONARY default.
-    PaginateDict<Type>(d, values, def_levels, max_def_level, rep_levels, max_rep_level,
-        levels_per_page, values_per_page, pages, encoding);
-  }
-
-  return num_values;
-}
-
-} // namespace test
-
-} // namespace parquet
-
-#endif // PARQUET_COLUMN_TEST_UTIL_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
deleted file mode 100644
index 59f9999..0000000
--- a/src/parquet/column/writer.cc
+++ /dev/null
@@ -1,528 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/column/writer.h"
-
-#include "parquet/column/properties.h"
-#include "parquet/column/statistics.h"
-#include "parquet/encoding-internal.h"
-#include "parquet/util/logging.h"
-#include "parquet/util/memory.h"
-
-namespace parquet {
-
-// ----------------------------------------------------------------------
-// ColumnWriter
-
-std::shared_ptr<WriterProperties> default_writer_properties() {
- static std::shared_ptr<WriterProperties> default_writer_properties =
- WriterProperties::Builder().build();
- return default_writer_properties;
-}
-
-ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
- std::unique_ptr<PageWriter> pager, int64_t expected_rows, bool has_dictionary,
- Encoding::type encoding, const WriterProperties* properties)
- : metadata_(metadata),
- descr_(metadata->descr()),
- pager_(std::move(pager)),
- expected_rows_(expected_rows),
- has_dictionary_(has_dictionary),
- encoding_(encoding),
- properties_(properties),
- allocator_(properties->memory_pool()),
- pool_(properties->memory_pool()),
- num_buffered_values_(0),
- num_buffered_encoded_values_(0),
- num_rows_(0),
- total_bytes_written_(0),
- closed_(false),
- fallback_(false) {
- definition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
- repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
- definition_levels_rle_ =
- std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
- repetition_levels_rle_ =
- std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
- uncompressed_data_ =
- std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
- if (pager_->has_compressor()) {
- compressed_data_ =
- std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
- }
-}
-
-void ColumnWriter::InitSinks() {
- definition_levels_sink_->Clear();
- repetition_levels_sink_->Clear();
-}
-
-void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
- DCHECK(!closed_);
- definition_levels_sink_->Write(
- reinterpret_cast<const uint8_t*>(levels), sizeof(int16_t) * num_levels);
-}
-
-void ColumnWriter::WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) {
- DCHECK(!closed_);
- repetition_levels_sink_->Write(
- reinterpret_cast<const uint8_t*>(levels), sizeof(int16_t) * num_levels);
-}
-
-// return the size of the encoded buffer
-int64_t ColumnWriter::RleEncodeLevels(
- const Buffer& src_buffer, ResizableBuffer* dest_buffer, int16_t max_level) {
- // TODO: This only works with due to some RLE specifics
- int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
- static_cast<int>(num_buffered_values_)) +
- sizeof(int32_t);
-
- // Use Arrow::Buffer::shrink_to_fit = false
- // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
- PARQUET_THROW_NOT_OK(dest_buffer->Resize(rle_size, false));
-
- level_encoder_.Init(Encoding::RLE, max_level, static_cast<int>(num_buffered_values_),
- dest_buffer->mutable_data() + sizeof(int32_t),
- static_cast<int>(dest_buffer->size()) - sizeof(int32_t));
- int encoded = level_encoder_.Encode(static_cast<int>(num_buffered_values_),
- reinterpret_cast<const int16_t*>(src_buffer.data()));
- DCHECK_EQ(encoded, num_buffered_values_);
- reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len();
- int64_t encoded_size = level_encoder_.len() + sizeof(int32_t);
- return encoded_size;
-}
-
-void ColumnWriter::AddDataPage() {
- int64_t definition_levels_rle_size = 0;
- int64_t repetition_levels_rle_size = 0;
-
- std::shared_ptr<Buffer> values = GetValuesBuffer();
-
- if (descr_->max_definition_level() > 0) {
- definition_levels_rle_size = RleEncodeLevels(definition_levels_sink_->GetBufferRef(),
- definition_levels_rle_.get(), descr_->max_definition_level());
- }
-
- if (descr_->max_repetition_level() > 0) {
- repetition_levels_rle_size = RleEncodeLevels(repetition_levels_sink_->GetBufferRef(),
- repetition_levels_rle_.get(), descr_->max_repetition_level());
- }
-
- int64_t uncompressed_size =
- definition_levels_rle_size + repetition_levels_rle_size + values->size();
-
- // Use Arrow::Buffer::shrink_to_fit = false
- // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
- PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));
-
- // Concatenate data into a single buffer
- uint8_t* uncompressed_ptr = uncompressed_data_->mutable_data();
- memcpy(uncompressed_ptr, repetition_levels_rle_->data(), repetition_levels_rle_size);
- uncompressed_ptr += repetition_levels_rle_size;
- memcpy(uncompressed_ptr, definition_levels_rle_->data(), definition_levels_rle_size);
- uncompressed_ptr += definition_levels_rle_size;
- memcpy(uncompressed_ptr, values->data(), values->size());
-
- EncodedStatistics page_stats = GetPageStatistics();
- ResetPageStatistics();
-
- std::shared_ptr<Buffer> compressed_data;
- if (pager_->has_compressor()) {
- pager_->Compress(*(uncompressed_data_.get()), compressed_data_.get());
- compressed_data = compressed_data_;
- } else {
- compressed_data = uncompressed_data_;
- }
-
- // Write the page to OutputStream eagerly if there is no dictionary or
- // if dictionary encoding has fallen back to PLAIN
- if (has_dictionary_ && !fallback_) { // Save pages until end of dictionary encoding
- std::shared_ptr<Buffer> compressed_data_copy;
- PARQUET_THROW_NOT_OK(compressed_data->Copy(
- 0, compressed_data->size(), allocator_, &compressed_data_copy));
- CompressedDataPage page(compressed_data_copy,
- static_cast<int32_t>(num_buffered_values_), encoding_, Encoding::RLE,
- Encoding::RLE, uncompressed_size, page_stats);
- data_pages_.push_back(std::move(page));
- } else { // Eagerly write pages
- CompressedDataPage page(compressed_data, static_cast<int32_t>(num_buffered_values_),
- encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
- WriteDataPage(page);
- }
-
- // Re-initialize the sinks for next Page.
- InitSinks();
- num_buffered_values_ = 0;
- num_buffered_encoded_values_ = 0;
-}
-
-void ColumnWriter::WriteDataPage(const CompressedDataPage& page) {
- total_bytes_written_ += pager_->WriteDataPage(page);
-}
-
-int64_t ColumnWriter::Close() {
- if (!closed_) {
- closed_ = true;
- if (has_dictionary_ && !fallback_) { WriteDictionaryPage(); }
-
- FlushBufferedDataPages();
-
- EncodedStatistics chunk_statistics = GetChunkStatistics();
- if (chunk_statistics.is_set()) metadata_->SetStatistics(chunk_statistics);
- pager_->Close(has_dictionary_, fallback_);
- }
-
- if (num_rows_ != expected_rows_) {
- std::stringstream ss;
- ss << "Written rows: " << num_rows_ << " != expected rows: " << expected_rows_
- << "in the current column chunk";
- throw ParquetException(ss.str());
- }
-
- return total_bytes_written_;
-}
-
-void ColumnWriter::FlushBufferedDataPages() {
- // Write all outstanding data to a new page
- if (num_buffered_values_ > 0) { AddDataPage(); }
- for (size_t i = 0; i < data_pages_.size(); i++) {
- WriteDataPage(data_pages_[i]);
- }
- data_pages_.clear();
-}
-
-// ----------------------------------------------------------------------
-// TypedColumnWriter
-
-template <typename Type>
-TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
- std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
- const WriterProperties* properties)
- : ColumnWriter(metadata, std::move(pager), expected_rows,
- (encoding == Encoding::PLAIN_DICTIONARY ||
- encoding == Encoding::RLE_DICTIONARY),
- encoding, properties) {
- switch (encoding) {
- case Encoding::PLAIN:
- current_encoder_.reset(new PlainEncoder<Type>(descr_, properties->memory_pool()));
- break;
- case Encoding::PLAIN_DICTIONARY:
- case Encoding::RLE_DICTIONARY:
- current_encoder_.reset(
- new DictEncoder<Type>(descr_, &pool_, properties->memory_pool()));
- break;
- default:
- ParquetException::NYI("Selected encoding is not supported");
- }
-
- if (properties->statistics_enabled(descr_->path())) {
- page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
- chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
- }
-}
-
-// Only one Dictionary Page is written.
-// Fallback to PLAIN if dictionary page limit is reached.
-template <typename Type>
-void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
- auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
- if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
- WriteDictionaryPage();
- // Serialize the buffered Dictionary Indicies
- FlushBufferedDataPages();
- fallback_ = true;
- // Only PLAIN encoding is supported for fallback in V1
- current_encoder_.reset(new PlainEncoder<Type>(descr_, properties_->memory_pool()));
- encoding_ = Encoding::PLAIN;
- }
-}
-
-template <typename Type>
-void TypedColumnWriter<Type>::WriteDictionaryPage() {
- auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
- std::shared_ptr<PoolBuffer> buffer =
- AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
- dict_encoder->WriteDict(buffer->mutable_data());
- // TODO Get rid of this deep call
- dict_encoder->mem_pool()->FreeAll();
-
- DictionaryPage page(
- buffer, dict_encoder->num_entries(), properties_->dictionary_index_encoding());
- total_bytes_written_ += pager_->WriteDictionaryPage(page);
-}
-
-template <typename Type>
-EncodedStatistics TypedColumnWriter<Type>::GetPageStatistics() {
- EncodedStatistics result;
- if (page_statistics_) result = page_statistics_->Encode();
- return result;
-}
-
-template <typename Type>
-EncodedStatistics TypedColumnWriter<Type>::GetChunkStatistics() {
- EncodedStatistics result;
- if (chunk_statistics_) result = chunk_statistics_->Encode();
- return result;
-}
-
-template <typename Type>
-void TypedColumnWriter<Type>::ResetPageStatistics() {
- if (chunk_statistics_ != nullptr) {
- chunk_statistics_->Merge(*page_statistics_);
- page_statistics_->Reset();
- }
-}
-
-// ----------------------------------------------------------------------
-// Dynamic column writer constructor
-
-std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
- std::unique_ptr<PageWriter> pager, int64_t expected_rows,
- const WriterProperties* properties) {
- const ColumnDescriptor* descr = metadata->descr();
- Encoding::type encoding = properties->encoding(descr->path());
- if (properties->dictionary_enabled(descr->path()) &&
- descr->physical_type() != Type::BOOLEAN) {
- encoding = properties->dictionary_page_encoding();
- }
- switch (descr->physical_type()) {
- case Type::BOOLEAN:
- return std::make_shared<BoolWriter>(
- metadata, std::move(pager), expected_rows, encoding, properties);
- case Type::INT32:
- return std::make_shared<Int32Writer>(
- metadata, std::move(pager), expected_rows, encoding, properties);
- case Type::INT64:
- return std::make_shared<Int64Writer>(
- metadata, std::move(pager), expected_rows, encoding, properties);
- case Type::INT96:
- return std::make_shared<Int96Writer>(
- metadata, std::move(pager), expected_rows, encoding, properties);
- case Type::FLOAT:
- return std::make_shared<FloatWriter>(
- metadata, std::move(pager), expected_rows, encoding, properties);
- case Type::DOUBLE:
- return std::make_shared<DoubleWriter>(
- metadata, std::move(pager), expected_rows, encoding, properties);
- case Type::BYTE_ARRAY:
- return std::make_shared<ByteArrayWriter>(
- metadata, std::move(pager), expected_rows, encoding, properties);
- case Type::FIXED_LEN_BYTE_ARRAY:
- return std::make_shared<FixedLenByteArrayWriter>(
- metadata, std::move(pager), expected_rows, encoding, properties);
- default:
- ParquetException::NYI("type reader not implemented");
- }
- // Unreachable code, but supress compiler warning
- return std::shared_ptr<ColumnWriter>(nullptr);
-}
-
-// ----------------------------------------------------------------------
-// Instantiate templated classes
-
-template <typename DType>
-inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
- const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
- int64_t values_to_write = 0;
- // If the field is required and non-repeated, there are no definition levels
- if (descr_->max_definition_level() > 0) {
- for (int64_t i = 0; i < num_values; ++i) {
- if (def_levels[i] == descr_->max_definition_level()) { ++values_to_write; }
- }
-
- WriteDefinitionLevels(num_values, def_levels);
- } else {
- // Required field, write all values
- values_to_write = num_values;
- }
-
- // Not present for non-repeated fields
- if (descr_->max_repetition_level() > 0) {
- // A row could include more than one value
- // Count the occasions where we start a new row
- for (int64_t i = 0; i < num_values; ++i) {
- if (rep_levels[i] == 0) { num_rows_++; }
- }
-
- WriteRepetitionLevels(num_values, rep_levels);
- } else {
- // Each value is exactly one row
- num_rows_ += static_cast<int>(num_values);
- }
-
- if (num_rows_ > expected_rows_) {
- throw ParquetException("More rows were written in the column chunk than expected");
- }
-
- // PARQUET-780
- if (values_to_write > 0) { DCHECK(nullptr != values) << "Values ptr cannot be NULL"; }
-
- WriteValues(values_to_write, values);
-
- if (page_statistics_ != nullptr) {
- page_statistics_->Update(values, values_to_write, num_values - values_to_write);
- }
-
- num_buffered_values_ += num_values;
- num_buffered_encoded_values_ += values_to_write;
-
- if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
- AddDataPage();
- }
- if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }
-
- return values_to_write;
-}
-
-template <typename DType>
-inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(int64_t num_values,
- const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
- int64_t valid_bits_offset, const T* values, int64_t* num_spaced_written) {
- int64_t values_to_write = 0;
- int64_t spaced_values_to_write = 0;
- // If the field is required and non-repeated, there are no definition levels
- if (descr_->max_definition_level() > 0) {
- // Minimal definition level for which spaced values are written
- int16_t min_spaced_def_level = descr_->max_definition_level();
- if (descr_->schema_node()->is_optional()) { min_spaced_def_level--; }
- for (int64_t i = 0; i < num_values; ++i) {
- if (def_levels[i] == descr_->max_definition_level()) { ++values_to_write; }
- if (def_levels[i] >= min_spaced_def_level) { ++spaced_values_to_write; }
- }
-
- WriteDefinitionLevels(num_values, def_levels);
- } else {
- // Required field, write all values
- values_to_write = num_values;
- spaced_values_to_write = num_values;
- }
-
- // Not present for non-repeated fields
- if (descr_->max_repetition_level() > 0) {
- // A row could include more than one value
- // Count the occasions where we start a new row
- for (int64_t i = 0; i < num_values; ++i) {
- if (rep_levels[i] == 0) { num_rows_++; }
- }
-
- WriteRepetitionLevels(num_values, rep_levels);
- } else {
- // Each value is exactly one row
- num_rows_ += static_cast<int>(num_values);
- }
-
- if (num_rows_ > expected_rows_) {
- throw ParquetException("More rows were written in the column chunk than expected");
- }
-
- if (descr_->schema_node()->is_optional()) {
- WriteValuesSpaced(spaced_values_to_write, valid_bits, valid_bits_offset, values);
- } else {
- WriteValues(values_to_write, values);
- }
- *num_spaced_written = spaced_values_to_write;
-
- if (page_statistics_ != nullptr) {
- page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, values_to_write,
- num_values - values_to_write);
- }
-
- num_buffered_values_ += num_values;
- num_buffered_encoded_values_ += values_to_write;
-
- if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
- AddDataPage();
- }
- if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }
-
- return values_to_write;
-}
-
-template <typename DType>
-void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def_levels,
- const int16_t* rep_levels, const T* values) {
- // We check for DataPage limits only after we have inserted the values. If a user
- // writes a large number of values, the DataPage size can be much above the limit.
- // The purpose of this chunking is to bound this. Even if a user writes large number
- // of values, the chunking will ensure the AddDataPage() is called at a reasonable
- // pagesize limit
- int64_t write_batch_size = properties_->write_batch_size();
- int num_batches = static_cast<int>(num_values / write_batch_size);
- int64_t num_remaining = num_values % write_batch_size;
- int64_t value_offset = 0;
- for (int round = 0; round < num_batches; round++) {
- int64_t offset = round * write_batch_size;
- int64_t num_values = WriteMiniBatch(write_batch_size, &def_levels[offset],
- &rep_levels[offset], &values[value_offset]);
- value_offset += num_values;
- }
- // Write the remaining values
- int64_t offset = num_batches * write_batch_size;
- WriteMiniBatch(
- num_remaining, &def_levels[offset], &rep_levels[offset], &values[value_offset]);
-}
-
-template <typename DType>
-void TypedColumnWriter<DType>::WriteBatchSpaced(int64_t num_values,
- const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
- int64_t valid_bits_offset, const T* values) {
- // We check for DataPage limits only after we have inserted the values. If a user
- // writes a large number of values, the DataPage size can be much above the limit.
- // The purpose of this chunking is to bound this. Even if a user writes large number
- // of values, the chunking will ensure the AddDataPage() is called at a reasonable
- // pagesize limit
- int64_t write_batch_size = properties_->write_batch_size();
- int num_batches = static_cast<int>(num_values / write_batch_size);
- int64_t num_remaining = num_values % write_batch_size;
- int64_t num_spaced_written = 0;
- int64_t values_offset = 0;
- for (int round = 0; round < num_batches; round++) {
- int64_t offset = round * write_batch_size;
- WriteMiniBatchSpaced(write_batch_size, &def_levels[offset], &rep_levels[offset],
- valid_bits, valid_bits_offset + values_offset, values + values_offset,
- &num_spaced_written);
- values_offset += num_spaced_written;
- }
- // Write the remaining values
- int64_t offset = num_batches * write_batch_size;
- WriteMiniBatchSpaced(num_remaining, &def_levels[offset], &rep_levels[offset],
- valid_bits, valid_bits_offset + values_offset, values + values_offset,
- &num_spaced_written);
-}
-
-template <typename DType>
-void TypedColumnWriter<DType>::WriteValues(int64_t num_values, const T* values) {
- current_encoder_->Put(values, static_cast<int>(num_values));
-}
-
-template <typename DType>
-void TypedColumnWriter<DType>::WriteValuesSpaced(int64_t num_values,
- const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values) {
- current_encoder_->PutSpaced(
- values, static_cast<int>(num_values), valid_bits, valid_bits_offset);
-}
-
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<BooleanType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int32Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int64Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int96Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FloatType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<DoubleType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<ByteArrayType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FLBAType>;
-
-} // namespace parquet