You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@parquet.apache.org by uw...@apache.org on 2017/06/26 07:05:28 UTC
[5/6] parquet-cpp git commit: PARQUET-858: Flatten column directory, minor code consolidation
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/level-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/level-benchmark.cc b/src/parquet/column/level-benchmark.cc
deleted file mode 100644
index 34c7218..0000000
--- a/src/parquet/column/level-benchmark.cc
+++ /dev/null
@@ -1,78 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "benchmark/benchmark.h"
-
-#include "parquet/column/levels.h"
-#include "parquet/util/memory.h"
-
-namespace parquet {
-
-namespace benchmark {
-
-static void BM_RleEncoding(::benchmark::State& state) {
- std::vector<int16_t> levels(state.range(0), 0);
- int64_t n = 0;
- std::generate(
- levels.begin(), levels.end(), [&state, &n] { return (n++ % state.range(1)) == 0; });
- int16_t max_level = 1;
- int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, levels.size());
- auto buffer_rle = std::make_shared<PoolBuffer>();
- PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size));
-
- while (state.KeepRunning()) {
- LevelEncoder level_encoder;
- level_encoder.Init(Encoding::RLE, max_level, levels.size(),
- buffer_rle->mutable_data(), buffer_rle->size());
- level_encoder.Encode(levels.size(), levels.data());
- }
- state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int16_t));
- state.SetItemsProcessed(state.iterations() * state.range(0));
-}
-
-BENCHMARK(BM_RleEncoding)->RangePair(1024, 65536, 1, 16);
-
-static void BM_RleDecoding(::benchmark::State& state) {
- LevelEncoder level_encoder;
- std::vector<int16_t> levels(state.range(0), 0);
- int64_t n = 0;
- std::generate(
- levels.begin(), levels.end(), [&state, &n] { return (n++ % state.range(1)) == 0; });
- int16_t max_level = 1;
- int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, levels.size());
- auto buffer_rle = std::make_shared<PoolBuffer>();
- PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size + sizeof(int32_t)));
- level_encoder.Init(Encoding::RLE, max_level, levels.size(),
- buffer_rle->mutable_data() + sizeof(int32_t), rle_size);
- level_encoder.Encode(levels.size(), levels.data());
- reinterpret_cast<int32_t*>(buffer_rle->mutable_data())[0] = level_encoder.len();
-
- while (state.KeepRunning()) {
- LevelDecoder level_decoder;
- level_decoder.SetData(Encoding::RLE, max_level, levels.size(), buffer_rle->data());
- level_decoder.Decode(state.range(0), levels.data());
- }
-
- state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int16_t));
- state.SetItemsProcessed(state.iterations() * state.range(0));
-}
-
-BENCHMARK(BM_RleDecoding)->RangePair(1024, 65536, 1, 16);
-
-} // namespace benchmark
-
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/levels-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels-test.cc b/src/parquet/column/levels-test.cc
deleted file mode 100644
index a6284a9..0000000
--- a/src/parquet/column/levels-test.cc
+++ /dev/null
@@ -1,245 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstdint>
-#include <gtest/gtest.h>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "parquet/column/levels.h"
-#include "parquet/types.h"
-
-using std::string;
-
-namespace parquet {
-
-void GenerateLevels(int min_repeat_factor, int max_repeat_factor, int max_level,
- std::vector<int16_t>& input_levels) {
- // for each repetition count upto max_repeat_factor
- for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) {
- // repeat count increases by a factor of 2 for every iteration
- int repeat_count = (1 << repeat);
- // generate levels for repetition count upto the maximum level
- int value = 0;
- int bwidth = 0;
- while (value <= max_level) {
- for (int i = 0; i < repeat_count; i++) {
- input_levels.push_back(value);
- }
- value = (2 << bwidth) - 1;
- bwidth++;
- }
- }
-}
-
-void EncodeLevels(Encoding::type encoding, int max_level, int num_levels,
- const int16_t* input_levels, std::vector<uint8_t>& bytes) {
- LevelEncoder encoder;
- int levels_count = 0;
- bytes.resize(2 * num_levels);
- ASSERT_EQ(2 * num_levels, static_cast<int>(bytes.size()));
- // encode levels
- if (encoding == Encoding::RLE) {
- // leave space to write the rle length value
- encoder.Init(encoding, max_level, num_levels, bytes.data() + sizeof(int32_t),
- static_cast<int>(bytes.size()));
-
- levels_count = encoder.Encode(num_levels, input_levels);
- (reinterpret_cast<int32_t*>(bytes.data()))[0] = encoder.len();
- } else {
- encoder.Init(
- encoding, max_level, num_levels, bytes.data(), static_cast<int>(bytes.size()));
- levels_count = encoder.Encode(num_levels, input_levels);
- }
- ASSERT_EQ(num_levels, levels_count);
-}
-
-void VerifyDecodingLevels(Encoding::type encoding, int max_level,
- std::vector<int16_t>& input_levels, std::vector<uint8_t>& bytes) {
- LevelDecoder decoder;
- int levels_count = 0;
- std::vector<int16_t> output_levels;
- int num_levels = static_cast<int>(input_levels.size());
-
- output_levels.resize(num_levels);
- ASSERT_EQ(num_levels, static_cast<int>(output_levels.size()));
-
- // Decode levels and test with multiple decode calls
- decoder.SetData(encoding, max_level, num_levels, bytes.data());
- int decode_count = 4;
- int num_inner_levels = num_levels / decode_count;
- // Try multiple decoding on a single SetData call
- for (int ct = 0; ct < decode_count; ct++) {
- int offset = ct * num_inner_levels;
- levels_count = decoder.Decode(num_inner_levels, output_levels.data());
- ASSERT_EQ(num_inner_levels, levels_count);
- for (int i = 0; i < num_inner_levels; i++) {
- EXPECT_EQ(input_levels[i + offset], output_levels[i]);
- }
- }
- // check the remaining levels
- int num_levels_completed = decode_count * (num_levels / decode_count);
- int num_remaining_levels = num_levels - num_levels_completed;
- if (num_remaining_levels > 0) {
- levels_count = decoder.Decode(num_remaining_levels, output_levels.data());
- ASSERT_EQ(num_remaining_levels, levels_count);
- for (int i = 0; i < num_remaining_levels; i++) {
- EXPECT_EQ(input_levels[i + num_levels_completed], output_levels[i]);
- }
- }
- // Test zero Decode values
- ASSERT_EQ(0, decoder.Decode(1, output_levels.data()));
-}
-
-void VerifyDecodingMultipleSetData(Encoding::type encoding, int max_level,
- std::vector<int16_t>& input_levels, std::vector<std::vector<uint8_t>>& bytes) {
- LevelDecoder decoder;
- int levels_count = 0;
- std::vector<int16_t> output_levels;
-
- // Decode levels and test with multiple SetData calls
- int setdata_count = static_cast<int>(bytes.size());
- int num_levels = static_cast<int>(input_levels.size()) / setdata_count;
- output_levels.resize(num_levels);
- // Try multiple SetData
- for (int ct = 0; ct < setdata_count; ct++) {
- int offset = ct * num_levels;
- ASSERT_EQ(num_levels, static_cast<int>(output_levels.size()));
- decoder.SetData(encoding, max_level, num_levels, bytes[ct].data());
- levels_count = decoder.Decode(num_levels, output_levels.data());
- ASSERT_EQ(num_levels, levels_count);
- for (int i = 0; i < num_levels; i++) {
- EXPECT_EQ(input_levels[i + offset], output_levels[i]);
- }
- }
-}
-
-// Test levels with maximum bit-width from 1 to 8
-// increase the repetition count for each iteration by a factor of 2
-TEST(TestLevels, TestLevelsDecodeMultipleBitWidth) {
- int min_repeat_factor = 0;
- int max_repeat_factor = 7; // 128
- int max_bit_width = 8;
- std::vector<int16_t> input_levels;
- std::vector<uint8_t> bytes;
- Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED};
-
- // for each encoding
- for (int encode = 0; encode < 2; encode++) {
- Encoding::type encoding = encodings[encode];
- // BIT_PACKED requires a sequence of atleast 8
- if (encoding == Encoding::BIT_PACKED) min_repeat_factor = 3;
- // for each maximum bit-width
- for (int bit_width = 1; bit_width <= max_bit_width; bit_width++) {
- // find the maximum level for the current bit_width
- int max_level = (1 << bit_width) - 1;
- // Generate levels
- GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels);
- EncodeLevels(encoding, max_level, static_cast<int>(input_levels.size()),
- input_levels.data(), bytes);
- VerifyDecodingLevels(encoding, max_level, input_levels, bytes);
- input_levels.clear();
- }
- }
-}
-
-// Test multiple decoder SetData calls
-TEST(TestLevels, TestLevelsDecodeMultipleSetData) {
- int min_repeat_factor = 3;
- int max_repeat_factor = 7; // 128
- int bit_width = 8;
- int max_level = (1 << bit_width) - 1;
- std::vector<int16_t> input_levels;
- std::vector<std::vector<uint8_t>> bytes;
- Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED};
- GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels);
- int num_levels = static_cast<int>(input_levels.size());
- int setdata_factor = 8;
- int split_level_size = num_levels / setdata_factor;
- bytes.resize(setdata_factor);
-
- // for each encoding
- for (int encode = 0; encode < 2; encode++) {
- Encoding::type encoding = encodings[encode];
- for (int rf = 0; rf < setdata_factor; rf++) {
- int offset = rf * split_level_size;
- EncodeLevels(encoding, max_level, split_level_size,
- reinterpret_cast<int16_t*>(input_levels.data()) + offset, bytes[rf]);
- }
- VerifyDecodingMultipleSetData(encoding, max_level, input_levels, bytes);
- }
-}
-
-TEST(TestLevelEncoder, MinimumBufferSize) {
- // PARQUET-676, PARQUET-698
- const int kNumToEncode = 1024;
-
- std::vector<int16_t> levels;
- for (int i = 0; i < kNumToEncode; ++i) {
- if (i % 9 == 0) {
- levels.push_back(0);
- } else {
- levels.push_back(1);
- }
- }
-
- std::vector<uint8_t> output(
- LevelEncoder::MaxBufferSize(Encoding::RLE, 1, kNumToEncode));
-
- LevelEncoder encoder;
- encoder.Init(
- Encoding::RLE, 1, kNumToEncode, output.data(), static_cast<int>(output.size()));
- int encode_count = encoder.Encode(kNumToEncode, levels.data());
-
- ASSERT_EQ(kNumToEncode, encode_count);
-}
-
-TEST(TestLevelEncoder, MinimumBufferSize2) {
- // PARQUET-708
- // Test the worst case for bit_width=2 consisting of
- // LiteralRun(size=8)
- // RepeatedRun(size=8)
- // LiteralRun(size=8)
- // ...
- const int kNumToEncode = 1024;
-
- std::vector<int16_t> levels;
- for (int i = 0; i < kNumToEncode; ++i) {
- // This forces a literal run of 00000001
- // followed by eight 1s
- if ((i % 16) < 7) {
- levels.push_back(0);
- } else {
- levels.push_back(1);
- }
- }
-
- for (int bit_width = 1; bit_width <= 8; bit_width++) {
- std::vector<uint8_t> output(
- LevelEncoder::MaxBufferSize(Encoding::RLE, bit_width, kNumToEncode));
-
- LevelEncoder encoder;
- encoder.Init(Encoding::RLE, bit_width, kNumToEncode, output.data(),
- static_cast<int>(output.size()));
- int encode_count = encoder.Encode(kNumToEncode, levels.data());
-
- ASSERT_EQ(kNumToEncode, encode_count);
- }
-}
-
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/levels.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels.cc b/src/parquet/column/levels.cc
deleted file mode 100644
index fd25420..0000000
--- a/src/parquet/column/levels.cc
+++ /dev/null
@@ -1,144 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/column/levels.h"
-
-#include <cstdint>
-
-#include "parquet/util/rle-encoding.h"
-
-namespace parquet {
-
-LevelEncoder::LevelEncoder() {}
-LevelEncoder::~LevelEncoder() {}
-
-void LevelEncoder::Init(Encoding::type encoding, int16_t max_level,
- int num_buffered_values, uint8_t* data, int data_size) {
- bit_width_ = BitUtil::Log2(max_level + 1);
- encoding_ = encoding;
- switch (encoding) {
- case Encoding::RLE: {
- rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_));
- break;
- }
- case Encoding::BIT_PACKED: {
- int num_bytes =
- static_cast<int>(BitUtil::Ceil(num_buffered_values * bit_width_, 8));
- bit_packed_encoder_.reset(new BitWriter(data, num_bytes));
- break;
- }
- default:
- throw ParquetException("Unknown encoding type for levels.");
- }
-}
-
-int LevelEncoder::MaxBufferSize(
- Encoding::type encoding, int16_t max_level, int num_buffered_values) {
- int bit_width = BitUtil::Log2(max_level + 1);
- int num_bytes = 0;
- switch (encoding) {
- case Encoding::RLE: {
- // TODO: Due to the way we currently check if the buffer is full enough,
- // we need to have MinBufferSize as head room.
- num_bytes = RleEncoder::MaxBufferSize(bit_width, num_buffered_values) +
- RleEncoder::MinBufferSize(bit_width);
- break;
- }
- case Encoding::BIT_PACKED: {
- num_bytes = static_cast<int>(BitUtil::Ceil(num_buffered_values * bit_width, 8));
- break;
- }
- default:
- throw ParquetException("Unknown encoding type for levels.");
- }
- return num_bytes;
-}
-
-int LevelEncoder::Encode(int batch_size, const int16_t* levels) {
- int num_encoded = 0;
- if (!rle_encoder_ && !bit_packed_encoder_) {
- throw ParquetException("Level encoders are not initialized.");
- }
-
- if (encoding_ == Encoding::RLE) {
- for (int i = 0; i < batch_size; ++i) {
- if (!rle_encoder_->Put(*(levels + i))) { break; }
- ++num_encoded;
- }
- rle_encoder_->Flush();
- rle_length_ = rle_encoder_->len();
- } else {
- for (int i = 0; i < batch_size; ++i) {
- if (!bit_packed_encoder_->PutValue(*(levels + i), bit_width_)) { break; }
- ++num_encoded;
- }
- bit_packed_encoder_->Flush();
- }
- return num_encoded;
-}
-
-LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}
-
-LevelDecoder::~LevelDecoder() {}
-
-int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
- int num_buffered_values, const uint8_t* data) {
- int32_t num_bytes = 0;
- encoding_ = encoding;
- num_values_remaining_ = num_buffered_values;
- bit_width_ = BitUtil::Log2(max_level + 1);
- switch (encoding) {
- case Encoding::RLE: {
- num_bytes = *reinterpret_cast<const int32_t*>(data);
- const uint8_t* decoder_data = data + sizeof(int32_t);
- if (!rle_decoder_) {
- rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_));
- } else {
- rle_decoder_->Reset(decoder_data, num_bytes, bit_width_);
- }
- return sizeof(int32_t) + num_bytes;
- }
- case Encoding::BIT_PACKED: {
- num_bytes =
- static_cast<int32_t>(BitUtil::Ceil(num_buffered_values * bit_width_, 8));
- if (!bit_packed_decoder_) {
- bit_packed_decoder_.reset(new BitReader(data, num_bytes));
- } else {
- bit_packed_decoder_->Reset(data, num_bytes);
- }
- return num_bytes;
- }
- default:
- throw ParquetException("Unknown encoding type for levels.");
- }
- return -1;
-}
-
-int LevelDecoder::Decode(int batch_size, int16_t* levels) {
- int num_decoded = 0;
-
- int num_values = std::min(num_values_remaining_, batch_size);
- if (encoding_ == Encoding::RLE) {
- num_decoded = rle_decoder_->GetBatch(levels, num_values);
- } else {
- num_decoded = bit_packed_decoder_->GetBatch(bit_width_, levels, num_values);
- }
- num_values_remaining_ -= num_decoded;
- return num_decoded;
-}
-
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/levels.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels.h b/src/parquet/column/levels.h
deleted file mode 100644
index 63e325e..0000000
--- a/src/parquet/column/levels.h
+++ /dev/null
@@ -1,86 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_COLUMN_LEVELS_H
-#define PARQUET_COLUMN_LEVELS_H
-
-#include <algorithm>
-#include <memory>
-
-#include "parquet/exception.h"
-#include "parquet/types.h"
-
-namespace parquet {
-
-class BitReader;
-class BitWriter;
-class RleDecoder;
-class RleEncoder;
-
-class PARQUET_EXPORT LevelEncoder {
- public:
- LevelEncoder();
- ~LevelEncoder();
-
- static int MaxBufferSize(
- Encoding::type encoding, int16_t max_level, int num_buffered_values);
-
- // Initialize the LevelEncoder.
- void Init(Encoding::type encoding, int16_t max_level, int num_buffered_values,
- uint8_t* data, int data_size);
-
- // Encodes a batch of levels from an array and returns the number of levels encoded
- int Encode(int batch_size, const int16_t* levels);
-
- int32_t len() {
- if (encoding_ != Encoding::RLE) {
- throw ParquetException("Only implemented for RLE encoding");
- }
- return rle_length_;
- }
-
- private:
- int bit_width_;
- int rle_length_;
- Encoding::type encoding_;
- std::unique_ptr<RleEncoder> rle_encoder_;
- std::unique_ptr<BitWriter> bit_packed_encoder_;
-};
-
-class PARQUET_EXPORT LevelDecoder {
- public:
- LevelDecoder();
- ~LevelDecoder();
-
- // Initialize the LevelDecoder state with new data
- // and return the number of bytes consumed
- int SetData(Encoding::type encoding, int16_t max_level, int num_buffered_values,
- const uint8_t* data);
-
- // Decodes a batch of levels into an array and returns the number of levels decoded
- int Decode(int batch_size, int16_t* levels);
-
- private:
- int bit_width_;
- int num_values_remaining_;
- Encoding::type encoding_;
- std::unique_ptr<RleDecoder> rle_decoder_;
- std::unique_ptr<BitReader> bit_packed_decoder_;
-};
-
-} // namespace parquet
-#endif // PARQUET_COLUMN_LEVELS_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
deleted file mode 100644
index a3813c5..0000000
--- a/src/parquet/column/page.h
+++ /dev/null
@@ -1,201 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// This module defines an abstract interface for iterating through pages in a
-// Parquet column chunk within a row group. It could be extended in the future
-// to iterate through all data pages in all chunks in a file.
-
-#ifndef PARQUET_COLUMN_PAGE_H
-#define PARQUET_COLUMN_PAGE_H
-
-#include <cstdint>
-#include <memory>
-#include <string>
-
-#include "parquet/column/statistics.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-
-namespace parquet {
-
-// TODO: Parallel processing is not yet safe because of memory-ownership
-// semantics (the PageReader may or may not own the memory referenced by a
-// page)
-//
-// TODO(wesm): In the future Parquet implementations may store the crc code
-// in format::PageHeader. parquet-mr currently does not, so we also skip it
-// here, both on the read and write path
-class Page {
- public:
- Page(const std::shared_ptr<Buffer>& buffer, PageType::type type)
- : buffer_(buffer), type_(type) {}
-
- PageType::type type() const { return type_; }
-
- std::shared_ptr<Buffer> buffer() const { return buffer_; }
-
- // @returns: a pointer to the page's data
- const uint8_t* data() const { return buffer_->data(); }
-
- // @returns: the total size in bytes of the page's data buffer
- int32_t size() const { return static_cast<int32_t>(buffer_->size()); }
-
- private:
- std::shared_ptr<Buffer> buffer_;
- PageType::type type_;
-};
-
-class DataPage : public Page {
- public:
- DataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
- Encoding::type encoding, Encoding::type definition_level_encoding,
- Encoding::type repetition_level_encoding,
- const EncodedStatistics& statistics = EncodedStatistics())
- : Page(buffer, PageType::DATA_PAGE),
- num_values_(num_values),
- encoding_(encoding),
- definition_level_encoding_(definition_level_encoding),
- repetition_level_encoding_(repetition_level_encoding),
- statistics_(statistics) {}
-
- int32_t num_values() const { return num_values_; }
-
- Encoding::type encoding() const { return encoding_; }
-
- Encoding::type repetition_level_encoding() const { return repetition_level_encoding_; }
-
- Encoding::type definition_level_encoding() const { return definition_level_encoding_; }
-
- const EncodedStatistics& statistics() const { return statistics_; }
-
- private:
- int32_t num_values_;
- Encoding::type encoding_;
- Encoding::type definition_level_encoding_;
- Encoding::type repetition_level_encoding_;
- EncodedStatistics statistics_;
-};
-
-class CompressedDataPage : public DataPage {
- public:
- CompressedDataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
- Encoding::type encoding, Encoding::type definition_level_encoding,
- Encoding::type repetition_level_encoding, int64_t uncompressed_size,
- const EncodedStatistics& statistics = EncodedStatistics())
- : DataPage(buffer, num_values, encoding, definition_level_encoding,
- repetition_level_encoding, statistics),
- uncompressed_size_(uncompressed_size) {}
-
- int64_t uncompressed_size() const { return uncompressed_size_; }
-
- private:
- int64_t uncompressed_size_;
-};
-
-class DataPageV2 : public Page {
- public:
- DataPageV2(const std::shared_ptr<Buffer>& buffer, int32_t num_values, int32_t num_nulls,
- int32_t num_rows, Encoding::type encoding, int32_t definition_levels_byte_length,
- int32_t repetition_levels_byte_length, bool is_compressed = false)
- : Page(buffer, PageType::DATA_PAGE_V2),
- num_values_(num_values),
- num_nulls_(num_nulls),
- num_rows_(num_rows),
- encoding_(encoding),
- definition_levels_byte_length_(definition_levels_byte_length),
- repetition_levels_byte_length_(repetition_levels_byte_length),
- is_compressed_(is_compressed) {}
-
- int32_t num_values() const { return num_values_; }
-
- int32_t num_nulls() const { return num_nulls_; }
-
- int32_t num_rows() const { return num_rows_; }
-
- Encoding::type encoding() const { return encoding_; }
-
- int32_t definition_levels_byte_length() const { return definition_levels_byte_length_; }
-
- int32_t repetition_levels_byte_length() const { return repetition_levels_byte_length_; }
-
- bool is_compressed() const { return is_compressed_; }
-
- private:
- int32_t num_values_;
- int32_t num_nulls_;
- int32_t num_rows_;
- Encoding::type encoding_;
- int32_t definition_levels_byte_length_;
- int32_t repetition_levels_byte_length_;
- bool is_compressed_;
-
- // TODO(wesm): format::DataPageHeaderV2.statistics
-};
-
-class DictionaryPage : public Page {
- public:
- DictionaryPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
- Encoding::type encoding, bool is_sorted = false)
- : Page(buffer, PageType::DICTIONARY_PAGE),
- num_values_(num_values),
- encoding_(encoding),
- is_sorted_(is_sorted) {}
-
- int32_t num_values() const { return num_values_; }
-
- Encoding::type encoding() const { return encoding_; }
-
- bool is_sorted() const { return is_sorted_; }
-
- private:
- int32_t num_values_;
- Encoding::type encoding_;
- bool is_sorted_;
-};
-
-// Abstract page iterator interface. This way, we can feed column pages to the
-// ColumnReader through whatever mechanism we choose
-class PageReader {
- public:
- virtual ~PageReader() {}
-
- // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
- // containing new Page otherwise
- virtual std::shared_ptr<Page> NextPage() = 0;
-};
-
-class PageWriter {
- public:
- virtual ~PageWriter() {}
-
- // The Column Writer decides if dictionary encoding is used if set and
- // if the dictionary encoding has fallen back to default encoding on reaching dictionary
- // page limit
- virtual void Close(bool has_dictionary, bool fallback) = 0;
-
- virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0;
-
- virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
-
- virtual bool has_compressor() = 0;
-
- virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0;
-};
-
-} // namespace parquet
-
-#endif // PARQUET_COLUMN_PAGE_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/properties-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties-test.cc b/src/parquet/column/properties-test.cc
deleted file mode 100644
index 07247cf..0000000
--- a/src/parquet/column/properties-test.cc
+++ /dev/null
@@ -1,64 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include <string>
-
-#include "parquet/column/properties.h"
-#include "parquet/file/reader.h"
-
-namespace parquet {
-
-using schema::ColumnPath;
-
-namespace test {
-
-TEST(TestReaderProperties, Basics) {
- ReaderProperties props;
-
- ASSERT_EQ(DEFAULT_BUFFER_SIZE, props.buffer_size());
- ASSERT_EQ(DEFAULT_USE_BUFFERED_STREAM, props.is_buffered_stream_enabled());
-}
-
-TEST(TestWriterProperties, Basics) {
- std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
-
- ASSERT_EQ(DEFAULT_PAGE_SIZE, props->data_pagesize());
- ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT, props->dictionary_pagesize_limit());
- ASSERT_EQ(DEFAULT_WRITER_VERSION, props->version());
-}
-
-TEST(TestWriterProperties, AdvancedHandling) {
- WriterProperties::Builder builder;
- builder.compression("gzip", Compression::GZIP);
- builder.compression(Compression::SNAPPY);
- builder.encoding(Encoding::DELTA_BINARY_PACKED);
- builder.encoding("delta-length", Encoding::DELTA_LENGTH_BYTE_ARRAY);
- std::shared_ptr<WriterProperties> props = builder.build();
-
- ASSERT_EQ(Compression::GZIP, props->compression(ColumnPath::FromDotString("gzip")));
- ASSERT_EQ(
- Compression::SNAPPY, props->compression(ColumnPath::FromDotString("delta-length")));
- ASSERT_EQ(
- Encoding::DELTA_BINARY_PACKED, props->encoding(ColumnPath::FromDotString("gzip")));
- ASSERT_EQ(Encoding::DELTA_LENGTH_BYTE_ARRAY,
- props->encoding(ColumnPath::FromDotString("delta-length")));
-}
-
-} // namespace test
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/properties.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h
deleted file mode 100644
index 3ebc3b7..0000000
--- a/src/parquet/column/properties.h
+++ /dev/null
@@ -1,385 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_COLUMN_PROPERTIES_H
-#define PARQUET_COLUMN_PROPERTIES_H
-
-#include <memory>
-#include <string>
-#include <unordered_map>
-
-#include "parquet/exception.h"
-#include "parquet/parquet_version.h"
-#include "parquet/schema.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-#include "parquet/util/visibility.h"
-
-namespace parquet {
-
-// Parquet format version targeted by the writer (see WriterProperties::version()).
-struct ParquetVersion {
-  enum type {
-    PARQUET_1_0,
-    PARQUET_2_0
-  };
-};
-
-// Reader defaults. The buffer size is only consulted by
-// ReaderProperties::GetStream() when buffered streaming is enabled.
-// Declared constexpr like the writer defaults below: a plain `static` variable
-// in a header gives every translation unit its own mutable copy.
-static constexpr int64_t DEFAULT_BUFFER_SIZE = 0;
-static constexpr bool DEFAULT_USE_BUFFERED_STREAM = false;
-
-// Options that control how column chunk bytes are read from a RandomAccessSource.
-class PARQUET_EXPORT ReaderProperties {
- public:
-  explicit ReaderProperties(::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
-      : pool_(pool),
-        buffer_size_(DEFAULT_BUFFER_SIZE),
-        buffered_stream_enabled_(DEFAULT_USE_BUFFERED_STREAM) {}
-
-  ::arrow::MemoryPool* memory_pool() const { return pool_; }
-
-  // Creates an input stream over `source`, forwarding `start` and `num_bytes`
-  // to the stream constructor. A BufferedInputStream (using this object's pool
-  // and buffer_size()) is returned when buffering is enabled; otherwise an
-  // InMemoryInputStream.
-  std::unique_ptr<InputStream> GetStream(
-      RandomAccessSource* source, int64_t start, int64_t num_bytes) {
-    if (!buffered_stream_enabled_) {
-      return std::unique_ptr<InputStream>(
-          new InMemoryInputStream(source, start, num_bytes));
-    }
-    return std::unique_ptr<InputStream>(
-        new BufferedInputStream(pool_, buffer_size_, source, start, num_bytes));
-  }
-
-  bool is_buffered_stream_enabled() const { return buffered_stream_enabled_; }
-  void enable_buffered_stream() { buffered_stream_enabled_ = true; }
-  void disable_buffered_stream() { buffered_stream_enabled_ = false; }
-
-  // Buffer size handed to BufferedInputStream by GetStream().
-  void set_buffer_size(int64_t buf_size) { buffer_size_ = buf_size; }
-  int64_t buffer_size() const { return buffer_size_; }
-
- private:
-  ::arrow::MemoryPool* pool_;
-  int64_t buffer_size_;
-  bool buffered_stream_enabled_;
-};
-
-// Returns a ReaderProperties object holding the default reader settings.
-ReaderProperties PARQUET_EXPORT default_reader_properties();
-
-// Writer-side defaults, used by WriterProperties::Builder and ColumnProperties.
-// Data page size: 1024 * 1024 bytes (1 MiB).
-static constexpr int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
-static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
-// The dictionary page size limit defaults to the data page size.
-static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = DEFAULT_PAGE_SIZE;
-static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
-static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
-// Fallback (non-dictionary) encoding.
-static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
-static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
- ParquetVersion::PARQUET_1_0;
-// CREATED_BY_VERSION is a macro presumably provided by parquet_version.h.
-static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
-static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;
-
-// Writer settings for a single column. Every constructor argument that is
-// omitted takes the corresponding DEFAULT_* constant.
-class PARQUET_EXPORT ColumnProperties {
- public:
-  ColumnProperties(Encoding::type encoding_type = DEFAULT_ENCODING,
-                   Compression::type compression = DEFAULT_COMPRESSION_TYPE,
-                   bool use_dictionary = DEFAULT_IS_DICTIONARY_ENABLED,
-                   bool use_statistics = DEFAULT_ARE_STATISTICS_ENABLED)
-      : encoding(encoding_type),
-        codec(compression),
-        dictionary_enabled(use_dictionary),
-        statistics_enabled(use_statistics) {}
-
-  // Encoding used when dictionary encoding is not applied.
-  Encoding::type encoding;
-  // Compression codec for this column.
-  Compression::type codec;
-  bool dictionary_enabled;
-  bool statistics_enabled;
-};
-
-// Writer configuration, created through WriterProperties::Builder. Per-column
-// settings are keyed by the column's dot-string path; columns without an
-// override use the builder-wide defaults.
-class PARQUET_EXPORT WriterProperties {
- public:
-  // Fluent builder: every setter returns `this` so calls can be chained, and
-  // build() snapshots the current settings into a new WriterProperties.
-  class Builder {
-   public:
-    Builder()
-        : pool_(::arrow::default_memory_pool()),
-          dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
-          write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
-          pagesize_(DEFAULT_PAGE_SIZE),
-          version_(DEFAULT_WRITER_VERSION),
-          created_by_(DEFAULT_CREATED_BY) {}
-    virtual ~Builder() {}
-
-    Builder* memory_pool(::arrow::MemoryPool* pool) {
-      pool_ = pool;
-      return this;
-    }
-
-    // Dictionary encoding: the overloads without a path change the default for
-    // all columns; the overloads taking a path override it for one column.
-    Builder* enable_dictionary() {
-      default_column_properties_.dictionary_enabled = true;
-      return this;
-    }
-
-    Builder* disable_dictionary() {
-      default_column_properties_.dictionary_enabled = false;
-      return this;
-    }
-
-    Builder* enable_dictionary(const std::string& path) {
-      dictionary_enabled_[path] = true;
-      return this;
-    }
-
-    Builder* enable_dictionary(const std::shared_ptr<schema::ColumnPath>& path) {
-      return this->enable_dictionary(path->ToDotString());
-    }
-
-    Builder* disable_dictionary(const std::string& path) {
-      dictionary_enabled_[path] = false;
-      return this;
-    }
-
-    Builder* disable_dictionary(const std::shared_ptr<schema::ColumnPath>& path) {
-      return this->disable_dictionary(path->ToDotString());
-    }
-
-    Builder* dictionary_pagesize_limit(int64_t dictionary_psize_limit) {
-      dictionary_pagesize_limit_ = dictionary_psize_limit;
-      return this;
-    }
-
-    Builder* write_batch_size(int64_t write_batch_size) {
-      write_batch_size_ = write_batch_size;
-      return this;
-    }
-
-    Builder* data_pagesize(int64_t pg_size) {
-      pagesize_ = pg_size;
-      return this;
-    }
-
-    Builder* version(ParquetVersion::type version) {
-      version_ = version;
-      return this;
-    }
-
-    Builder* created_by(const std::string& created_by) {
-      created_by_ = created_by;
-      return this;
-    }
-
-    /**
-     * Set the default fallback encoding: the encoding used when dictionary
-     * encoding is not applied, either because it is disabled or because the
-     * writer fell back after the dictionary grew too large.
-     *
-     * @throws ParquetException if encoding_type is a dictionary encoding.
-     */
-    Builder* encoding(Encoding::type encoding_type) {
-      CheckFallbackEncoding(encoding_type);
-      default_column_properties_.encoding = encoding_type;
-      return this;
-    }
-
-    /**
-     * Set the fallback encoding for the column at `path`; the overload without
-     * a path describes when the fallback encoding is used.
-     *
-     * @throws ParquetException if encoding_type is a dictionary encoding.
-     */
-    Builder* encoding(const std::string& path, Encoding::type encoding_type) {
-      CheckFallbackEncoding(encoding_type);
-      encodings_[path] = encoding_type;
-      return this;
-    }
-
-    /**
-     * Set the fallback encoding for the column at `path`.
-     *
-     * @throws ParquetException if encoding_type is a dictionary encoding.
-     */
-    Builder* encoding(
-        const std::shared_ptr<schema::ColumnPath>& path, Encoding::type encoding_type) {
-      return this->encoding(path->ToDotString(), encoding_type);
-    }
-
-    // Compression codec: default for all columns, or an override for one column.
-    Builder* compression(Compression::type codec) {
-      default_column_properties_.codec = codec;
-      return this;
-    }
-
-    Builder* compression(const std::string& path, Compression::type codec) {
-      codecs_[path] = codec;
-      return this;
-    }
-
-    Builder* compression(
-        const std::shared_ptr<schema::ColumnPath>& path, Compression::type codec) {
-      return this->compression(path->ToDotString(), codec);
-    }
-
-    // Column statistics: default for all columns, or an override for one column.
-    Builder* enable_statistics() {
-      default_column_properties_.statistics_enabled = true;
-      return this;
-    }
-
-    Builder* disable_statistics() {
-      default_column_properties_.statistics_enabled = false;
-      return this;
-    }
-
-    Builder* enable_statistics(const std::string& path) {
-      statistics_enabled_[path] = true;
-      return this;
-    }
-
-    Builder* enable_statistics(const std::shared_ptr<schema::ColumnPath>& path) {
-      return this->enable_statistics(path->ToDotString());
-    }
-
-    Builder* disable_statistics(const std::string& path) {
-      statistics_enabled_[path] = false;
-      return this;
-    }
-
-    Builder* disable_statistics(const std::shared_ptr<schema::ColumnPath>& path) {
-      return this->disable_statistics(path->ToDotString());
-    }
-
-    // Creates a WriterProperties from the current settings. Each column with at
-    // least one override gets a complete ColumnProperties entry seeded from the
-    // defaults; all other columns resolve to the defaults at lookup time.
-    std::shared_ptr<WriterProperties> build() {
-      std::unordered_map<std::string, ColumnProperties> column_properties;
-      // Returns the entry for `key`, inserting a copy of the defaults first if
-      // the column has none yet (emplace leaves an existing entry untouched).
-      auto get = [&](const std::string& key) -> ColumnProperties& {
-        return column_properties.emplace(key, default_column_properties_).first->second;
-      };
-
-      for (const auto& item : encodings_)
-        get(item.first).encoding = item.second;
-      for (const auto& item : codecs_)
-        get(item.first).codec = item.second;
-      for (const auto& item : dictionary_enabled_)
-        get(item.first).dictionary_enabled = item.second;
-      for (const auto& item : statistics_enabled_)
-        get(item.first).statistics_enabled = item.second;
-
-      // The WriterProperties constructor is private, so std::make_shared
-      // cannot be used here.
-      return std::shared_ptr<WriterProperties>(new WriterProperties(pool_,
-          dictionary_pagesize_limit_, write_batch_size_, pagesize_, version_, created_by_,
-          default_column_properties_, column_properties));
-    }
-
-   private:
-    // Rejects dictionary encodings, which cannot serve as the fallback encoding.
-    static void CheckFallbackEncoding(Encoding::type encoding_type) {
-      if (encoding_type == Encoding::PLAIN_DICTIONARY ||
-          encoding_type == Encoding::RLE_DICTIONARY) {
-        throw ParquetException("Can't use dictionary encoding as fallback encoding");
-      }
-    }
-
-    ::arrow::MemoryPool* pool_;
-    int64_t dictionary_pagesize_limit_;
-    int64_t write_batch_size_;
-    int64_t pagesize_;
-    ParquetVersion::type version_;
-    std::string created_by_;
-
-    // Settings used for each column unless overridden in any of the maps below
-    ColumnProperties default_column_properties_;
-    std::unordered_map<std::string, Encoding::type> encodings_;
-    std::unordered_map<std::string, Compression::type> codecs_;
-    std::unordered_map<std::string, bool> dictionary_enabled_;
-    std::unordered_map<std::string, bool> statistics_enabled_;
-  };
-
-  inline ::arrow::MemoryPool* memory_pool() const { return pool_; }
-
-  inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; }
-
-  inline int64_t write_batch_size() const { return write_batch_size_; }
-
-  inline int64_t data_pagesize() const { return pagesize_; }
-
-  inline ParquetVersion::type version() const { return parquet_version_; }
-
-  inline std::string created_by() const { return parquet_created_by_; }
-
-  // Encoding of dictionary indices in data pages: PLAIN_DICTIONARY for
-  // PARQUET_1_0, RLE_DICTIONARY otherwise.
-  inline Encoding::type dictionary_index_encoding() const {
-    if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
-      return Encoding::PLAIN_DICTIONARY;
-    } else {
-      return Encoding::RLE_DICTIONARY;
-    }
-  }
-
-  // Encoding of the dictionary page itself: PLAIN_DICTIONARY for PARQUET_1_0,
-  // PLAIN otherwise.
-  inline Encoding::type dictionary_page_encoding() const {
-    if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
-      return Encoding::PLAIN_DICTIONARY;
-    } else {
-      return Encoding::PLAIN;
-    }
-  }
-
-  // Settings for the column at `path`, or the defaults when it has no override.
-  const ColumnProperties& column_properties(
-      const std::shared_ptr<schema::ColumnPath>& path) const {
-    auto it = column_properties_.find(path->ToDotString());
-    if (it != column_properties_.end()) return it->second;
-    return default_column_properties_;
-  }
-
-  Encoding::type encoding(const std::shared_ptr<schema::ColumnPath>& path) const {
-    return column_properties(path).encoding;
-  }
-
-  Compression::type compression(const std::shared_ptr<schema::ColumnPath>& path) const {
-    return column_properties(path).codec;
-  }
-
-  bool dictionary_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
-    return column_properties(path).dictionary_enabled;
-  }
-
-  bool statistics_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
-    return column_properties(path).statistics_enabled;
-  }
-
- private:
-  explicit WriterProperties(::arrow::MemoryPool* pool, int64_t dictionary_pagesize_limit,
-      int64_t write_batch_size, int64_t pagesize, ParquetVersion::type version,
-      const std::string& created_by, const ColumnProperties& default_column_properties,
-      const std::unordered_map<std::string, ColumnProperties>& column_properties)
-      : pool_(pool),
-        dictionary_pagesize_limit_(dictionary_pagesize_limit),
-        write_batch_size_(write_batch_size),
-        pagesize_(pagesize),
-        parquet_version_(version),
-        parquet_created_by_(created_by),
-        default_column_properties_(default_column_properties),
-        column_properties_(column_properties) {}
-
-  ::arrow::MemoryPool* pool_;
-  int64_t dictionary_pagesize_limit_;
-  int64_t write_batch_size_;
-  int64_t pagesize_;
-  ParquetVersion::type parquet_version_;
-  std::string parquet_created_by_;
-  ColumnProperties default_column_properties_;
-  std::unordered_map<std::string, ColumnProperties> column_properties_;
-};
-
-// Returns a WriterProperties holding the default writer settings.
-std::shared_ptr<WriterProperties> PARQUET_EXPORT default_writer_properties();
-
-} // namespace parquet
-
-#endif // PARQUET_COLUMN_PROPERTIES_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc
deleted file mode 100644
index bc4e4a0..0000000
--- a/src/parquet/column/reader.cc
+++ /dev/null
@@ -1,238 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/column/reader.h"
-
-#include <algorithm>
-#include <cstdint>
-#include <memory>
-
-#include "parquet/column/page.h"
-#include "parquet/column/properties.h"
-#include "parquet/encoding-internal.h"
-
-using arrow::MemoryPool;
-
-namespace parquet {
-
-// Returns a copy of a default-configured ReaderProperties that is constructed
-// on first use.
-ReaderProperties default_reader_properties() {
-  static const ReaderProperties kDefaults;
-  return kDefaults;
-}
-
-// Both value counters start at 0, so the first HasNext() call loads a page.
-ColumnReader::ColumnReader(const ColumnDescriptor* descr,
-                           std::unique_ptr<PageReader> pager, MemoryPool* pool)
-    : descr_(descr),
-      pager_(std::move(pager)),
-      num_buffered_values_(0),
-      num_decoded_values_(0),
-      pool_(pool) {}
-
-ColumnReader::~ColumnReader() = default;
-
-// Builds the decoder for a dictionary page and makes it the current decoder.
-// PLAIN and PLAIN_DICTIONARY dictionary pages are both registered under the
-// RLE_DICTIONARY key, the key ReadNewPage uses for dictionary-index data pages.
-// Throws if a decoder is already registered under that key.
-template <typename DType>
-void TypedColumnReader<DType>::ConfigureDictionary(const DictionaryPage* page) {
-  const bool plain_dictionary = page->encoding() == Encoding::PLAIN_DICTIONARY ||
-                                page->encoding() == Encoding::PLAIN;
-  const int key = plain_dictionary ? static_cast<int>(Encoding::RLE_DICTIONARY)
-                                   : static_cast<int>(page->encoding());
-
-  if (decoders_.find(key) != decoders_.end()) {
-    throw ParquetException("Column cannot have more than one dictionary.");
-  }
-
-  if (plain_dictionary) {
-    PlainDecoder<DType> plain_values(descr_);
-    plain_values.SetData(page->num_values(), page->data(), page->size());
-
-    // The dictionary is fully decoded during DictionaryDecoder::Init, so the
-    // DictionaryPage buffer is no longer required after this step
-    //
-    // TODO(wesm): investigate whether this all-or-nothing decoding of the
-    // dictionary makes sense and whether performance can be improved
-    auto dict_decoder = std::make_shared<DictionaryDecoder<DType>>(descr_, pool_);
-    dict_decoder->SetDict(&plain_values);
-    decoders_[key] = dict_decoder;
-  } else {
-    ParquetException::NYI("only plain dictionary encoding has been implemented");
-  }
-
-  current_decoder_ = decoders_[key].get();
-}
-
-// True for encodings that mark data-page values as dictionary indices.
-// PLAIN_DICTIONARY is deprecated, but it used to be used for this purpose.
-static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
-  switch (e) {
-    case Encoding::RLE_DICTIONARY:
-    case Encoding::PLAIN_DICTIONARY:
-      return true;
-    default:
-      return false;
-  }
-}
-
-// Advances to the next data page. Dictionary pages met along the way are
-// passed to ConfigureDictionary(), and pages of other non-data types are
-// skipped. For a data page, the level decoders are pointed at the page's level
-// bytes and the value decoder for the page's encoding is selected (a PLAIN
-// decoder is created on first use). Returns false when the pager has no more
-// pages.
-template <typename DType>
-bool TypedColumnReader<DType>::ReadNewPage() {
-  while (true) {
-    current_page_ = pager_->NextPage();
-    if (!current_page_) {
-      // EOS
-      return false;
-    }
-
-    if (current_page_->type() == PageType::DICTIONARY_PAGE) {
-      ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
-      continue;
-    } else if (current_page_->type() == PageType::DATA_PAGE) {
-      const DataPage* page = static_cast<const DataPage*>(current_page_.get());
-
-      // Read a data page.
-      num_buffered_values_ = page->num_values();
-
-      // Have not decoded any values from the data page yet
-      num_decoded_values_ = 0;
-
-      const uint8_t* buffer = page->data();
-
-      // If the data page includes repetition and definition levels, we
-      // initialize the level decoder and subtract the encoded level bytes from
-      // the page size to determine the number of bytes in the encoded data.
-      int64_t data_size = page->size();
-
-      // Data page layout: repetition levels - definition levels - encoded values.
-      // Levels are encoded as RLE or bit-packed.
-      // Init repetition levels
-      if (descr_->max_repetition_level() > 0) {
-        int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
-            page->repetition_level_encoding(), descr_->max_repetition_level(),
-            static_cast<int>(num_buffered_values_), buffer);
-        buffer += rep_levels_bytes;
-        data_size -= rep_levels_bytes;
-      }
-      // TODO figure a way to set max_definition_level_ to 0
-      // if the initial value is invalid
-
-      // Init definition levels
-      if (descr_->max_definition_level() > 0) {
-        int64_t def_levels_bytes = definition_level_decoder_.SetData(
-            page->definition_level_encoding(), descr_->max_definition_level(),
-            static_cast<int>(num_buffered_values_), buffer);
-        buffer += def_levels_bytes;
-        data_size -= def_levels_bytes;
-      }
-
-      // Get a decoder object for this page or create a new decoder if this is the
-      // first page with this encoding. Both dictionary-index encodings share the
-      // decoder registered under RLE_DICTIONARY.
-      Encoding::type encoding = page->encoding();
-
-      if (IsDictionaryIndexEncoding(encoding)) { encoding = Encoding::RLE_DICTIONARY; }
-
-      auto it = decoders_.find(static_cast<int>(encoding));
-      if (it != decoders_.end()) {
-        if (encoding == Encoding::RLE_DICTIONARY) {
-          // Validate the decoder being switched to. current_decoder_ still
-          // refers to the previous page's decoder, which is the PLAIN decoder
-          // whenever a PLAIN page preceded this one.
-          DCHECK(it->second->encoding() == Encoding::RLE_DICTIONARY);
-        }
-        current_decoder_ = it->second.get();
-      } else {
-        switch (encoding) {
-          case Encoding::PLAIN: {
-            std::shared_ptr<DecoderType> decoder(new PlainDecoder<DType>(descr_));
-            decoders_[static_cast<int>(encoding)] = decoder;
-            current_decoder_ = decoder.get();
-            break;
-          }
-          case Encoding::RLE_DICTIONARY:
-            throw ParquetException("Dictionary page must be before data page.");
-
-          case Encoding::DELTA_BINARY_PACKED:
-          case Encoding::DELTA_LENGTH_BYTE_ARRAY:
-          case Encoding::DELTA_BYTE_ARRAY:
-            ParquetException::NYI("Unsupported encoding");
-
-          default:
-            throw ParquetException("Unknown encoding type.");
-        }
-      }
-      current_decoder_->SetData(
-          static_cast<int>(num_buffered_values_), buffer, static_cast<int>(data_size));
-      return true;
-    } else {
-      // We don't know what this page type is. We're allowed to skip non-data
-      // pages.
-      continue;
-    }
-  }
-  // Not reached: the loop only exits through a return statement.
-  return true;
-}
-
-// ----------------------------------------------------------------------
-// Batch read APIs
-
-// Decodes up to batch_size definition levels into `levels` and returns how many
-// were decoded. Columns whose max definition level is 0 store none, so 0 is returned.
-int64_t ColumnReader::ReadDefinitionLevels(int64_t batch_size, int16_t* levels) {
-  return descr_->max_definition_level() > 0
-             ? definition_level_decoder_.Decode(static_cast<int>(batch_size), levels)
-             : 0;
-}
-
-// Decodes up to batch_size repetition levels into `levels` and returns how many
-// were decoded. Columns whose max repetition level is 0 store none, so 0 is returned.
-int64_t ColumnReader::ReadRepetitionLevels(int64_t batch_size, int16_t* levels) {
-  return descr_->max_repetition_level() > 0
-             ? repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels)
-             : 0;
-}
-
-// ----------------------------------------------------------------------
-// Dynamic column reader constructor
-
-// Creates the TypedColumnReader subclass matching descr's physical type.
-// Physical types without a reader are reported through ParquetException::NYI.
-std::shared_ptr<ColumnReader> ColumnReader::Make(
-    const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager, MemoryPool* pool) {
-  std::shared_ptr<ColumnReader> reader;
-  switch (descr->physical_type()) {
-    case Type::BOOLEAN:
-      reader = std::make_shared<BoolReader>(descr, std::move(pager), pool);
-      break;
-    case Type::INT32:
-      reader = std::make_shared<Int32Reader>(descr, std::move(pager), pool);
-      break;
-    case Type::INT64:
-      reader = std::make_shared<Int64Reader>(descr, std::move(pager), pool);
-      break;
-    case Type::INT96:
-      reader = std::make_shared<Int96Reader>(descr, std::move(pager), pool);
-      break;
-    case Type::FLOAT:
-      reader = std::make_shared<FloatReader>(descr, std::move(pager), pool);
-      break;
-    case Type::DOUBLE:
-      reader = std::make_shared<DoubleReader>(descr, std::move(pager), pool);
-      break;
-    case Type::BYTE_ARRAY:
-      reader = std::make_shared<ByteArrayReader>(descr, std::move(pager), pool);
-      break;
-    case Type::FIXED_LEN_BYTE_ARRAY:
-      reader = std::make_shared<FixedLenByteArrayReader>(descr, std::move(pager), pool);
-      break;
-    default:
-      ParquetException::NYI("type reader not implemented");
-      break;
-  }
-  // Still null only on the NYI path, which is not expected to return.
-  return reader;
-}
-
-// ----------------------------------------------------------------------
-// Instantiate templated classes
-
-// Explicitly instantiate TypedColumnReader for every physical type so that the
-// member definitions in this file are compiled into the library.
-// PARQUET_TEMPLATE_EXPORT presumably controls symbol visibility; confirm in
-// parquet/util/visibility.h.
-template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<BooleanType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<Int32Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<Int64Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<Int96Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<FloatType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<DoubleType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<ByteArrayType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<FLBAType>;
-
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
deleted file mode 100644
index 724773d..0000000
--- a/src/parquet/column/reader.h
+++ /dev/null
@@ -1,453 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_COLUMN_READER_H
-#define PARQUET_COLUMN_READER_H
-
-#include <algorithm>
-#include <cstdint>
-#include <cstring>
-#include <memory>
-#include <unordered_map>
-#include <vector>
-#include <iostream>
-
-#include <arrow/util/bit-util.h>
-
-#include "parquet/column/levels.h"
-#include "parquet/column/page.h"
-#include "parquet/encoding.h"
-#include "parquet/exception.h"
-#include "parquet/schema.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-#include "parquet/util/visibility.h"
-
-namespace parquet {
-
-// Type-independent base of all column readers. It owns the page reader and the
-// level decoders, and tracks progress through the current data page. Use
-// Make() to build the TypedColumnReader matching a column's physical type.
-class PARQUET_EXPORT ColumnReader {
- public:
-  ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>,
-      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
-  virtual ~ColumnReader();
-
-  static std::shared_ptr<ColumnReader> Make(const ColumnDescriptor* descr,
-      std::unique_ptr<PageReader> pager,
-      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
-
-  // Returns true if there are still values in this column. When no page is
-  // loaded yet, or the current one is used up, the next data page is loaded.
-  // NOTE(review): a data page holding zero values makes this return false even
-  // if further pages follow.
-  bool HasNext() {
-    const bool page_used_up =
-        num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_;
-    if (!page_used_up) { return true; }
-    return ReadNewPage() && num_buffered_values_ != 0;
-  }
-
-  Type::type type() const { return descr_->physical_type(); }
-
-  const ColumnDescriptor* descr() const { return descr_; }
-
- protected:
-  // Loads the next data page; returns false when there is none.
-  virtual bool ReadNewPage() = 0;
-
-  // Decodes up to batch_size definition levels into preallocated `levels`.
-  // Returns the number of levels decoded.
-  int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels);
-
-  // Decodes up to batch_size repetition levels into preallocated `levels`.
-  // Returns the number of levels decoded.
-  int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels);
-
-  const ColumnDescriptor* descr_;
-
-  std::unique_ptr<PageReader> pager_;
-  std::shared_ptr<Page> current_page_;
-
-  // Not set if full schema for this field has no optional or repeated elements
-  LevelDecoder definition_level_decoder_;
-
-  // Not set for flat schemas.
-  LevelDecoder repetition_level_decoder_;
-
-  // Total number of values stored in the current data page: the maximum of the
-  // number of encoded definition levels and the number of encoded values. For
-  // required, non-repeated columns this equals the number of encoded values.
-  // For optional or repeated columns there may be fewer values than levels, and
-  // this counts the levels.
-  int64_t num_buffered_values_;
-
-  // Number of values from the current data page decoded into memory so far.
-  int64_t num_decoded_values_;
-
-  ::arrow::MemoryPool* pool_;
-};
-
-// API to read values from a single column. This is the main client facing API.
-// DType is a Parquet type tag (e.g. Int32Type) whose c_type is the value type T.
-template <typename DType>
-class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
- public:
- typedef typename DType::c_type T;
-
- TypedColumnReader(const ColumnDescriptor* schema, std::unique_ptr<PageReader> pager,
- ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
- : ColumnReader(schema, std::move(pager), pool), current_decoder_(NULL) {}
- virtual ~TypedColumnReader() {}
-
- // Read a batch of repetition levels, definition levels, and values from the
- // column.
- //
- // Since null values are not stored in the values, the number of values read
- // may be less than the number of repetition and definition levels. With
- // nested data this is almost certainly true.
- //
- // Set def_levels or rep_levels to nullptr if you want to skip reading them.
- // This is only safe if you know through some other source that there are no
- // undefined values.
- //
- // To fully exhaust a row group, you must read batches until the number of
- // values read reaches the number of stored values according to the metadata.
- //
- // This API is the same for both V1 and V2 of the DataPage
- //
- // @returns: actual number of levels read (see values_read for number of values read)
- int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
- T* values, int64_t* values_read);
-
- /// Read a batch of repetition levels, definition levels, and values from the
- /// column and leave spaces for null entries on the lowest level in the values
- /// buffer.
- ///
- /// In comparison to ReadBatch, the length of the repetition and definition levels
- /// equals the number of values read when max_definition_level == 1.
- /// In the case of max_definition_level > 1, the repetition and definition
- /// levels are larger than the values but the values include the null entries
- /// with definition_level == (max_definition_level - 1).
- ///
- /// To fully exhaust a row group, you must read batches until the number of
- /// values read reaches the number of stored values according to the metadata.
- ///
- /// @param batch_size the number of levels to read
- /// @param[out] def_levels The Parquet definition levels, output has
- /// the length levels_read.
- /// @param[out] rep_levels The Parquet repetition levels, output has
- /// the length levels_read.
- /// @param[out] values The values in the lowest nested level including
- /// spacing for nulls on the lowest levels; output has the length
- /// values_read.
- /// @param[out] valid_bits Memory allocated for a bitmap that indicates if
- /// the row is null or on the maximum definition level. For performance
- /// reasons the underlying buffer should be able to store 1 bit more than
- /// required. If this requires an additional byte, this byte is only read
- /// but never written to.
- /// @param valid_bits_offset The offset in bits of the valid_bits where the
- /// first relevant bit resides.
- /// @param[out] levels_read The number of repetition/definition levels that were read.
- /// @param[out] values_read The number of values read, this includes all
- /// non-null entries as well as all null-entries on the lowest level
- /// (i.e. definition_level == max_definition_level - 1)
- /// @param[out] null_count The number of nulls on the lowest levels.
- /// (i.e. (values_read - null_count) is total number of non-null entries)
- int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
- T* values, uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
- int64_t* values_read, int64_t* null_count);
-
- // Skip reading levels
- // Returns the number of levels skipped
- int64_t Skip(int64_t num_rows_to_skip);
-
- private:
- typedef Decoder<DType> DecoderType;
-
- // Advance to the next data page; returns false when no further page exists.
- virtual bool ReadNewPage();
-
- // Read up to batch_size values from the current data page into the
- // pre-allocated memory T*
- //
- // @returns: the number of values read into the out buffer
- int64_t ReadValues(int64_t batch_size, T* out);
-
- // Read up to batch_size values from the current data page into the
- // pre-allocated memory T*, leaving spaces for null entries according
- // to the def_levels.
- //
- // @returns: the number of values read into the out buffer
- int64_t ReadValuesSpaced(int64_t batch_size, T* out, int null_count,
- uint8_t* valid_bits, int64_t valid_bits_offset);
-
- // Map of encoding type to the respective decoder object. For example, a
- // column chunk's data pages may include both dictionary-encoded and
- // plain-encoded data.
- std::unordered_map<int, std::shared_ptr<DecoderType>> decoders_;
-
- // Build the decoder for a dictionary page and make it the current decoder.
- void ConfigureDictionary(const DictionaryPage* page);
-
- // Decoder for the current data page. It is a non-owning pointer into
- // decoders_ and is NULL until the first page has been configured.
- DecoderType* current_decoder_;
-};
-
-// Decodes up to batch_size values of the current page into `out`; returns the
-// number of values decoded.
-template <typename DType>
-inline int64_t TypedColumnReader<DType>::ReadValues(int64_t batch_size, T* out) {
-  return current_decoder_->Decode(out, static_cast<int>(batch_size));
-}
-
-// Decodes values of the current page into `out`, leaving a slot for each of
-// the null_count nulls as marked in valid_bits; returns the decoder's count.
-template <typename DType>
-inline int64_t TypedColumnReader<DType>::ReadValuesSpaced(int64_t batch_size, T* out,
-    int null_count, uint8_t* valid_bits, int64_t valid_bits_offset) {
-  const int num_slots = static_cast<int>(batch_size);
-  const int64_t num_decoded = current_decoder_->DecodeSpaced(
-      out, num_slots, null_count, valid_bits, valid_bits_offset);
-  return num_decoded;
-}
-
-// Reads up to batch_size levels (and the values they carry) from the current
-// data page only. Returns the number of levels consumed; *values_read receives
-// the number of values decoded into `values`.
-template <typename DType>
-inline int64_t TypedColumnReader<DType>::ReadBatch(int64_t batch_size,
-    int16_t* def_levels, int16_t* rep_levels, T* values, int64_t* values_read) {
-  // HasNext() loads a new data page when the current one is used up.
-  if (!HasNext()) {
-    *values_read = 0;
-    return 0;
-  }
-
-  // TODO(wesm): keep reading data pages until batch_size is reached, or the
-  // row group is finished
-  batch_size = std::min(batch_size, num_buffered_values_ - num_decoded_values_);
-
-  const auto max_def_level = descr_->max_definition_level();
-
-  // Required field (or caller skipping levels): every level carries a value.
-  int64_t num_def_levels = 0;
-  int64_t values_to_read = batch_size;
-  if (max_def_level > 0 && def_levels != nullptr) {
-    num_def_levels = ReadDefinitionLevels(batch_size, def_levels);
-    // Only levels at the maximum definition level carry a stored value.
-    values_to_read = std::count(def_levels, def_levels + num_def_levels, max_def_level);
-  }
-
-  // Repetition levels are present only for repeated fields.
-  if (descr_->max_repetition_level() > 0 && rep_levels != nullptr) {
-    const int64_t num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels);
-    if (def_levels != nullptr && num_def_levels != num_rep_levels) {
-      throw ParquetException("Number of decoded rep / def levels did not match");
-    }
-  }
-
-  *values_read = ReadValues(values_to_read, values);
-  const int64_t total_values = std::max(num_def_levels, *values_read);
-  num_decoded_values_ += total_values;
-  return total_values;
-}
-
-inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels,
- int16_t max_definition_level, int16_t max_repetition_level,
- int64_t* values_read, int64_t* null_count,
- uint8_t* valid_bits, int64_t valid_bits_offset) {
- int byte_offset = static_cast<int>(valid_bits_offset) / 8;
- int bit_offset = static_cast<int>(valid_bits_offset) % 8;
- uint8_t bitset = valid_bits[byte_offset];
-
- // TODO(itaiin): As an interim solution we are splitting the code path here
- // between repeated+flat column reads, and non-repeated+nested reads.
- // Those paths need to be merged in the future
- for (int i = 0; i < num_def_levels; ++i) {
- if (def_levels[i] == max_definition_level) {
- bitset |= (1 << bit_offset);
- } else if (max_repetition_level > 0) {
- // repetition+flat case
- if (def_levels[i] == (max_definition_level - 1)) {
- bitset &= ~(1 << bit_offset);
- *null_count += 1;
- } else {
- continue;
- }
- } else {
- // non-repeated+nested case
- if (def_levels[i] < max_definition_level) {
- bitset &= ~(1 << bit_offset);
- *null_count += 1;
- } else {
- throw ParquetException("definition level exceeds maximum");
- }
- }
-
- bit_offset++;
- if (bit_offset == 8) {
- bit_offset = 0;
- valid_bits[byte_offset] = bitset;
- byte_offset++;
- // TODO: Except for the last byte, this shouldn't be needed
- bitset = valid_bits[byte_offset];
- }
- }
- if (bit_offset != 0) { valid_bits[byte_offset] = bitset; }
- *values_read = (bit_offset + byte_offset * 8 - valid_bits_offset);
-}
-
-template <typename DType>
-inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int64_t batch_size,
- int16_t* def_levels, int16_t* rep_levels, T* values, uint8_t* valid_bits,
- int64_t valid_bits_offset, int64_t* levels_read, int64_t* values_read,
- int64_t* null_count_out) {
- // HasNext invokes ReadNewPage
- if (!HasNext()) {
- *levels_read = 0;
- *values_read = 0;
- *null_count_out = 0;
- return 0;
- }
-
- int64_t total_values;
- // TODO(wesm): keep reading data pages until batch_size is reached, or the
- // row group is finished
- batch_size = std::min(batch_size, num_buffered_values_ - num_decoded_values_);
-
- // If the field is required and non-repeated, there are no definition levels
- if (descr_->max_definition_level() > 0) {
- int64_t num_def_levels = ReadDefinitionLevels(batch_size, def_levels);
-
- // Not present for non-repeated fields
- if (descr_->max_repetition_level() > 0) {
- int64_t num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels);
- if (num_def_levels != num_rep_levels) {
- throw ParquetException("Number of decoded rep / def levels did not match");
- }
- }
-
- // TODO(itaiin): another code path split to merge when the general case is done
- bool has_spaced_values;
- if (descr_->max_repetition_level() > 0) {
- // repeated+flat case
- has_spaced_values = !descr_->schema_node()->is_required();
- } else {
- // non-repeated+nested case
- // Find if a node forces nulls in the lowest level along the hierarchy
- const schema::Node* node = descr_->schema_node().get();
- has_spaced_values = false;
- while (node) {
- auto parent = node->parent();
- if (node->is_optional()) {
- has_spaced_values = true;
- break;
- }
- node = parent;
- }
- }
-
- int64_t null_count = 0;
- if (!has_spaced_values) {
- int values_to_read = 0;
- for (int64_t i = 0; i < num_def_levels; ++i) {
- if (def_levels[i] == descr_->max_definition_level()) { ++values_to_read; }
- }
- total_values = ReadValues(values_to_read, values);
- for (int64_t i = 0; i < total_values; i++) {
- ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
- }
- *values_read = total_values;
- } else {
- int16_t max_definition_level = descr_->max_definition_level();
- int16_t max_repetition_level = descr_->max_repetition_level();
- DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level,
- max_repetition_level, values_read, &null_count, valid_bits, valid_bits_offset);
- total_values = ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
- valid_bits, valid_bits_offset);
- }
- *levels_read = num_def_levels;
- *null_count_out = null_count;
-
- } else {
- // Required field, read all values
- total_values = ReadValues(batch_size, values);
- for (int64_t i = 0; i < total_values; i++) {
- ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
- }
- *null_count_out = 0;
- *levels_read = total_values;
- }
-
- num_decoded_values_ += *levels_read;
- return total_values;
-}
-
-template <typename DType>
-inline int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
- int64_t rows_to_skip = num_rows_to_skip;
- while (HasNext() && rows_to_skip > 0) {
- // If the number of rows to skip is more than the number of undecoded values, skip the
- // Page.
- if (rows_to_skip > (num_buffered_values_ - num_decoded_values_)) {
- rows_to_skip -= num_buffered_values_ - num_decoded_values_;
- num_decoded_values_ = num_buffered_values_;
- } else {
- // We need to read this Page
- // Jump to the right offset in the Page
- int64_t batch_size = 1024; // ReadBatch with a smaller memory footprint
- int64_t values_read = 0;
-
- std::shared_ptr<PoolBuffer> vals = AllocateBuffer(
- this->pool_, batch_size * type_traits<DType::type_num>::value_byte_size);
- std::shared_ptr<PoolBuffer> def_levels =
- AllocateBuffer(this->pool_, batch_size * sizeof(int16_t));
-
- std::shared_ptr<PoolBuffer> rep_levels =
- AllocateBuffer(this->pool_, batch_size * sizeof(int16_t));
-
- do {
- batch_size = std::min(batch_size, rows_to_skip);
- values_read = ReadBatch(static_cast<int>(batch_size),
- reinterpret_cast<int16_t*>(def_levels->mutable_data()),
- reinterpret_cast<int16_t*>(rep_levels->mutable_data()),
- reinterpret_cast<T*>(vals->mutable_data()), &values_read);
- rows_to_skip -= values_read;
- } while (values_read > 0 && rows_to_skip > 0);
- }
- }
- return num_rows_to_skip - rows_to_skip;
-}
-
-typedef TypedColumnReader<BooleanType> BoolReader;
-typedef TypedColumnReader<Int32Type> Int32Reader;
-typedef TypedColumnReader<Int64Type> Int64Reader;
-typedef TypedColumnReader<Int96Type> Int96Reader;
-typedef TypedColumnReader<FloatType> FloatReader;
-typedef TypedColumnReader<DoubleType> DoubleReader;
-typedef TypedColumnReader<ByteArrayType> ByteArrayReader;
-typedef TypedColumnReader<FLBAType> FixedLenByteArrayReader;
-
-extern template class PARQUET_EXPORT TypedColumnReader<BooleanType>;
-extern template class PARQUET_EXPORT TypedColumnReader<Int32Type>;
-extern template class PARQUET_EXPORT TypedColumnReader<Int64Type>;
-extern template class PARQUET_EXPORT TypedColumnReader<Int96Type>;
-extern template class PARQUET_EXPORT TypedColumnReader<FloatType>;
-extern template class PARQUET_EXPORT TypedColumnReader<DoubleType>;
-extern template class PARQUET_EXPORT TypedColumnReader<ByteArrayType>;
-extern template class PARQUET_EXPORT TypedColumnReader<FLBAType>;
-
-} // namespace parquet
-
-#endif // PARQUET_COLUMN_READER_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/scan-all.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scan-all.cc b/src/parquet/column/scan-all.cc
deleted file mode 100644
index 36a7689..0000000
--- a/src/parquet/column/scan-all.cc
+++ /dev/null
@@ -1,56 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/column/scan-all.h"
-
-namespace parquet {
-
-int64_t ScanAllValues(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
- uint8_t* values, int64_t* values_buffered, parquet::ColumnReader* reader) {
- switch (reader->type()) {
- case parquet::Type::BOOLEAN:
- return ScanAll<parquet::BoolReader>(
- batch_size, def_levels, rep_levels, values, values_buffered, reader);
- case parquet::Type::INT32:
- return ScanAll<parquet::Int32Reader>(
- batch_size, def_levels, rep_levels, values, values_buffered, reader);
- case parquet::Type::INT64:
- return ScanAll<parquet::Int64Reader>(
- batch_size, def_levels, rep_levels, values, values_buffered, reader);
- case parquet::Type::INT96:
- return ScanAll<parquet::Int96Reader>(
- batch_size, def_levels, rep_levels, values, values_buffered, reader);
- case parquet::Type::FLOAT:
- return ScanAll<parquet::FloatReader>(
- batch_size, def_levels, rep_levels, values, values_buffered, reader);
- case parquet::Type::DOUBLE:
- return ScanAll<parquet::DoubleReader>(
- batch_size, def_levels, rep_levels, values, values_buffered, reader);
- case parquet::Type::BYTE_ARRAY:
- return ScanAll<parquet::ByteArrayReader>(
- batch_size, def_levels, rep_levels, values, values_buffered, reader);
- case parquet::Type::FIXED_LEN_BYTE_ARRAY:
- return ScanAll<parquet::FixedLenByteArrayReader>(
- batch_size, def_levels, rep_levels, values, values_buffered, reader);
- default:
- parquet::ParquetException::NYI("type reader not implemented");
- }
- // Unreachable code, but supress compiler warning
- return 0;
-}
-
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/scan-all.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/scan-all.h b/src/parquet/column/scan-all.h
deleted file mode 100644
index b701c17..0000000
--- a/src/parquet/column/scan-all.h
+++ /dev/null
@@ -1,41 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_SCAN_ALL_H
-#define PARQUET_SCAN_ALL_H
-
-#include "parquet/column/reader.h"
-
-namespace parquet {
-
-template <typename RType>
-int64_t ScanAll(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
- uint8_t* values, int64_t* values_buffered, parquet::ColumnReader* reader) {
- typedef typename RType::T Type;
- auto typed_reader = static_cast<RType*>(reader);
- auto vals = reinterpret_cast<Type*>(&values[0]);
- return typed_reader->ReadBatch(
- batch_size, def_levels, rep_levels, vals, values_buffered);
-}
-
-int64_t PARQUET_EXPORT ScanAllValues(int32_t batch_size, int16_t* def_levels,
- int16_t* rep_levels, uint8_t* values, int64_t* values_buffered,
- parquet::ColumnReader* reader);
-
-} // namespace parquet
-
-#endif // PARQUET_SCAN_ALL_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/scanner-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc
deleted file mode 100644
index 5d137b7..0000000
--- a/src/parquet/column/scanner-test.cc
+++ /dev/null
@@ -1,232 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include <algorithm>
-#include <cstdint>
-#include <cstdlib>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "parquet/column/page.h"
-#include "parquet/column/scanner.h"
-#include "parquet/column/test-specialization.h"
-#include "parquet/column/test-util.h"
-#include "parquet/schema.h"
-#include "parquet/types.h"
-#include "parquet/util/test-common.h"
-
-using std::string;
-using std::vector;
-using std::shared_ptr;
-
-namespace parquet {
-
-using schema::NodePtr;
-
-namespace test {
-
-template <>
-void InitDictValues<bool>(
- int num_values, int dict_per_page, vector<bool>& values, vector<uint8_t>& buffer) {
- // No op for bool
-}
-
-template <typename Type>
-class TestFlatScanner : public ::testing::Test {
- public:
- typedef typename Type::c_type T;
-
- void InitScanner(const ColumnDescriptor* d) {
- std::unique_ptr<PageReader> pager(new test::MockPageReader(pages_));
- scanner_ = Scanner::Make(ColumnReader::Make(d, std::move(pager)));
- }
-
- void CheckResults(int batch_size, const ColumnDescriptor* d) {
- TypedScanner<Type>* scanner = reinterpret_cast<TypedScanner<Type>*>(scanner_.get());
- T val;
- bool is_null = false;
- int16_t def_level;
- int16_t rep_level;
- int j = 0;
- scanner->SetBatchSize(batch_size);
- for (int i = 0; i < num_levels_; i++) {
- ASSERT_TRUE(scanner->Next(&val, &def_level, &rep_level, &is_null)) << i << j;
- if (!is_null) {
- ASSERT_EQ(values_[j], val) << i << "V" << j;
- j++;
- }
- if (d->max_definition_level() > 0) {
- ASSERT_EQ(def_levels_[i], def_level) << i << "D" << j;
- }
- if (d->max_repetition_level() > 0) {
- ASSERT_EQ(rep_levels_[i], rep_level) << i << "R" << j;
- }
- }
- ASSERT_EQ(num_values_, j);
- ASSERT_FALSE(scanner->Next(&val, &def_level, &rep_level, &is_null));
- }
-
- void Clear() {
- pages_.clear();
- values_.clear();
- def_levels_.clear();
- rep_levels_.clear();
- }
-
- void Execute(int num_pages, int levels_per_page, int batch_size,
- const ColumnDescriptor* d, Encoding::type encoding) {
- num_values_ = MakePages<Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
- values_, data_buffer_, pages_, encoding);
- num_levels_ = num_pages * levels_per_page;
- InitScanner(d);
- CheckResults(batch_size, d);
- Clear();
- }
-
- void InitDescriptors(std::shared_ptr<ColumnDescriptor>& d1,
- std::shared_ptr<ColumnDescriptor>& d2, std::shared_ptr<ColumnDescriptor>& d3,
- int length) {
- NodePtr type;
- type = schema::PrimitiveNode::Make(
- "c1", Repetition::REQUIRED, Type::type_num, LogicalType::NONE, length);
- d1.reset(new ColumnDescriptor(type, 0, 0));
- type = schema::PrimitiveNode::Make(
- "c2", Repetition::OPTIONAL, Type::type_num, LogicalType::NONE, length);
- d2.reset(new ColumnDescriptor(type, 4, 0));
- type = schema::PrimitiveNode::Make(
- "c3", Repetition::REPEATED, Type::type_num, LogicalType::NONE, length);
- d3.reset(new ColumnDescriptor(type, 4, 2));
- }
-
- void ExecuteAll(int num_pages, int num_levels, int batch_size, int type_length,
- Encoding::type encoding = Encoding::PLAIN) {
- std::shared_ptr<ColumnDescriptor> d1;
- std::shared_ptr<ColumnDescriptor> d2;
- std::shared_ptr<ColumnDescriptor> d3;
- InitDescriptors(d1, d2, d3, type_length);
- // evaluate REQUIRED pages
- Execute(num_pages, num_levels, batch_size, d1.get(), encoding);
- // evaluate OPTIONAL pages
- Execute(num_pages, num_levels, batch_size, d2.get(), encoding);
- // evaluate REPEATED pages
- Execute(num_pages, num_levels, batch_size, d3.get(), encoding);
- }
-
- protected:
- int num_levels_;
- int num_values_;
- vector<shared_ptr<Page>> pages_;
- std::shared_ptr<Scanner> scanner_;
- vector<T> values_;
- vector<int16_t> def_levels_;
- vector<int16_t> rep_levels_;
- vector<uint8_t> data_buffer_; // For BA and FLBA
-};
-
-static int num_levels_per_page = 100;
-static int num_pages = 20;
-static int batch_size = 32;
-
-typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
- ByteArrayType>
- TestTypes;
-
-using TestBooleanFlatScanner = TestFlatScanner<BooleanType>;
-using TestFLBAFlatScanner = TestFlatScanner<FLBAType>;
-
-TYPED_TEST_CASE(TestFlatScanner, TestTypes);
-
-TYPED_TEST(TestFlatScanner, TestPlainScanner) {
- this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0, Encoding::PLAIN);
-}
-
-TYPED_TEST(TestFlatScanner, TestDictScanner) {
- this->ExecuteAll(
- num_pages, num_levels_per_page, batch_size, 0, Encoding::RLE_DICTIONARY);
-}
-
-TEST_F(TestBooleanFlatScanner, TestPlainScanner) {
- this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0);
-}
-
-TEST_F(TestFLBAFlatScanner, TestPlainScanner) {
- this->ExecuteAll(num_pages, num_levels_per_page, batch_size, FLBA_LENGTH);
-}
-
-TEST_F(TestFLBAFlatScanner, TestDictScanner) {
- this->ExecuteAll(
- num_pages, num_levels_per_page, batch_size, FLBA_LENGTH, Encoding::RLE_DICTIONARY);
-}
-
-TEST_F(TestFLBAFlatScanner, TestPlainDictScanner) {
- this->ExecuteAll(num_pages, num_levels_per_page, batch_size, FLBA_LENGTH,
- Encoding::PLAIN_DICTIONARY);
-}
-
-// PARQUET 502
-TEST_F(TestFLBAFlatScanner, TestSmallBatch) {
- NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED,
- Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
- const ColumnDescriptor d(type, 0, 0);
- num_values_ = MakePages<FLBAType>(
- &d, 1, 100, def_levels_, rep_levels_, values_, data_buffer_, pages_);
- num_levels_ = 1 * 100;
- InitScanner(&d);
- CheckResults(1, &d);
-}
-
-TEST_F(TestFLBAFlatScanner, TestDescriptorAPI) {
- NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL,
- Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
- const ColumnDescriptor d(type, 4, 0);
- num_values_ = MakePages<FLBAType>(
- &d, 1, 100, def_levels_, rep_levels_, values_, data_buffer_, pages_);
- num_levels_ = 1 * 100;
- InitScanner(&d);
- TypedScanner<FLBAType>* scanner =
- reinterpret_cast<TypedScanner<FLBAType>*>(scanner_.get());
- ASSERT_EQ(10, scanner->descr()->type_precision());
- ASSERT_EQ(2, scanner->descr()->type_scale());
- ASSERT_EQ(FLBA_LENGTH, scanner->descr()->type_length());
-}
-
-TEST_F(TestFLBAFlatScanner, TestFLBAPrinterNext) {
- NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL,
- Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
- const ColumnDescriptor d(type, 4, 0);
- num_values_ = MakePages<FLBAType>(
- &d, 1, 100, def_levels_, rep_levels_, values_, data_buffer_, pages_);
- num_levels_ = 1 * 100;
- InitScanner(&d);
- TypedScanner<FLBAType>* scanner =
- reinterpret_cast<TypedScanner<FLBAType>*>(scanner_.get());
- scanner->SetBatchSize(batch_size);
- std::stringstream ss_fail;
- for (int i = 0; i < num_levels_; i++) {
- std::stringstream ss;
- scanner->PrintNext(ss, 17);
- std::string result = ss.str();
- ASSERT_LE(17, result.size()) << i;
- }
- ASSERT_THROW(scanner->PrintNext(ss_fail, 17), ParquetException);
-}
-
-} // namespace test
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/scanner.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.cc b/src/parquet/column/scanner.cc
deleted file mode 100644
index 0295315..0000000
--- a/src/parquet/column/scanner.cc
+++ /dev/null
@@ -1,56 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/column/scanner.h"
-
-#include <cstdint>
-#include <memory>
-
-#include "parquet/column/reader.h"
-#include "parquet/util/memory.h"
-
-using arrow::MemoryPool;
-
-namespace parquet {
-
-std::shared_ptr<Scanner> Scanner::Make(
- std::shared_ptr<ColumnReader> col_reader, int64_t batch_size, MemoryPool* pool) {
- switch (col_reader->type()) {
- case Type::BOOLEAN:
- return std::make_shared<BoolScanner>(col_reader, batch_size, pool);
- case Type::INT32:
- return std::make_shared<Int32Scanner>(col_reader, batch_size, pool);
- case Type::INT64:
- return std::make_shared<Int64Scanner>(col_reader, batch_size, pool);
- case Type::INT96:
- return std::make_shared<Int96Scanner>(col_reader, batch_size, pool);
- case Type::FLOAT:
- return std::make_shared<FloatScanner>(col_reader, batch_size, pool);
- case Type::DOUBLE:
- return std::make_shared<DoubleScanner>(col_reader, batch_size, pool);
- case Type::BYTE_ARRAY:
- return std::make_shared<ByteArrayScanner>(col_reader, batch_size, pool);
- case Type::FIXED_LEN_BYTE_ARRAY:
- return std::make_shared<FixedLenByteArrayScanner>(col_reader, batch_size, pool);
- default:
- ParquetException::NYI("type reader not implemented");
- }
- // Unreachable code, but supress compiler warning
- return std::shared_ptr<Scanner>(nullptr);
-}
-
-} // namespace parquet