You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by uw...@apache.org on 2017/06/26 07:05:26 UTC
[3/6] parquet-cpp git commit: PARQUET-858: Flatten column directory,
minor code consolidation
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
deleted file mode 100644
index 407e808..0000000
--- a/src/parquet/column/writer.h
+++ /dev/null
@@ -1,250 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_COLUMN_WRITER_H
-#define PARQUET_COLUMN_WRITER_H
-
-#include <vector>
-
-#include "parquet/column/levels.h"
-#include "parquet/column/page.h"
-#include "parquet/column/properties.h"
-#include "parquet/column/statistics.h"
-#include "parquet/encoding.h"
-#include "parquet/file/metadata.h"
-#include "parquet/schema.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-#include "parquet/util/visibility.h"
-
-namespace parquet {
-
-// Batch-size constant exposed by this header; from its name, presumably the
-// number of values handled per internal mini-batch -- TODO confirm in writer.cc.
-static constexpr int WRITE_BATCH_SIZE = 1000;
-// Type-independent base for column chunk writers. It owns the page writer,
-// the level encoder and the level/page buffers for one column chunk. The
-// value-type specific pieces (values buffer, dictionary page, statistics) are
-// the pure virtual hooks in the protected section, implemented by
-// TypedColumnWriter<DType>.
-//
-// NOTE(review): the class has pure virtual methods but no virtual destructor,
-// so destroying a TypedColumnWriter through a ColumnWriter* (e.g. via
-// std::unique_ptr<ColumnWriter>) would be undefined behavior. A shared_ptr
-// constructed from the concrete type is not affected.
-class PARQUET_EXPORT ColumnWriter {
- public:
- // Arguments: builder for this chunk's metadata, the page writer that receives
- // serialized pages, the expected row count, whether dictionary encoding is
- // used, the value encoding, and the writer properties.
- ColumnWriter(ColumnChunkMetaDataBuilder*, std::unique_ptr<PageWriter>,
- int64_t expected_rows, bool has_dictionary, Encoding::type encoding,
- const WriterProperties* properties);
-
- // Factory; presumably selects the TypedColumnWriter matching the column's
- // physical type -- confirm in writer.cc.
- static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*,
- std::unique_ptr<PageWriter>, int64_t expected_rows,
- const WriterProperties* properties);
-
- // Physical type of the column, taken from its descriptor.
- Type::type type() const { return descr_->physical_type(); }
-
- const ColumnDescriptor* descr() const { return descr_; }
-
- /**
- * Closes the ColumnWriter, commits any buffered values to pages.
- *
- * @return Total size of the column in bytes
- */
- int64_t Close();
-
- protected:
- // Returns the values encoded so far (the typed subclass flushes its encoder).
- virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
-
- // Serializes Dictionary Page if enabled
- virtual void WriteDictionaryPage() = 0;
-
- // Checks if the Dictionary Page size limit is reached
- // If the limit is reached, the Dictionary and Data Pages are serialized
- // The encoding is switched to PLAIN
-
- virtual void CheckDictionarySizeLimit() = 0;
-
- // Plain-encoded statistics of the current page
- virtual EncodedStatistics GetPageStatistics() = 0;
-
- // Plain-encoded statistics of the whole chunk
- virtual EncodedStatistics GetChunkStatistics() = 0;
-
- // Merges page statistics into chunk statistics, then resets the values
- virtual void ResetPageStatistics() = 0;
-
- // Adds Data Pages to an in memory buffer in dictionary encoding mode
- // Serializes the Data Pages in other encoding modes
- void AddDataPage();
-
- // Serializes Data Pages
- void WriteDataPage(const CompressedDataPage& page);
-
- // Write multiple definition levels
- void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels);
-
- // Write multiple repetition levels
- void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels);
-
- // RLE encode the src_buffer into dest_buffer and return the encoded size
- int64_t RleEncodeLevels(
- const Buffer& src_buffer, ResizableBuffer* dest_buffer, int16_t max_level);
-
- // Serialize the buffered Data Pages
- void FlushBufferedDataPages();
-
- // Metadata builder for this chunk and the descriptor of the column written.
- ColumnChunkMetaDataBuilder* metadata_;
- const ColumnDescriptor* descr_;
-
- // Destination for serialized pages (owned).
- std::unique_ptr<PageWriter> pager_;
-
- // The number of rows that should be written in this column chunk.
- int64_t expected_rows_;
- bool has_dictionary_;
- Encoding::type encoding_;
- const WriterProperties* properties_;
-
- LevelEncoder level_encoder_;
-
- ::arrow::MemoryPool* allocator_;
- ChunkedAllocator pool_;
-
- // The total number of values stored in the data page. This is the maximum of
- // the number of encoded definition levels or encoded values. For
- // non-repeated, required columns, this is equal to the number of encoded
- // values. For repeated or optional values, there may be fewer data values
- // than levels, and this tells you how many encoded levels there are in that
- // case.
- int64_t num_buffered_values_;
-
- // The total number of stored values. For repeated or optional values, this
- // number may be lower than num_buffered_values_.
- int64_t num_buffered_encoded_values_;
-
- // Total number of rows written with this ColumnWriter
- // NOTE(review): declared int while the other counters are int64_t; confirm
- // this cannot overflow for very large column chunks.
- int num_rows_;
-
- // Records the total number of bytes written by the serializer
- int64_t total_bytes_written_;
-
- // Flag to check if the Writer has been closed
- bool closed_;
-
- // Flag to infer if dictionary encoding has fallen back to PLAIN
- bool fallback_;
-
- // In-memory sinks for the raw definition/repetition levels of the current page.
- std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
- std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
-
- // RLE-encoded level buffers (see RleEncodeLevels).
- std::shared_ptr<ResizableBuffer> definition_levels_rle_;
- std::shared_ptr<ResizableBuffer> repetition_levels_rle_;
-
- // Scratch buffers for page bytes before and after compression.
- std::shared_ptr<ResizableBuffer> uncompressed_data_;
- std::shared_ptr<ResizableBuffer> compressed_data_;
-
- // Data pages held in memory in dictionary encoding mode (see AddDataPage and
- // FlushBufferedDataPages).
- std::vector<CompressedDataPage> data_pages_;
-
- private:
- // Presumably allocates the level sinks above -- confirm in writer.cc.
- void InitSinks();
-};
-
-// API to write values to a single column. This is the main client facing API.
-// DType selects the physical type; T is its C++ value type.
-template <typename DType>
-class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
- public:
- typedef typename DType::c_type T;
-
- TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
- std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
- const WriterProperties* properties);
-
- // Write a batch of repetition levels, definition levels, and values to the
- // column. `values` holds only the non-null values (contrast with
- // WriteBatchSpaced below).
- void WriteBatch(int64_t num_values, const int16_t* def_levels,
- const int16_t* rep_levels, const T* values);
-
- /// Write a batch of repetition levels, definition levels, and spaced values
- /// (with slots for nulls) to the column.
- ///
- /// In comparison to WriteBatch, the length of the repetition and definition levels
- /// is the same as the number of values read for max_definition_level == 1.
- /// In the case of max_definition_level > 1, the repetition and definition
- /// levels are larger than the values but the values include the null entries
- /// with definition_level == (max_definition_level - 1). Thus we have to differentiate
- /// in the parameters of this function if the input has the length of num_values or the
- /// _number of rows in the lowest nesting level_.
- ///
- /// In the case that the innermost node in the Parquet schema is required, the _number
- /// of rows in the lowest nesting level_ is equal to the number of non-null values. If
- /// the innermost schema node is optional, the _number of rows in the lowest nesting
- /// level_ also includes all values with definition_level == (max_definition_level - 1).
- ///
- /// @param num_values number of levels to write.
- /// @param def_levels The Parquet definition levels, length is num_values
- /// @param rep_levels The Parquet repetition levels, length is num_values
- /// @param valid_bits Bitmap that indicates if the row is null on the lowest nesting
- /// level. The length is number of rows in the lowest nesting level.
- /// @param valid_bits_offset The offset in bits of the valid_bits where the
- /// first relevant bit resides.
- /// @param values The values in the lowest nested level including
- /// spacing for nulls on the lowest levels; input has the length
- /// of the number of rows on the lowest nesting level.
- void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
- const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
- const T* values);
-
- protected:
- // Flushes the current encoder and returns its encoded bytes.
- std::shared_ptr<Buffer> GetValuesBuffer() override {
- return current_encoder_->FlushValues();
- }
- void WriteDictionaryPage() override;
- void CheckDictionarySizeLimit() override;
- EncodedStatistics GetPageStatistics() override;
- EncodedStatistics GetChunkStatistics() override;
- void ResetPageStatistics() override;
-
- private:
- // Per-batch workers; presumably WriteBatch/WriteBatchSpaced split their input
- // into chunks of WRITE_BATCH_SIZE and call these -- confirm in writer.cc.
- int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
- const int16_t* rep_levels, const T* values);
-
- int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels,
- const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
- const T* values, int64_t* num_spaced_written);
-
- typedef Encoder<DType> EncoderType;
-
- // Write values to a temporary buffer before they are encoded into pages
- void WriteValues(int64_t num_values, const T* values);
- void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits,
- int64_t valid_bits_offset, const T* values);
- // Encoder receiving the values; flushed by GetValuesBuffer().
- std::unique_ptr<EncoderType> current_encoder_;
-
- // Statistics for the page being built and for the whole chunk.
- typedef TypedRowGroupStatistics<DType> TypedStats;
- std::unique_ptr<TypedStats> page_statistics_;
- std::unique_ptr<TypedStats> chunk_statistics_;
-};
-
-// Convenience aliases: one TypedColumnWriter instantiation per physical type.
-typedef TypedColumnWriter<BooleanType> BoolWriter;
-typedef TypedColumnWriter<Int32Type> Int32Writer;
-typedef TypedColumnWriter<Int64Type> Int64Writer;
-typedef TypedColumnWriter<Int96Type> Int96Writer;
-typedef TypedColumnWriter<FloatType> FloatWriter;
-typedef TypedColumnWriter<DoubleType> DoubleWriter;
-typedef TypedColumnWriter<ByteArrayType> ByteArrayWriter;
-typedef TypedColumnWriter<FLBAType> FixedLenByteArrayWriter;
-
-// Declared extern so translation units that include this header do not
-// instantiate these templates themselves; the explicit instantiation
-// definitions presumably live in writer.cc -- confirm.
-extern template class PARQUET_EXPORT TypedColumnWriter<BooleanType>;
-extern template class PARQUET_EXPORT TypedColumnWriter<Int32Type>;
-extern template class PARQUET_EXPORT TypedColumnWriter<Int64Type>;
-extern template class PARQUET_EXPORT TypedColumnWriter<Int96Type>;
-extern template class PARQUET_EXPORT TypedColumnWriter<FloatType>;
-extern template class PARQUET_EXPORT TypedColumnWriter<DoubleType>;
-extern template class PARQUET_EXPORT TypedColumnWriter<ByteArrayType>;
-extern template class PARQUET_EXPORT TypedColumnWriter<FLBAType>;
-
-} // namespace parquet
-
-#endif // PARQUET_COLUMN_READER_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_page.h b/src/parquet/column_page.h
new file mode 100644
index 0000000..7840612
--- /dev/null
+++ b/src/parquet/column_page.h
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#ifndef PARQUET_COLUMN_PAGE_H
+#define PARQUET_COLUMN_PAGE_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "parquet/statistics.h"
+#include "parquet/types.h"
+#include "parquet/util/memory.h"
+
+namespace parquet {
+
+// TODO: Parallel processing is not yet safe because of memory-ownership
+// semantics (the PageReader may or may not own the memory referenced by a
+// page)
+//
+// TODO(wesm): In the future Parquet implementations may store the crc code
+// in format::PageHeader. parquet-mr currently does not, so we also skip it
+// here, both on the read and write path
+//
+// Base class of every page kind (DataPage, DataPageV2, DictionaryPage): a
+// shared buffer holding the page bytes plus the PageType tag. Subclasses add
+// the header fields specific to their kind.
+class Page {
+ public:
+  Page(const std::shared_ptr<Buffer>& buffer, PageType::type type)
+      : buffer_(buffer), type_(type) {}
+
+  // Virtual so that destroying a subclass through a Page* (for example a
+  // std::unique_ptr<Page>) runs the subclass destructor; subclasses carry
+  // additional members such as EncodedStatistics.
+  virtual ~Page() {}
+
+  // A user-declared destructor suppresses the implicit move operations, so
+  // copy and move are defaulted explicitly; subclasses may be stored by value
+  // in containers and should keep moving cheaply.
+  Page(const Page&) = default;
+  Page& operator=(const Page&) = default;
+  Page(Page&&) = default;
+  Page& operator=(Page&&) = default;
+
+  // @returns: the kind of this page
+  PageType::type type() const { return type_; }
+
+  // @returns: the shared buffer holding the page bytes
+  std::shared_ptr<Buffer> buffer() const { return buffer_; }
+
+  // @returns: a pointer to the page's data
+  const uint8_t* data() const { return buffer_->data(); }
+
+  // @returns: the total size in bytes of the page's data buffer
+  // NOTE(review): the buffer size is narrowed to int32_t here.
+  int32_t size() const { return static_cast<int32_t>(buffer_->size()); }
+
+ private:
+  std::shared_ptr<Buffer> buffer_;
+  PageType::type type_;
+};
+
+// Version-1 data page: the page bytes together with the header fields that
+// describe them (value count, value encoding, level encodings, statistics).
+class DataPage : public Page {
+ public:
+  DataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
+      Encoding::type encoding, Encoding::type definition_level_encoding,
+      Encoding::type repetition_level_encoding,
+      const EncodedStatistics& statistics = EncodedStatistics())
+      : Page(buffer, PageType::DATA_PAGE),
+        num_values_(num_values),
+        encoding_(encoding),
+        definition_level_encoding_(definition_level_encoding),
+        repetition_level_encoding_(repetition_level_encoding),
+        statistics_(statistics) {}
+
+  // Header accessors.
+  int32_t num_values() const { return num_values_; }
+  Encoding::type encoding() const { return encoding_; }
+  Encoding::type definition_level_encoding() const { return definition_level_encoding_; }
+  Encoding::type repetition_level_encoding() const { return repetition_level_encoding_; }
+
+  // Page statistics; default-constructed when the caller supplied none.
+  const EncodedStatistics& statistics() const { return statistics_; }
+
+ private:
+  int32_t num_values_;
+  Encoding::type encoding_;
+  Encoding::type definition_level_encoding_;
+  Encoding::type repetition_level_encoding_;
+  EncodedStatistics statistics_;
+};
+
+// DataPage that additionally records the page size before compression.
+class CompressedDataPage : public DataPage {
+ public:
+  CompressedDataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
+      Encoding::type encoding, Encoding::type definition_level_encoding,
+      Encoding::type repetition_level_encoding, int64_t uncompressed_size,
+      const EncodedStatistics& statistics = EncodedStatistics())
+      : DataPage(buffer, num_values, encoding, definition_level_encoding,
+            repetition_level_encoding, statistics),
+        uncompressed_size_(uncompressed_size) {}
+
+  // @returns: the page size in bytes before compression, as given at construction
+  int64_t uncompressed_size() const { return uncompressed_size_; }
+
+ private:
+  int64_t uncompressed_size_;
+};
+
+// Version-2 data page. Besides the value count and encoding, its header holds
+// null and row counts, the byte lengths of the two level sections and a flag
+// telling whether the page body is compressed.
+class DataPageV2 : public Page {
+ public:
+  DataPageV2(const std::shared_ptr<Buffer>& buffer, int32_t num_values, int32_t num_nulls,
+      int32_t num_rows, Encoding::type encoding, int32_t definition_levels_byte_length,
+      int32_t repetition_levels_byte_length, bool is_compressed = false)
+      : Page(buffer, PageType::DATA_PAGE_V2),
+        num_values_(num_values),
+        num_nulls_(num_nulls),
+        num_rows_(num_rows),
+        encoding_(encoding),
+        definition_levels_byte_length_(definition_levels_byte_length),
+        repetition_levels_byte_length_(repetition_levels_byte_length),
+        is_compressed_(is_compressed) {}
+
+  // Counts from the page header.
+  int32_t num_values() const { return num_values_; }
+  int32_t num_nulls() const { return num_nulls_; }
+  int32_t num_rows() const { return num_rows_; }
+
+  // Encoding of the values section.
+  Encoding::type encoding() const { return encoding_; }
+
+  // Byte lengths of the level sections.
+  int32_t definition_levels_byte_length() const { return definition_levels_byte_length_; }
+  int32_t repetition_levels_byte_length() const { return repetition_levels_byte_length_; }
+
+  // Whether the page body is compressed (false unless the caller says otherwise).
+  bool is_compressed() const { return is_compressed_; }
+
+ private:
+  int32_t num_values_;
+  int32_t num_nulls_;
+  int32_t num_rows_;
+  Encoding::type encoding_;
+  int32_t definition_levels_byte_length_;
+  int32_t repetition_levels_byte_length_;
+  bool is_compressed_;
+
+  // TODO(wesm): format::DataPageHeaderV2.statistics
+};
+
+// Dictionary page: the encoded dictionary entries, their count, the encoding
+// used for them, and whether the entries are sorted.
+class DictionaryPage : public Page {
+ public:
+  DictionaryPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
+      Encoding::type encoding, bool is_sorted = false)
+      : Page(buffer, PageType::DICTIONARY_PAGE),
+        num_values_(num_values),
+        encoding_(encoding),
+        is_sorted_(is_sorted) {}
+
+  // Number of dictionary entries.
+  int32_t num_values() const { return num_values_; }
+  // Encoding of the dictionary entries.
+  Encoding::type encoding() const { return encoding_; }
+  // Sortedness flag (false unless the caller says otherwise).
+  bool is_sorted() const { return is_sorted_; }
+
+ private:
+  int32_t num_values_;
+  Encoding::type encoding_;
+  bool is_sorted_;
+};
+
+// Abstract source of column pages. The ColumnReader pulls pages through this
+// interface, so they can be supplied by any mechanism (the tests use a mock).
+class PageReader {
+ public:
+  virtual ~PageReader() = default;
+
+  // @returns: the next Page, or a null shared_ptr<Page> once the stream is
+  // exhausted
+  virtual std::shared_ptr<Page> NextPage() = 0;
+};
+
+// Abstract sink for the pages produced by a column writer.
+class PageWriter {
+ public:
+  virtual ~PageWriter() = default;
+
+  // Closes the writer. The column writer reports whether dictionary encoding
+  // was used (has_dictionary) and whether it fell back to the default encoding
+  // after reaching the dictionary page size limit (fallback).
+  virtual void Close(bool has_dictionary, bool fallback) = 0;
+
+  // Writes one data page. The int64_t result is presumably the number of bytes
+  // written -- confirm against the implementation.
+  virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0;
+
+  // Writes the dictionary page; result as for WriteDataPage().
+  virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
+
+  // Presumably reports whether a compression codec is configured.
+  virtual bool has_compressor() = 0;
+
+  // Compresses src_buffer into dest_buffer.
+  virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0;
+};
+
+} // namespace parquet
+
+#endif // PARQUET_COLUMN_PAGE_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader-test.cc b/src/parquet/column_reader-test.cc
new file mode 100644
index 0000000..84d1e37
--- /dev/null
+++ b/src/parquet/column_reader-test.cc
@@ -0,0 +1,371 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdlib>
+#include <iostream>
+#include <limits>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
+#include "parquet/schema.h"
+#include "parquet/test-util.h"
+#include "parquet/types.h"
+#include "parquet/util/test-common.h"
+
+using std::string;
+using std::vector;
+using std::shared_ptr;
+
+namespace parquet {
+
+using schema::NodePtr;
+
+namespace test {
+
+// Compares dense values `left` (one entry per defined value) against spaced
+// values `right` (which also reserves slots for nulls), using the definition
+// levels to decide which entries line up.
+//
+// @param left            values without null slots
+// @param def_levels      definition levels, one per level entry
+// @param max_def_levels  maximum definition level of the column
+// @param max_rep_levels  maximum repetition level of the column
+// @param right           values including slots for nulls
+// @returns true if every defined value matches; false on the first mismatch
+//          or when either vector is shorter than the levels require
+template <typename T>
+static inline bool vector_equal_with_def_levels(const std::vector<T>& left,
+    const std::vector<int16_t>& def_levels, int16_t max_def_levels,
+    int16_t max_rep_levels, const std::vector<T>& right) {
+  size_t i_left = 0;
+  size_t i_right = 0;
+  for (size_t i = 0; i < def_levels.size(); i++) {
+    if (def_levels[i] == max_def_levels) {
+      // Defined value: it must be present in both vectors.
+      if (i_left >= left.size() || i_right >= right.size()) {
+        std::cerr << "index " << i << " has no corresponding value (left size "
+                  << left.size() << ", right size " << right.size() << ")"
+                  << std::endl;
+        return false;
+      }
+      if (left[i_left] != right[i_right]) {
+        // Report the element that was actually compared.
+        std::cerr << "index " << i << " left was " << left[i_left] << " right was "
+                  << right[i_right] << std::endl;
+        return false;
+      }
+      i_left++;
+      i_right++;
+    } else if (def_levels[i] == (max_def_levels - 1)) {
+      // Null entry on the lowest nested level: only `right` has a slot for it.
+      i_right++;
+    } else if (def_levels[i] < (max_def_levels - 1)) {
+      // Null entry on a higher nesting level; `right` only has a slot for it
+      // when the data is not repeated.
+      if (max_rep_levels == 0) { i_right++; }
+    }
+  }
+
+  return true;
+}
+
+// Fixture that generates Int32 pages with MakePages<Int32Type>, serves them
+// through a MockPageReader, and checks that a ColumnReader returns the same
+// values and levels through both ReadBatch() and ReadBatchSpaced().
+class TestPrimitiveReader : public ::testing::Test {
+ public:
+  // (Re)creates reader_ over the pages currently held in pages_.
+  void InitReader(const ColumnDescriptor* d) {
+    std::unique_ptr<PageReader> pager(new test::MockPageReader(pages_));
+    reader_ = ColumnReader::Make(d, std::move(pager));
+  }
+
+  // Reads everything back with ReadBatch() and compares the result with the
+  // generated values_, def_levels_ and rep_levels_.
+  void CheckResults() {
+    vector<int32_t> vresult(num_values_, -1);
+    vector<int16_t> dresult(num_levels_, -1);
+    vector<int16_t> rresult(num_levels_, -1);
+    int64_t values_read = 0;
+    int total_values_read = 0;
+    int batch_actual = 0;
+
+    Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
+    int32_t batch_size = 8;
+    int batch = 0;
+    // This will cover both the cases
+    // 1) batch_size < page_size (multiple ReadBatch from a single page)
+    // 2) batch_size > page_size (BatchRead limits to a single page)
+    // After the first small batch, batch_size jumps to at least 4096.
+    do {
+      batch = static_cast<int>(reader->ReadBatch(batch_size, dresult.data() + batch_actual,
+          rresult.data() + batch_actual, vresult.data() + total_values_read,
+          &values_read));
+      total_values_read += static_cast<int>(values_read);
+      batch_actual += batch;
+      batch_size = std::max(batch_size * 2, 4096);
+    } while (batch > 0);
+
+    ASSERT_EQ(num_levels_, batch_actual);
+    ASSERT_EQ(num_values_, total_values_read);
+    ASSERT_TRUE(vector_equal(values_, vresult));
+    if (max_def_level_ > 0) { ASSERT_TRUE(vector_equal(def_levels_, dresult)); }
+    if (max_rep_level_ > 0) { ASSERT_TRUE(vector_equal(rep_levels_, rresult)); }
+    // catch improper writes at EOS
+    batch_actual =
+        static_cast<int>(reader->ReadBatch(5, nullptr, nullptr, nullptr, &values_read));
+    ASSERT_EQ(0, batch_actual);
+    ASSERT_EQ(0, values_read);
+  }
+
+  // Same as CheckResults() but through ReadBatchSpaced(), whose value output
+  // keeps slots for nulls.
+  void CheckResultsSpaced() {
+    vector<int32_t> vresult(num_levels_, -1);
+    vector<int16_t> dresult(num_levels_, -1);
+    vector<int16_t> rresult(num_levels_, -1);
+    vector<uint8_t> valid_bits(num_levels_, 255);
+    int total_values_read = 0;
+    int batch_actual = 0;
+    int levels_actual = 0;
+    int64_t null_count = -1;
+    int64_t levels_read = 0;
+    int64_t values_read = 0;
+
+    Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
+    int32_t batch_size = 8;
+    int batch = 0;
+    // This will cover both the cases
+    // 1) batch_size < page_size (multiple ReadBatch from a single page)
+    // 2) batch_size > page_size (BatchRead limits to a single page)
+    do {
+      // NOTE(review): valid_bits is presumably a bitmap (as on the writer side),
+      // yet it is advanced by batch_actual *bytes* with a bit offset of 0. The
+      // bitmap contents are never checked here, so this only matters if that
+      // changes.
+      batch = static_cast<int>(reader->ReadBatchSpaced(batch_size,
+          dresult.data() + levels_actual, rresult.data() + levels_actual,
+          vresult.data() + batch_actual, valid_bits.data() + batch_actual, 0,
+          &levels_read, &values_read, &null_count));
+      total_values_read += batch - static_cast<int>(null_count);
+      batch_actual += batch;
+      levels_actual += static_cast<int>(levels_read);
+      batch_size = std::max(batch_size * 2, 4096);
+    } while ((batch > 0) || (levels_read > 0));
+
+    ASSERT_EQ(num_levels_, levels_actual);
+    ASSERT_EQ(num_values_, total_values_read);
+    if (max_def_level_ > 0) {
+      ASSERT_TRUE(vector_equal(def_levels_, dresult));
+      ASSERT_TRUE(vector_equal_with_def_levels(
+          values_, dresult, max_def_level_, max_rep_level_, vresult));
+    } else {
+      ASSERT_TRUE(vector_equal(values_, vresult));
+    }
+    if (max_rep_level_ > 0) { ASSERT_TRUE(vector_equal(rep_levels_, rresult)); }
+    // catch improper writes at EOS
+    batch_actual = static_cast<int>(reader->ReadBatchSpaced(5, nullptr, nullptr, nullptr,
+        valid_bits.data(), 0, &levels_read, &values_read, &null_count));
+    ASSERT_EQ(0, batch_actual);
+    ASSERT_EQ(0, null_count);
+  }
+
+  // Drops all generated data, the pages and the reader.
+  void Clear() {
+    values_.clear();
+    def_levels_.clear();
+    rep_levels_.clear();
+    pages_.clear();
+    reader_.reset();
+  }
+
+  void ExecutePlain(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
+    ExecuteWithEncoding(num_pages, levels_per_page, d, Encoding::PLAIN);
+  }
+
+  void ExecuteDict(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
+    ExecuteWithEncoding(num_pages, levels_per_page, d, Encoding::RLE_DICTIONARY);
+  }
+
+ protected:
+  // Generates num_pages pages of levels_per_page levels each in `encoding` and
+  // points a fresh reader at them.
+  void MakePagesAndInitReader(int num_pages, int levels_per_page,
+      const ColumnDescriptor* d, Encoding::type encoding) {
+    num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
+        rep_levels_, values_, data_buffer_, pages_, encoding);
+    num_levels_ = num_pages * levels_per_page;
+    InitReader(d);
+  }
+
+  // Shared body of ExecutePlain()/ExecuteDict(): checks ReadBatch() and then
+  // ReadBatchSpaced(), each against freshly generated pages.
+  void ExecuteWithEncoding(int num_pages, int levels_per_page, const ColumnDescriptor* d,
+      Encoding::type encoding) {
+    MakePagesAndInitReader(num_pages, levels_per_page, d, encoding);
+    CheckResults();
+    Clear();
+
+    MakePagesAndInitReader(num_pages, levels_per_page, d, encoding);
+    CheckResultsSpaced();
+    Clear();
+  }
+
+  int num_levels_ = 0;
+  int num_values_ = 0;
+  int16_t max_def_level_ = 0;
+  int16_t max_rep_level_ = 0;
+  vector<shared_ptr<Page>> pages_;
+  std::shared_ptr<ColumnReader> reader_;
+  vector<int32_t> values_;
+  vector<int16_t> def_levels_;
+  vector<int16_t> rep_levels_;
+  vector<uint8_t> data_buffer_;  // For BA and FLBA
+};
+
+// Flat REQUIRED column: no definition or repetition levels.
+TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
+  max_def_level_ = 0;
+  max_rep_level_ = 0;
+  const int num_pages = 50;
+  const int levels_per_page = 100;
+  NodePtr node = schema::Int32("a", Repetition::REQUIRED);
+  const ColumnDescriptor descr(node, max_def_level_, max_rep_level_);
+  ExecutePlain(num_pages, levels_per_page, &descr);
+  ExecuteDict(num_pages, levels_per_page, &descr);
+}
+
+// Flat OPTIONAL column: definition levels only (max level 4), no repetition.
+TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
+  max_def_level_ = 4;
+  max_rep_level_ = 0;
+  const int num_pages = 50;
+  const int levels_per_page = 100;
+  NodePtr node = schema::Int32("b", Repetition::OPTIONAL);
+  const ColumnDescriptor descr(node, max_def_level_, max_rep_level_);
+  ExecutePlain(num_pages, levels_per_page, &descr);
+  ExecuteDict(num_pages, levels_per_page, &descr);
+}
+
+// REPEATED column: both definition (max 4) and repetition (max 2) levels.
+TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
+  max_def_level_ = 4;
+  max_rep_level_ = 2;
+  const int num_pages = 50;
+  const int levels_per_page = 100;
+  NodePtr node = schema::Int32("c", Repetition::REPEATED);
+  const ColumnDescriptor descr(node, max_def_level_, max_rep_level_);
+  ExecutePlain(num_pages, levels_per_page, &descr);
+  ExecuteDict(num_pages, levels_per_page, &descr);
+}
+
+// Exercises ColumnReader::Skip() on a flat REQUIRED column. Each of the three
+// skip cases (several whole pages, one page's worth across a page boundary,
+// part of a single page) is followed by a half-page read whose contents and
+// counts are checked.
+TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) {
+  const int levels_per_page = 100;
+  const int half_page = levels_per_page / 2;
+  const int num_pages = 5;
+  max_def_level_ = 0;
+  max_rep_level_ = 0;
+  NodePtr type = schema::Int32("b", Repetition::REQUIRED);
+  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
+  MakePages<Int32Type>(&descr, num_pages, levels_per_page, def_levels_, rep_levels_,
+      values_, data_buffer_, pages_, Encoding::PLAIN);
+  InitReader(&descr);
+  vector<int32_t> vresult(half_page, -1);
+  vector<int16_t> dresult(half_page, -1);
+  vector<int16_t> rresult(half_page, -1);
+
+  Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
+  int64_t levels_read = 0;
+  int64_t values_read = 0;
+
+  // 1) skip_size > page_size (multiple pages skipped)
+  // Skip the first 2 pages, then read half a page.
+  int64_t levels_skipped = reader->Skip(2 * levels_per_page);
+  ASSERT_EQ(2 * levels_per_page, levels_skipped);
+  levels_read = reader->ReadBatch(
+      half_page, dresult.data(), rresult.data(), vresult.data(), &values_read);
+  ASSERT_EQ(half_page, levels_read);
+  ASSERT_EQ(half_page, values_read);
+  vector<int32_t> sub_values(values_.begin() + 2 * levels_per_page,
+      values_.begin() + 2 * levels_per_page + half_page);
+  ASSERT_TRUE(vector_equal(sub_values, vresult));
+
+  // 2) skip_size == page_size (skip across two pages)
+  levels_skipped = reader->Skip(levels_per_page);
+  ASSERT_EQ(levels_per_page, levels_skipped);
+  // Read half a page
+  levels_read = reader->ReadBatch(
+      half_page, dresult.data(), rresult.data(), vresult.data(), &values_read);
+  ASSERT_EQ(half_page, levels_read);
+  ASSERT_EQ(half_page, values_read);
+  sub_values.assign(values_.begin() + 3 * levels_per_page + half_page,
+      values_.begin() + 4 * levels_per_page);
+  ASSERT_TRUE(vector_equal(sub_values, vresult));
+
+  // 3) skip_size < page_size (skip limited to a single page)
+  // Skip half a page
+  levels_skipped = reader->Skip(half_page);
+  ASSERT_EQ(half_page, levels_skipped);
+  // Read the remaining half page
+  levels_read = reader->ReadBatch(
+      half_page, dresult.data(), rresult.data(), vresult.data(), &values_read);
+  ASSERT_EQ(half_page, levels_read);
+  ASSERT_EQ(half_page, values_read);
+  sub_values.assign(values_.begin() + 4 * levels_per_page + half_page, values_.end());
+  ASSERT_TRUE(vector_equal(sub_values, vresult));
+
+  Clear();
+}
+
+// Checks which dictionary/data page combinations the reader accepts when it
+// looks at the first page (HasNext()), and which it rejects with
+// ParquetException.
+TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
+  max_def_level_ = 0;
+  max_rep_level_ = 0;
+  NodePtr type = schema::Int32("a", Repetition::REQUIRED);
+  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
+  shared_ptr<PoolBuffer> dummy = std::make_shared<PoolBuffer>();
+
+  // Installs `pages` as the fixture's page list and builds a new reader over it.
+  auto use_pages = [&](const vector<shared_ptr<Page>>& pages) {
+    pages_ = pages;
+    InitReader(&descr);
+  };
+
+  // Dictionary PLAIN + data RLE_DICTIONARY is accepted.
+  shared_ptr<DictionaryPage> plain_dict =
+      std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
+  shared_ptr<DataPage> rle_dict_data = MakeDataPage<Int32Type>(
+      &descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
+  use_pages({plain_dict, rle_dict_data});
+  ASSERT_NO_THROW(reader_->HasNext());
+
+  // Dictionary PLAIN_DICTIONARY + data PLAIN_DICTIONARY is accepted.
+  shared_ptr<DictionaryPage> plain_dict_dict =
+      std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN_DICTIONARY);
+  shared_ptr<DataPage> plain_dict_data = MakeDataPage<Int32Type>(
+      &descr, {}, 0, Encoding::PLAIN_DICTIONARY, {}, 0, {}, 0, {}, 0);
+  use_pages({plain_dict_dict, plain_dict_data});
+  ASSERT_NO_THROW(reader_->HasNext());
+
+  // A dictionary-encoded data page must be preceded by a dictionary page.
+  shared_ptr<DataPage> orphan_data = MakeDataPage<Int32Type>(
+      &descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
+  use_pages({orphan_data});
+  ASSERT_THROW(reader_->HasNext(), ParquetException);
+
+  // A dictionary page in an unsupported encoding (DELTA_BYTE_ARRAY) is rejected.
+  shared_ptr<DictionaryPage> delta_dict =
+      std::make_shared<DictionaryPage>(dummy, 0, Encoding::DELTA_BYTE_ARRAY);
+  use_pages({delta_dict});
+  ASSERT_THROW(reader_->HasNext(), ParquetException);
+
+  // A column cannot have more than one dictionary page.
+  shared_ptr<DictionaryPage> first_dict =
+      std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN_DICTIONARY);
+  shared_ptr<DictionaryPage> second_dict =
+      std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
+  use_pages({first_dict, second_dict});
+  ASSERT_THROW(reader_->HasNext(), ParquetException);
+
+  // A data page in an unsupported encoding is rejected.
+  shared_ptr<DataPage> delta_data = MakeDataPage<Int32Type>(
+      &descr, {}, 0, Encoding::DELTA_BYTE_ARRAY, {}, 0, {}, 0, {}, 0);
+  use_pages({delta_data});
+  ASSERT_THROW(reader_->HasNext(), ParquetException);
+
+  pages_.clear();
+}
+
+} // namespace test
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
new file mode 100644
index 0000000..f63f6f1
--- /dev/null
+++ b/src/parquet/column_reader.cc
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/column_reader.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+
+#include "parquet/column_page.h"
+#include "parquet/encoding-internal.h"
+#include "parquet/properties.h"
+#include "parquet/util/rle-encoding.h"
+
+using arrow::MemoryPool;
+
+namespace parquet {
+
+LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}
+
+LevelDecoder::~LevelDecoder() {}
+
+// Points the decoder at the level data of a new page.
+//
+// @param encoding             RLE or BIT_PACKED; anything else throws.
+// @param max_level            maximum level value; determines the bit width.
+// @param num_buffered_values  number of levels encoded for the page.
+// @param data                 start of the encoded levels inside the page.
+// @returns the number of bytes of `data` occupied by the levels (including
+//          the 4-byte length prefix for RLE).
+// @throws ParquetException on an unknown encoding or a negative RLE length.
+int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
+    int num_buffered_values, const uint8_t* data) {
+  encoding_ = encoding;
+  num_values_remaining_ = num_buffered_values;
+  bit_width_ = BitUtil::Log2(max_level + 1);
+  switch (encoding) {
+    case Encoding::RLE: {
+      // RLE levels are prefixed with their byte length as a 4-byte integer.
+      // The prefix is not guaranteed to be 4-byte aligned inside the page
+      // buffer, so copy it out rather than dereferencing a casted pointer
+      // (unaligned access / strict-aliasing violation).
+      int32_t num_bytes = 0;
+      std::memcpy(&num_bytes, data, sizeof(int32_t));
+      if (num_bytes < 0) {
+        throw ParquetException("Received invalid number of bytes for levels.");
+      }
+      const uint8_t* decoder_data = data + sizeof(int32_t);
+      if (!rle_decoder_) {
+        rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_));
+      } else {
+        rle_decoder_->Reset(decoder_data, num_bytes, bit_width_);
+      }
+      return static_cast<int>(sizeof(int32_t)) + num_bytes;
+    }
+    case Encoding::BIT_PACKED: {
+      // Bit-packed levels carry no length prefix: the size follows from the
+      // level count and bit width. Multiply in 64 bits so large pages cannot
+      // overflow int.
+      const int32_t num_bytes = static_cast<int32_t>(
+          BitUtil::Ceil(static_cast<int64_t>(num_buffered_values) * bit_width_, 8));
+      if (!bit_packed_decoder_) {
+        bit_packed_decoder_.reset(new BitReader(data, num_bytes));
+      } else {
+        bit_packed_decoder_->Reset(data, num_bytes);
+      }
+      return num_bytes;
+    }
+    default:
+      throw ParquetException("Unknown encoding type for levels.");
+  }
+  return -1;
+}
+
+// Decodes up to `batch_size` levels into `levels`, never more than the page
+// still holds, and returns how many were produced.
+int LevelDecoder::Decode(int batch_size, int16_t* levels) {
+  const int to_decode = std::min(num_values_remaining_, batch_size);
+  const int decoded = (encoding_ == Encoding::RLE)
+      ? rle_decoder_->GetBatch(levels, to_decode)
+      : bit_packed_decoder_->GetBatch(bit_width_, levels, to_decode);
+  num_values_remaining_ -= decoded;
+  return decoded;
+}
+
+// Returns a copy of a lazily constructed, default-configured ReaderProperties.
+ReaderProperties default_reader_properties() {
+  static ReaderProperties kDefaultProperties;
+  return kDefaultProperties;
+}
+
+ColumnReader::ColumnReader(
+ const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager, MemoryPool* pool)
+ : descr_(descr),
+ pager_(std::move(pager)),
+ num_buffered_values_(0),
+ num_decoded_values_(0),
+ pool_(pool) {}
+
+ColumnReader::~ColumnReader() {}
+
+// Builds the dictionary decoder from a dictionary page and makes it current.
+//
+// Dictionary pages encoded as PLAIN or PLAIN_DICTIONARY both hold plain values.
+// Their decoder is cached under the RLE_DICTIONARY key, which is the key data
+// pages with a dictionary-index encoding look up.
+//
+// @throws ParquetException if a dictionary was already configured, or NYI for
+//         any other dictionary page encoding.
+template <typename DType>
+void TypedColumnReader<DType>::ConfigureDictionary(const DictionaryPage* page) {
+  const bool is_plain_dictionary = page->encoding() == Encoding::PLAIN_DICTIONARY ||
+                                   page->encoding() == Encoding::PLAIN;
+  const int key = is_plain_dictionary ? static_cast<int>(Encoding::RLE_DICTIONARY)
+                                      : static_cast<int>(page->encoding());
+
+  if (decoders_.count(key) > 0) {
+    throw ParquetException("Column cannot have more than one dictionary.");
+  }
+
+  if (is_plain_dictionary) {
+    PlainDecoder<DType> plain_values(descr_);
+    plain_values.SetData(page->num_values(), page->data(), page->size());
+
+    // SetDict() is expected to decode the whole dictionary eagerly, so the
+    // DictionaryPage buffer is not needed afterwards.
+    //
+    // TODO(wesm): investigate whether this all-or-nothing decoding of the
+    // dictionary makes sense and whether performance can be improved
+    auto dict_decoder = std::make_shared<DictionaryDecoder<DType>>(descr_, pool_);
+    dict_decoder->SetDict(&plain_values);
+    decoders_[key] = dict_decoder;
+  } else {
+    ParquetException::NYI("only plain dictionary encoding has been implemented");
+  }
+
+  current_decoder_ = decoders_[key].get();
+}
+
+// Returns true for encodings whose data pages store indices into a previously
+// read dictionary. PLAIN_DICTIONARY is deprecated but older writers used it as
+// the dictionary index encoding, so it is treated like RLE_DICTIONARY.
+// The enum is passed by value: it is a small scalar, and a const reference
+// adds nothing.
+static bool IsDictionaryIndexEncoding(Encoding::type e) {
+  return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
+}
+
+// Advances to the next data page and prepares the level and value decoders.
+//
+// Dictionary pages met along the way configure the dictionary decoder; pages
+// of unknown type are skipped.
+//
+// @returns false at end of stream, true once a data page is ready.
+// @throws ParquetException for a dictionary-encoded page without a preceding
+//         dictionary, unknown encodings, or level data larger than the page;
+//         NYI for the DELTA_* encodings.
+template <typename DType>
+bool TypedColumnReader<DType>::ReadNewPage() {
+  while (true) {
+    current_page_ = pager_->NextPage();
+    if (!current_page_) {
+      // EOS
+      return false;
+    }
+
+    if (current_page_->type() == PageType::DICTIONARY_PAGE) {
+      ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
+      continue;
+    }
+    if (current_page_->type() != PageType::DATA_PAGE) {
+      // We don't know what this page type is. We're allowed to skip non-data
+      // pages.
+      continue;
+    }
+
+    const DataPage* page = static_cast<const DataPage*>(current_page_.get());
+
+    // Read a data page.
+    num_buffered_values_ = page->num_values();
+
+    // Have not decoded any values from the data page yet
+    num_decoded_values_ = 0;
+
+    const uint8_t* buffer = page->data();
+
+    // If the data page includes repetition and definition levels, we
+    // initialize the level decoder and subtract the encoded level bytes from
+    // the page size to determine the number of bytes in the encoded data.
+    int64_t data_size = page->size();
+
+    // Data page Layout: Repetition Levels - Definition Levels - encoded values.
+    // Levels are encoded as rle or bit-packed.
+    // Init repetition levels
+    if (descr_->max_repetition_level() > 0) {
+      int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
+          page->repetition_level_encoding(), descr_->max_repetition_level(),
+          static_cast<int>(num_buffered_values_), buffer);
+      buffer += rep_levels_bytes;
+      data_size -= rep_levels_bytes;
+    }
+    // TODO figure a way to set max_definition_level_ to 0
+    // if the initial value is invalid
+
+    // Init definition levels
+    if (descr_->max_definition_level() > 0) {
+      int64_t def_levels_bytes = definition_level_decoder_.SetData(
+          page->definition_level_encoding(), descr_->max_definition_level(),
+          static_cast<int>(num_buffered_values_), buffer);
+      buffer += def_levels_bytes;
+      data_size -= def_levels_bytes;
+    }
+
+    // A corrupt level length can claim more bytes than the page holds. Fail
+    // here rather than hand the value decoder a negative size.
+    if (data_size < 0) {
+      throw ParquetException("Level data exceeds data page size (corrupt data page?)");
+    }
+
+    // Get a decoder object for this page or create a new decoder if this is the
+    // first page with this encoding.
+    Encoding::type encoding = page->encoding();
+
+    if (IsDictionaryIndexEncoding(encoding)) { encoding = Encoding::RLE_DICTIONARY; }
+
+    auto it = decoders_.find(static_cast<int>(encoding));
+    if (it != decoders_.end()) {
+      // The cached RLE_DICTIONARY entry must be the decoder built by
+      // ConfigureDictionary. Check that cached decoder itself, not whichever
+      // decoder the previous page used (e.g. a PLAIN fallback page).
+      if (encoding == Encoding::RLE_DICTIONARY) {
+        DCHECK(it->second->encoding() == Encoding::RLE_DICTIONARY);
+      }
+      current_decoder_ = it->second.get();
+    } else {
+      switch (encoding) {
+        case Encoding::PLAIN: {
+          std::shared_ptr<DecoderType> decoder =
+              std::make_shared<PlainDecoder<DType>>(descr_);
+          decoders_[static_cast<int>(encoding)] = decoder;
+          current_decoder_ = decoder.get();
+          break;
+        }
+        case Encoding::RLE_DICTIONARY:
+          throw ParquetException("Dictionary page must be before data page.");
+
+        case Encoding::DELTA_BINARY_PACKED:
+        case Encoding::DELTA_LENGTH_BYTE_ARRAY:
+        case Encoding::DELTA_BYTE_ARRAY:
+          ParquetException::NYI("Unsupported encoding");
+
+        default:
+          throw ParquetException("Unknown encoding type.");
+      }
+    }
+    current_decoder_->SetData(
+        static_cast<int>(num_buffered_values_), buffer, static_cast<int>(data_size));
+    return true;
+  }
+}
+
+// ----------------------------------------------------------------------
+// Batch read APIs
+
+// Decodes up to batch_size definition levels from the current page. When the
+// maximum definition level is 0 no definition levels are encoded, so nothing
+// is read and 0 is returned.
+int64_t ColumnReader::ReadDefinitionLevels(int64_t batch_size, int16_t* levels) {
+  if (descr_->max_definition_level() > 0) {
+    return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
+  }
+  return 0;
+}
+
+// Decodes up to batch_size repetition levels from the current page. When the
+// maximum repetition level is 0 no repetition levels are encoded, so nothing
+// is read and 0 is returned.
+int64_t ColumnReader::ReadRepetitionLevels(int64_t batch_size, int16_t* levels) {
+  if (descr_->max_repetition_level() > 0) {
+    return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
+  }
+  return 0;
+}
+
+// ----------------------------------------------------------------------
+// Dynamic column reader constructor
+
+// Creates the TypedColumnReader that matches the column's physical type.
+// Takes ownership of `pager`; throws NYI for an unsupported physical type.
+std::shared_ptr<ColumnReader> ColumnReader::Make(
+    const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager, MemoryPool* pool) {
+  std::shared_ptr<ColumnReader> reader;
+  switch (descr->physical_type()) {
+    case Type::BOOLEAN:
+      reader = std::make_shared<BoolReader>(descr, std::move(pager), pool);
+      break;
+    case Type::INT32:
+      reader = std::make_shared<Int32Reader>(descr, std::move(pager), pool);
+      break;
+    case Type::INT64:
+      reader = std::make_shared<Int64Reader>(descr, std::move(pager), pool);
+      break;
+    case Type::INT96:
+      reader = std::make_shared<Int96Reader>(descr, std::move(pager), pool);
+      break;
+    case Type::FLOAT:
+      reader = std::make_shared<FloatReader>(descr, std::move(pager), pool);
+      break;
+    case Type::DOUBLE:
+      reader = std::make_shared<DoubleReader>(descr, std::move(pager), pool);
+      break;
+    case Type::BYTE_ARRAY:
+      reader = std::make_shared<ByteArrayReader>(descr, std::move(pager), pool);
+      break;
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      reader = std::make_shared<FixedLenByteArrayReader>(descr, std::move(pager), pool);
+      break;
+    default:
+      ParquetException::NYI("type reader not implemented");
+  }
+  // Only reached after a successful case; an empty pointer would mean NYI
+  // returned, which it does not (suppresses missing-return warnings).
+  return reader;
+}
+
+// ----------------------------------------------------------------------
+// Instantiate templated classes
+
+// Explicit instantiation definitions, one per physical type. column_reader.h
+// declares matching `extern template` instantiations, so other translation
+// units link against these instead of instantiating the templates themselves.
+template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<BooleanType>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<Int32Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<Int64Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<Int96Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<FloatType>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<DoubleType>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<ByteArrayType>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<FLBAType>;
+
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
new file mode 100644
index 0000000..f4b8b02
--- /dev/null
+++ b/src/parquet/column_reader.h
@@ -0,0 +1,475 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_READER_H
+#define PARQUET_COLUMN_READER_H
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include <arrow/util/bit-util.h>
+
+#include "parquet/column_page.h"
+#include "parquet/encoding.h"
+#include "parquet/exception.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/visibility.h"
+
+namespace parquet {
+
+class BitReader;
+class RleDecoder;
+
+// Decodes the repetition or definition levels stored at the front of a data
+// page. Levels are either RLE-encoded (preceded by a 4-byte length) or
+// bit-packed; SetData() selects the scheme for the current page.
+class PARQUET_EXPORT LevelDecoder {
+ public:
+  LevelDecoder();
+  ~LevelDecoder();
+
+  // Initialize the LevelDecoder state with new data
+  // and return the number of bytes consumed
+  int SetData(Encoding::type encoding, int16_t max_level, int num_buffered_values,
+      const uint8_t* data);
+
+  // Decodes a batch of levels into an array and returns the number of levels
+  // decoded. Must only be called after SetData(), which creates the
+  // underlying decoder.
+  int Decode(int batch_size, int16_t* levels);
+
+ private:
+  // In-class defaults so no member is indeterminate between construction and
+  // the first SetData() call (the constructor only set num_values_remaining_).
+  int bit_width_ = 0;
+  int num_values_remaining_ = 0;
+  Encoding::type encoding_ = Encoding::RLE;
+  std::unique_ptr<RleDecoder> rle_decoder_;
+  std::unique_ptr<BitReader> bit_packed_decoder_;
+};
+
+// Type-erased base for reading one column chunk page by page. Owns the page
+// reader and the repetition/definition level decoders; subclasses supply
+// ReadNewPage() and the typed value decoding.
+class PARQUET_EXPORT ColumnReader {
+ public:
+ ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>,
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+ virtual ~ColumnReader();
+
+ // Creates the typed reader matching descr->physical_type(); takes ownership
+ // of the page reader.
+ static std::shared_ptr<ColumnReader> Make(const ColumnDescriptor* descr,
+ std::unique_ptr<PageReader> pager,
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
+ // Returns true if there are still values in this column.
+ // NOTE(review): a freshly loaded page with zero values also ends iteration
+ // (returns false), even if later pages exist.
+ bool HasNext() {
+ // Either there is no data page available yet, or the data page has been
+ // exhausted
+ if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
+ if (!ReadNewPage() || num_buffered_values_ == 0) { return false; }
+ }
+ return true;
+ }
+
+ Type::type type() const { return descr_->physical_type(); }
+
+ const ColumnDescriptor* descr() const { return descr_; }
+
+ protected:
+ // Loads the next data page and resets the per-page counters; returns false
+ // when the page reader is exhausted.
+ virtual bool ReadNewPage() = 0;
+
+ // Read multiple definition levels into preallocated memory
+ //
+ // Returns the number of decoded definition levels
+ int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels);
+
+ // Read multiple repetition levels into preallocated memory
+ // Returns the number of decoded repetition levels
+ int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels);
+
+ const ColumnDescriptor* descr_;
+
+ std::unique_ptr<PageReader> pager_;
+ // Most recently fetched page; presumably also keeps the page buffer alive
+ // while the decoders reference it — TODO confirm.
+ std::shared_ptr<Page> current_page_;
+
+ // Not set if full schema for this field has no optional or repeated elements
+ // (i.e. SetData() is only called when max_definition_level() > 0).
+ LevelDecoder definition_level_decoder_;
+
+ // Not set for flat schemas.
+ // (SetData() is only called when max_repetition_level() > 0.)
+ LevelDecoder repetition_level_decoder_;
+
+ // The total number of values stored in the data page. This is the maximum of
+ // the number of encoded definition levels or encoded values. For
+ // non-repeated, required columns, this is equal to the number of encoded
+ // values. For repeated or optional values, there may be fewer data values
+ // than levels, and this tells you how many encoded levels there are in that
+ // case.
+ int64_t num_buffered_values_;
+
+ // The number of values from the current data page that have been decoded
+ // into memory
+ int64_t num_decoded_values_;
+
+ // Memory pool used for scratch allocations and handed to decoders.
+ ::arrow::MemoryPool* pool_;
+};
+
+// API to read values from a single column. This is the main client facing API.
+template <typename DType>
+class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
+ public:
+  typedef typename DType::c_type T;
+
+  TypedColumnReader(const ColumnDescriptor* schema, std::unique_ptr<PageReader> pager,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+      : ColumnReader(schema, std::move(pager), pool), current_decoder_(nullptr) {}
+  virtual ~TypedColumnReader() {}
+
+  // Read a batch of repetition levels, definition levels, and values from the
+  // column.
+  //
+  // Since null values are not stored in the values, the number of values read
+  // may be less than the number of repetition and definition levels. With
+  // nested data this is almost certainly true.
+  //
+  // Set def_levels or rep_levels to nullptr if you want to skip reading them.
+  // This is only safe if you know through some other source that there are no
+  // undefined values.
+  //
+  // To fully exhaust a row group, you must read batches until the number of
+  // values read reaches the number of stored values according to the metadata.
+  //
+  // This API is the same for both V1 and V2 of the DataPage
+  //
+  // @returns: actual number of levels read (see values_read for number of values read)
+  int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
+      T* values, int64_t* values_read);
+
+  /// Read a batch of repetition levels, definition levels, and values from the
+  /// column and leave spaces for null entries on the lowest level in the values
+  /// buffer.
+  ///
+  /// In comparison to ReadBatch, the number of repetition and definition levels
+  /// equals the number of values read when max_definition_level == 1.
+  /// When max_definition_level > 1, there are more repetition and definition
+  /// levels than values, but the values include the null entries with
+  /// definition_level == (max_definition_level - 1).
+  ///
+  /// To fully exhaust a row group, you must read batches until the number of
+  /// values read reaches the number of stored values according to the metadata.
+  ///
+  /// @param batch_size the number of levels to read
+  /// @param[out] def_levels The Parquet definition levels, output has
+  ///   the length levels_read.
+  /// @param[out] rep_levels The Parquet repetition levels, output has
+  ///   the length levels_read.
+  /// @param[out] values The values in the lowest nested level including
+  ///   spacing for nulls on the lowest levels; output has the length
+  ///   values_read.
+  /// @param[out] valid_bits Memory allocated for a bitmap that indicates if
+  ///   the row is null or on the maximum definition level. For performance
+  ///   reasons the underlying buffer should be able to store 1 bit more than
+  ///   required. If this requires an additional byte, this byte is only read
+  ///   but never written to.
+  /// @param valid_bits_offset The offset in bits of the valid_bits where the
+  ///   first relevant bit resides.
+  /// @param[out] levels_read The number of repetition/definition levels that were read.
+  /// @param[out] values_read The number of values read, this includes all
+  ///   non-null entries as well as all null-entries on the lowest level
+  ///   (i.e. definition_level == max_definition_level - 1)
+  /// @param[out] null_count The number of nulls on the lowest levels.
+  ///   (i.e. (values_read - null_count) is total number of non-null entries)
+  int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
+      T* values, uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
+      int64_t* values_read, int64_t* null_count);
+
+  // Skip reading levels
+  // Returns the number of levels skipped
+  int64_t Skip(int64_t num_rows_to_skip);
+
+ private:
+  typedef Decoder<DType> DecoderType;
+
+  // Advance to the next data page
+  bool ReadNewPage() override;
+
+  // Read up to batch_size values from the current data page into the
+  // pre-allocated memory T*
+  //
+  // @returns: the number of values read into the out buffer
+  int64_t ReadValues(int64_t batch_size, T* out);
+
+  // Read up to batch_size values from the current data page into the
+  // pre-allocated memory T*, leaving spaces for null entries according
+  // to the def_levels.
+  //
+  // @returns: the number of values read into the out buffer
+  int64_t ReadValuesSpaced(int64_t batch_size, T* out, int null_count,
+      uint8_t* valid_bits, int64_t valid_bits_offset);
+
+  // Map of encoding type to the respective decoder object. For example, a
+  // column chunk's data pages may include both dictionary-encoded and
+  // plain-encoded data.
+  std::unordered_map<int, std::shared_ptr<DecoderType>> decoders_;
+
+  // Builds the dictionary decoder from a dictionary page and makes it current.
+  void ConfigureDictionary(const DictionaryPage* page);
+
+  // Decoder for the current data page; non-owning (owned by decoders_).
+  DecoderType* current_decoder_;
+};
+
+// Decodes up to batch_size densely packed values from the current page.
+// The batch readers clamp batch_size to the current page, so the int
+// narrowing required by the decoder API is safe there.
+template <typename DType>
+inline int64_t TypedColumnReader<DType>::ReadValues(int64_t batch_size, T* out) {
+  const int requested = static_cast<int>(batch_size);
+  return current_decoder_->Decode(out, requested);
+}
+
+// Decodes values into `out`, leaving room for the null slots described by
+// valid_bits (starting at bit valid_bits_offset).
+template <typename DType>
+inline int64_t TypedColumnReader<DType>::ReadValuesSpaced(int64_t batch_size, T* out,
+    int null_count, uint8_t* valid_bits, int64_t valid_bits_offset) {
+  const int num_slots = static_cast<int>(batch_size);
+  return current_decoder_->DecodeSpaced(
+      out, num_slots, null_count, valid_bits, valid_bits_offset);
+}
+
+// Reads levels and densely packed values from the current data page only.
+// Returns the number of levels consumed (or values, when no definition levels
+// were read) and stores the number of decoded values in *values_read.
+template <typename DType>
+inline int64_t TypedColumnReader<DType>::ReadBatch(int64_t batch_size,
+    int16_t* def_levels, int16_t* rep_levels, T* values, int64_t* values_read) {
+  // HasNext() loads the next data page when the current one is exhausted.
+  if (!HasNext()) {
+    *values_read = 0;
+    return 0;
+  }
+
+  // TODO(wesm): keep reading data pages until batch_size is reached, or the
+  // row group is finished
+  const int64_t remaining_in_page = num_buffered_values_ - num_decoded_values_;
+  batch_size = std::min(batch_size, remaining_in_page);
+
+  // Without definition levels every slot is assumed to hold a value.
+  const int16_t max_def = descr_->max_definition_level();
+  int64_t def_count = 0;
+  int64_t non_null_count = batch_size;
+  if (max_def > 0 && def_levels != nullptr) {
+    def_count = ReadDefinitionLevels(batch_size, def_levels);
+    // Only levels at the maximum definition level carry an encoded value.
+    non_null_count = std::count(def_levels, def_levels + def_count, max_def);
+  }
+
+  // Repetition levels only exist for repeated fields.
+  if (descr_->max_repetition_level() > 0 && rep_levels != nullptr) {
+    const int64_t rep_count = ReadRepetitionLevels(batch_size, rep_levels);
+    if (def_levels != nullptr && def_count != rep_count) {
+      throw ParquetException("Number of decoded rep / def levels did not match");
+    }
+  }
+
+  *values_read = ReadValues(non_null_count, values);
+  const int64_t consumed = std::max(def_count, *values_read);
+  num_decoded_values_ += consumed;
+  return consumed;
+}
+
+// Converts definition levels into a validity bitmap starting at bit
+// valid_bits_offset of valid_bits. Sets a bit for each level equal to
+// max_definition_level and clears it for each null slot, adding nulls to
+// *null_count (which is accumulated, not reset). Writes the number of bitmap
+// slots produced to *values_read.
+//
+// For repeated columns only levels at max_definition_level - 1 count as null
+// slots; lower levels produce no slot. For non-repeated columns every level
+// below the maximum is a null slot.
+//
+// Reads (but does not write) one byte past the last written byte, so the
+// caller must size valid_bits with one spare bit.
+//
+// @throws ParquetException if a level exceeds max_definition_level in the
+//         non-repeated case.
+inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels,
+    int16_t max_definition_level, int16_t max_repetition_level, int64_t* values_read,
+    int64_t* null_count, uint8_t* valid_bits, int64_t valid_bits_offset) {
+  // Keep offsets and the loop index in 64 bits: truncating valid_bits_offset
+  // or counting levels in an int breaks for very large offsets/batches.
+  int64_t byte_offset = valid_bits_offset / 8;
+  int64_t bit_offset = valid_bits_offset % 8;
+  uint8_t bitset = valid_bits[byte_offset];
+
+  // TODO(itaiin): As an interim solution we are splitting the code path here
+  // between repeated+flat column reads, and non-repeated+nested reads.
+  // Those paths need to be merged in the future
+  for (int64_t i = 0; i < num_def_levels; ++i) {
+    if (def_levels[i] == max_definition_level) {
+      bitset |= (1 << bit_offset);
+    } else if (max_repetition_level > 0) {
+      // repetition+flat case
+      if (def_levels[i] == (max_definition_level - 1)) {
+        bitset &= ~(1 << bit_offset);
+        *null_count += 1;
+      } else {
+        continue;
+      }
+    } else {
+      // non-repeated+nested case
+      if (def_levels[i] < max_definition_level) {
+        bitset &= ~(1 << bit_offset);
+        *null_count += 1;
+      } else {
+        throw ParquetException("definition level exceeds maximum");
+      }
+    }
+
+    bit_offset++;
+    if (bit_offset == 8) {
+      bit_offset = 0;
+      valid_bits[byte_offset] = bitset;
+      byte_offset++;
+      // TODO: Except for the last byte, this shouldn't be needed
+      bitset = valid_bits[byte_offset];
+    }
+  }
+  if (bit_offset != 0) { valid_bits[byte_offset] = bitset; }
+  *values_read = (bit_offset + byte_offset * 8 - valid_bits_offset);
+}
+
+// Reads levels and values from the current data page, leaving gaps in
+// `values` for nulls on the lowest level and recording validity in valid_bits.
+// Returns the number of values written (including null slots when spaced).
+template <typename DType>
+inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int64_t batch_size,
+    int16_t* def_levels, int16_t* rep_levels, T* values, uint8_t* valid_bits,
+    int64_t valid_bits_offset, int64_t* levels_read, int64_t* values_read,
+    int64_t* null_count_out) {
+  // HasNext invokes ReadNewPage
+  if (!HasNext()) {
+    *levels_read = 0;
+    *values_read = 0;
+    *null_count_out = 0;
+    return 0;
+  }
+
+  int64_t total_values;
+  // TODO(wesm): keep reading data pages until batch_size is reached, or the
+  // row group is finished
+  batch_size = std::min(batch_size, num_buffered_values_ - num_decoded_values_);
+
+  // If the field is required and non-repeated, there are no definition levels
+  if (descr_->max_definition_level() > 0) {
+    int64_t num_def_levels = ReadDefinitionLevels(batch_size, def_levels);
+
+    // Not present for non-repeated fields
+    if (descr_->max_repetition_level() > 0) {
+      int64_t num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels);
+      if (num_def_levels != num_rep_levels) {
+        throw ParquetException("Number of decoded rep / def levels did not match");
+      }
+    }
+
+    // TODO(itaiin): another code path split to merge when the general case is done
+    bool has_spaced_values;
+    if (descr_->max_repetition_level() > 0) {
+      // repeated+flat case
+      has_spaced_values = !descr_->schema_node()->is_required();
+    } else {
+      // non-repeated+nested case
+      // Find if a node forces nulls in the lowest level along the hierarchy
+      has_spaced_values = false;
+      for (const schema::Node* node = descr_->schema_node().get(); node != nullptr;
+           node = node->parent()) {
+        if (node->is_optional()) {
+          has_spaced_values = true;
+          break;
+        }
+      }
+    }
+
+    int64_t null_count = 0;
+    if (!has_spaced_values) {
+      // No null slots are possible: count the levels that carry a value
+      // (64-bit, like every other count in this API) and read them densely.
+      int64_t values_to_read = 0;
+      for (int64_t i = 0; i < num_def_levels; ++i) {
+        if (def_levels[i] == descr_->max_definition_level()) { ++values_to_read; }
+      }
+      total_values = ReadValues(values_to_read, values);
+      for (int64_t i = 0; i < total_values; i++) {
+        ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
+      }
+      *values_read = total_values;
+    } else {
+      int16_t max_definition_level = descr_->max_definition_level();
+      int16_t max_repetition_level = descr_->max_repetition_level();
+      DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level,
+          max_repetition_level, values_read, &null_count, valid_bits, valid_bits_offset);
+      total_values = ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
+          valid_bits, valid_bits_offset);
+    }
+    *levels_read = num_def_levels;
+    *null_count_out = null_count;
+
+  } else {
+    // Required field, read all values
+    total_values = ReadValues(batch_size, values);
+    for (int64_t i = 0; i < total_values; i++) {
+      ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
+    }
+    *null_count_out = 0;
+    *levels_read = total_values;
+  }
+
+  num_decoded_values_ += *levels_read;
+  return total_values;
+}
+
+// Skips up to num_rows_to_skip levels and returns how many were skipped (fewer
+// only if the column runs out). Whole pages are skipped without decoding; the
+// page containing the end of the skip is decoded and discarded in batches.
+// NOTE(review): counts levels, which equal rows only for non-repeated columns.
+template <typename DType>
+inline int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
+  int64_t rows_to_skip = num_rows_to_skip;
+  while (HasNext() && rows_to_skip > 0) {
+    const int64_t remaining_in_page = num_buffered_values_ - num_decoded_values_;
+    if (rows_to_skip > remaining_in_page) {
+      // The rest of this page is skipped: mark it consumed without decoding.
+      rows_to_skip -= remaining_in_page;
+      num_decoded_values_ = num_buffered_values_;
+    } else {
+      // The skip ends inside this page: decode and discard to reach the right
+      // offset in the page (and in its level decoders).
+      int64_t batch_size = 1024;  // ReadBatch with a smaller memory footprint
+
+      std::shared_ptr<PoolBuffer> vals = AllocateBuffer(
+          this->pool_, batch_size * type_traits<DType::type_num>::value_byte_size);
+      std::shared_ptr<PoolBuffer> def_levels =
+          AllocateBuffer(this->pool_, batch_size * sizeof(int16_t));
+      std::shared_ptr<PoolBuffer> rep_levels =
+          AllocateBuffer(this->pool_, batch_size * sizeof(int16_t));
+
+      // ReadBatch returns the number of levels consumed, which is what this
+      // method counts. The value count it reports separately is not needed,
+      // so it gets its own variable instead of aliasing the return value.
+      int64_t levels_read = 0;
+      do {
+        batch_size = std::min(batch_size, rows_to_skip);
+        int64_t values_read = 0;
+        levels_read = ReadBatch(batch_size,
+            reinterpret_cast<int16_t*>(def_levels->mutable_data()),
+            reinterpret_cast<int16_t*>(rep_levels->mutable_data()),
+            reinterpret_cast<T*>(vals->mutable_data()), &values_read);
+        rows_to_skip -= levels_read;
+      } while (levels_read > 0 && rows_to_skip > 0);
+    }
+  }
+  return num_rows_to_skip - rows_to_skip;
+}
+
+// Convenience aliases: one typed reader per Parquet physical type.
+using BoolReader = TypedColumnReader<BooleanType>;
+using Int32Reader = TypedColumnReader<Int32Type>;
+using Int64Reader = TypedColumnReader<Int64Type>;
+using Int96Reader = TypedColumnReader<Int96Type>;
+using FloatReader = TypedColumnReader<FloatType>;
+using DoubleReader = TypedColumnReader<DoubleType>;
+using ByteArrayReader = TypedColumnReader<ByteArrayType>;
+using FixedLenByteArrayReader = TypedColumnReader<FLBAType>;
+
+// Suppress implicit instantiation in every including translation unit; the
+// definitions are explicitly instantiated once in column_reader.cc.
+extern template class PARQUET_EXPORT TypedColumnReader<BooleanType>;
+extern template class PARQUET_EXPORT TypedColumnReader<Int32Type>;
+extern template class PARQUET_EXPORT TypedColumnReader<Int64Type>;
+extern template class PARQUET_EXPORT TypedColumnReader<Int96Type>;
+extern template class PARQUET_EXPORT TypedColumnReader<FloatType>;
+extern template class PARQUET_EXPORT TypedColumnReader<DoubleType>;
+extern template class PARQUET_EXPORT TypedColumnReader<ByteArrayType>;
+extern template class PARQUET_EXPORT TypedColumnReader<FLBAType>;
+
+} // namespace parquet
+
+#endif // PARQUET_COLUMN_READER_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_scanner-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_scanner-test.cc b/src/parquet/column_scanner-test.cc
new file mode 100644
index 0000000..086722b
--- /dev/null
+++ b/src/parquet/column_scanner-test.cc
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdlib>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "parquet/column_page.h"
+#include "parquet/column_scanner.h"
+#include "parquet/schema.h"
+#include "parquet/test-specialization.h"
+#include "parquet/test-util.h"
+#include "parquet/types.h"
+#include "parquet/util/test-common.h"
+
+using std::string;
+using std::vector;
+using std::shared_ptr;
+
+namespace parquet {
+
+using schema::NodePtr;
+
+namespace test {
+
+// Boolean columns are only exercised with PLAIN encoding in this file, so there
+// are never dictionary values to build; the specialization is a deliberate no-op.
+template <>
+void InitDictValues<bool>(int /* num_values */, int /* dict_per_page */,
+    vector<bool>& /* values */, vector<uint8_t>& /* buffer */) {}
+
+// Typed fixture: builds mock data pages for a column, wraps them in a Scanner and
+// verifies every level and value read back against what MakePages produced.
+template <typename Type>
+class TestFlatScanner : public ::testing::Test {
+ public:
+  typedef typename Type::c_type T;
+
+  // Creates scanner_ over the pages currently stored in pages_.
+  void InitScanner(const ColumnDescriptor* d) {
+    std::unique_ptr<PageReader> pager(new test::MockPageReader(pages_));
+    scanner_ = Scanner::Make(ColumnReader::Make(d, std::move(pager)));
+  }
+
+  // Reads num_levels_ entries using `batch_size`, compares levels and non-null
+  // values with the expected data, and checks the scanner is then exhausted.
+  void CheckResults(int batch_size, const ColumnDescriptor* d) {
+    // Scanner::Make creates a TypedScanner<Type> for this physical type;
+    // static_cast is the correct base-to-derived downcast (reinterpret_cast is not).
+    TypedScanner<Type>* scanner = static_cast<TypedScanner<Type>*>(scanner_.get());
+    T val;
+    bool is_null = false;
+    int16_t def_level;
+    int16_t rep_level;
+    int j = 0;
+    scanner->SetBatchSize(batch_size);
+    for (int i = 0; i < num_levels_; i++) {
+      ASSERT_TRUE(scanner->Next(&val, &def_level, &rep_level, &is_null))
+          << i << "N" << j;
+      if (!is_null) {
+        ASSERT_EQ(values_[j], val) << i << "V" << j;
+        j++;
+      }
+      if (d->max_definition_level() > 0) {
+        ASSERT_EQ(def_levels_[i], def_level) << i << "D" << j;
+      }
+      if (d->max_repetition_level() > 0) {
+        ASSERT_EQ(rep_levels_[i], rep_level) << i << "R" << j;
+      }
+    }
+    ASSERT_EQ(num_values_, j);
+    ASSERT_FALSE(scanner->Next(&val, &def_level, &rep_level, &is_null));
+  }
+
+  // Drops the pages and expected data from the previous Execute() run.
+  void Clear() {
+    pages_.clear();
+    values_.clear();
+    def_levels_.clear();
+    rep_levels_.clear();
+  }
+
+  // Generates pages for `d`, scans them back and verifies the results.
+  void Execute(int num_pages, int levels_per_page, int batch_size,
+      const ColumnDescriptor* d, Encoding::type encoding) {
+    num_values_ = MakePages<Type>(d, num_pages, levels_per_page, def_levels_,
+        rep_levels_, values_, data_buffer_, pages_, encoding);
+    num_levels_ = num_pages * levels_per_page;
+    InitScanner(d);
+    CheckResults(batch_size, d);
+    Clear();
+  }
+
+  // Builds REQUIRED (d1), OPTIONAL (d2) and REPEATED (d3) descriptors for Type.
+  void InitDescriptors(std::shared_ptr<ColumnDescriptor>& d1,
+      std::shared_ptr<ColumnDescriptor>& d2, std::shared_ptr<ColumnDescriptor>& d3,
+      int length) {
+    NodePtr type;
+    type = schema::PrimitiveNode::Make(
+        "c1", Repetition::REQUIRED, Type::type_num, LogicalType::NONE, length);
+    d1.reset(new ColumnDescriptor(type, 0, 0));
+    type = schema::PrimitiveNode::Make(
+        "c2", Repetition::OPTIONAL, Type::type_num, LogicalType::NONE, length);
+    d2.reset(new ColumnDescriptor(type, 4, 0));
+    type = schema::PrimitiveNode::Make(
+        "c3", Repetition::REPEATED, Type::type_num, LogicalType::NONE, length);
+    d3.reset(new ColumnDescriptor(type, 4, 2));
+  }
+
+  // Runs Execute() for a REQUIRED, an OPTIONAL and a REPEATED column.
+  void ExecuteAll(int num_pages, int num_levels, int batch_size, int type_length,
+      Encoding::type encoding = Encoding::PLAIN) {
+    std::shared_ptr<ColumnDescriptor> d1;
+    std::shared_ptr<ColumnDescriptor> d2;
+    std::shared_ptr<ColumnDescriptor> d3;
+    InitDescriptors(d1, d2, d3, type_length);
+    // evaluate REQUIRED pages
+    Execute(num_pages, num_levels, batch_size, d1.get(), encoding);
+    // evaluate OPTIONAL pages
+    Execute(num_pages, num_levels, batch_size, d2.get(), encoding);
+    // evaluate REPEATED pages
+    Execute(num_pages, num_levels, batch_size, d3.get(), encoding);
+  }
+
+ protected:
+  // Initialized so a test that reads them before Execute() sees defined values.
+  int num_levels_ = 0;
+  int num_values_ = 0;
+  vector<shared_ptr<Page>> pages_;
+  std::shared_ptr<Scanner> scanner_;
+  vector<T> values_;
+  vector<int16_t> def_levels_;
+  vector<int16_t> rep_levels_;
+  vector<uint8_t> data_buffer_;  // For BA and FLBA
+};
+
+// Shared test parameters. They are constexpr so no test can modify them and
+// change the configuration seen by later tests.
+static constexpr int num_levels_per_page = 100;
+static constexpr int num_pages = 20;
+static constexpr int batch_size = 32;
+
+// Physical types covered by the typed PLAIN and dictionary tests. BOOLEAN and
+// FIXED_LEN_BYTE_ARRAY are covered by the dedicated fixtures aliased below.
+using TestTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType,
+    DoubleType, ByteArrayType>;
+
+using TestBooleanFlatScanner = TestFlatScanner<BooleanType>;
+using TestFLBAFlatScanner = TestFlatScanner<FLBAType>;
+
+TYPED_TEST_CASE(TestFlatScanner, TestTypes);
+
+// PLAIN-encoded pages for every type in TestTypes.
+TYPED_TEST(TestFlatScanner, TestPlainScanner) {
+  this->ExecuteAll(num_pages, num_levels_per_page, batch_size, /*type_length=*/0,
+      Encoding::PLAIN);
+}
+
+// RLE_DICTIONARY-encoded pages for every type in TestTypes.
+TYPED_TEST(TestFlatScanner, TestDictScanner) {
+  this->ExecuteAll(num_pages, num_levels_per_page, batch_size, /*type_length=*/0,
+      Encoding::RLE_DICTIONARY);
+}
+
+// BOOLEAN columns, PLAIN encoding (ExecuteAll's default, spelled out here).
+TEST_F(TestBooleanFlatScanner, TestPlainScanner) {
+  this->ExecuteAll(num_pages, num_levels_per_page, batch_size, /*type_length=*/0,
+      Encoding::PLAIN);
+}
+
+// FIXED_LEN_BYTE_ARRAY columns of FLBA_LENGTH bytes under each supported encoding.
+TEST_F(TestFLBAFlatScanner, TestPlainScanner) {
+  this->ExecuteAll(num_pages, num_levels_per_page, batch_size,
+      /*type_length=*/FLBA_LENGTH, Encoding::PLAIN);
+}
+
+TEST_F(TestFLBAFlatScanner, TestDictScanner) {
+  this->ExecuteAll(num_pages, num_levels_per_page, batch_size,
+      /*type_length=*/FLBA_LENGTH, Encoding::RLE_DICTIONARY);
+}
+
+TEST_F(TestFLBAFlatScanner, TestPlainDictScanner) {
+  this->ExecuteAll(num_pages, num_levels_per_page, batch_size,
+      /*type_length=*/FLBA_LENGTH, Encoding::PLAIN_DICTIONARY);
+}
+
+// Regression test for PARQUET-502: scans a single FLBA DECIMAL page with a
+// batch size of 1.
+TEST_F(TestFLBAFlatScanner, TestSmallBatch) {
+  const int kPages = 1;
+  const int kLevelsPerPage = 100;
+  NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED,
+      Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
+  const ColumnDescriptor d(type, 0, 0);
+  num_values_ = MakePages<FLBAType>(&d, kPages, kLevelsPerPage, def_levels_,
+      rep_levels_, values_, data_buffer_, pages_);
+  num_levels_ = kPages * kLevelsPerPage;
+  InitScanner(&d);
+  CheckResults(/*batch_size=*/1, &d);
+}
+
+// Checks that the scanner exposes the DECIMAL precision and scale and the FLBA
+// length recorded in its column descriptor.
+TEST_F(TestFLBAFlatScanner, TestDescriptorAPI) {
+  NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL,
+      Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
+  const ColumnDescriptor d(type, 4, 0);
+  num_values_ = MakePages<FLBAType>(
+      &d, 1, 100, def_levels_, rep_levels_, values_, data_buffer_, pages_);
+  num_levels_ = 1 * 100;
+  InitScanner(&d);
+  // Scanner::Make builds a TypedScanner<FLBAType> for FIXED_LEN_BYTE_ARRAY
+  // columns, so static_cast is the correct base-to-derived downcast.
+  TypedScanner<FLBAType>* scanner =
+      static_cast<TypedScanner<FLBAType>*>(scanner_.get());
+  ASSERT_EQ(10, scanner->descr()->type_precision());
+  ASSERT_EQ(2, scanner->descr()->type_scale());
+  ASSERT_EQ(FLBA_LENGTH, scanner->descr()->type_length());
+}
+
+// Prints every level of an OPTIONAL FLBA column through PrintNext, checks each
+// output is at least the requested width, and checks that printing past the
+// last level throws.
+TEST_F(TestFLBAFlatScanner, TestFLBAPrinterNext) {
+  const int kWidth = 17;
+  NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL,
+      Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
+  const ColumnDescriptor d(type, 4, 0);
+  num_values_ = MakePages<FLBAType>(
+      &d, 1, 100, def_levels_, rep_levels_, values_, data_buffer_, pages_);
+  num_levels_ = 1 * 100;
+  InitScanner(&d);
+  // Scanner::Make builds a TypedScanner<FLBAType> for this column.
+  TypedScanner<FLBAType>* scanner =
+      static_cast<TypedScanner<FLBAType>*>(scanner_.get());
+  scanner->SetBatchSize(batch_size);
+  for (int i = 0; i < num_levels_; i++) {
+    std::ostringstream ss;
+    scanner->PrintNext(ss, kWidth);
+    const std::string result = ss.str();
+    // Compare in the string's own size type to avoid a signed/unsigned mix.
+    ASSERT_LE(static_cast<std::string::size_type>(kWidth), result.size()) << i;
+  }
+  std::ostringstream ss_fail;
+  ASSERT_THROW(scanner->PrintNext(ss_fail, kWidth), ParquetException);
+}
+
+} // namespace test
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_scanner.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_scanner.cc b/src/parquet/column_scanner.cc
new file mode 100644
index 0000000..a67af71
--- /dev/null
+++ b/src/parquet/column_scanner.cc
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/column_scanner.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/column_reader.h"
+#include "parquet/util/memory.h"
+
+using arrow::MemoryPool;
+
+namespace parquet {
+
+// Factory: picks the TypedScanner instantiation that matches the reader's
+// physical type and forwards the batch size and memory pool to it.
+std::shared_ptr<Scanner> Scanner::Make(
+    std::shared_ptr<ColumnReader> col_reader, int64_t batch_size, MemoryPool* pool) {
+  const auto physical_type = col_reader->type();
+  switch (physical_type) {
+    case Type::BOOLEAN:
+      return std::make_shared<BoolScanner>(col_reader, batch_size, pool);
+    case Type::INT32:
+      return std::make_shared<Int32Scanner>(col_reader, batch_size, pool);
+    case Type::INT64:
+      return std::make_shared<Int64Scanner>(col_reader, batch_size, pool);
+    case Type::INT96:
+      return std::make_shared<Int96Scanner>(col_reader, batch_size, pool);
+    case Type::FLOAT:
+      return std::make_shared<FloatScanner>(col_reader, batch_size, pool);
+    case Type::DOUBLE:
+      return std::make_shared<DoubleScanner>(col_reader, batch_size, pool);
+    case Type::BYTE_ARRAY:
+      return std::make_shared<ByteArrayScanner>(col_reader, batch_size, pool);
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      return std::make_shared<FixedLenByteArrayScanner>(col_reader, batch_size, pool);
+    default:
+      ParquetException::NYI("type reader not implemented");
+  }
+  // Not reached when NYI throws; present only to suppress a missing-return warning.
+  return nullptr;
+}
+
+// Type-erased batch read: dispatches on the reader's physical type to ScanAll
+// with the matching typed reader. Unsupported types are reported via NYI.
+int64_t ScanAllValues(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
+    uint8_t* values, int64_t* values_buffered, parquet::ColumnReader* reader) {
+  // Already inside namespace parquet, so the parquet:: qualifiers are omitted.
+  switch (reader->type()) {
+    case Type::BOOLEAN:
+      return ScanAll<BoolReader>(
+          batch_size, def_levels, rep_levels, values, values_buffered, reader);
+    case Type::INT32:
+      return ScanAll<Int32Reader>(
+          batch_size, def_levels, rep_levels, values, values_buffered, reader);
+    case Type::INT64:
+      return ScanAll<Int64Reader>(
+          batch_size, def_levels, rep_levels, values, values_buffered, reader);
+    case Type::INT96:
+      return ScanAll<Int96Reader>(
+          batch_size, def_levels, rep_levels, values, values_buffered, reader);
+    case Type::FLOAT:
+      return ScanAll<FloatReader>(
+          batch_size, def_levels, rep_levels, values, values_buffered, reader);
+    case Type::DOUBLE:
+      return ScanAll<DoubleReader>(
+          batch_size, def_levels, rep_levels, values, values_buffered, reader);
+    case Type::BYTE_ARRAY:
+      return ScanAll<ByteArrayReader>(
+          batch_size, def_levels, rep_levels, values, values_buffered, reader);
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      return ScanAll<FixedLenByteArrayReader>(
+          batch_size, def_levels, rep_levels, values, values_buffered, reader);
+    default:
+      ParquetException::NYI("type reader not implemented");
+  }
+  // Not reached when NYI throws; present only to suppress a missing-return warning.
+  return 0;
+}
+
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_scanner.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_scanner.h b/src/parquet/column_scanner.h
new file mode 100644
index 0000000..4be0b7f
--- /dev/null
+++ b/src/parquet/column_scanner.h
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_SCANNER_H
+#define PARQUET_COLUMN_SCANNER_H
+
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <stdio.h>
+#include <string>
+#include <vector>
+
+#include "parquet/column_reader.h"
+#include "parquet/exception.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/visibility.h"
+
+namespace parquet {
+
+// Default batch size for Scanner/TypedScanner: the number of levels requested
+// per ReadBatch call, and the number of slots the level and value buffers are
+// sized for at construction.
+static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128;
+
+// Type-erased base for column scanners. Holds the ColumnReader and the level and
+// value buffers that subclasses fill in batches of batch_size_ levels and then
+// hand out one entry at a time.
+class PARQUET_EXPORT Scanner {
+ public:
+  explicit Scanner(std::shared_ptr<ColumnReader> reader,
+      int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+      : batch_size_(batch_size),
+        level_offset_(0),
+        levels_buffered_(0),
+        value_buffer_(std::make_shared<PoolBuffer>(pool)),
+        value_offset_(0),
+        values_buffered_(0),
+        reader_(reader) {
+    // A level buffer is only allocated when the column can have that kind of
+    // level; otherwise it stays empty.
+    if (descr()->max_definition_level() > 0) { def_levels_.resize(batch_size_); }
+    if (descr()->max_repetition_level() > 0) { rep_levels_.resize(batch_size_); }
+  }
+
+  virtual ~Scanner() = default;
+
+  // Creates the TypedScanner that matches col_reader's physical type.
+  static std::shared_ptr<Scanner> Make(std::shared_ptr<ColumnReader> col_reader,
+      int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
+  // Writes the next entry to `out`, formatted to `width` characters.
+  virtual void PrintNext(std::ostream& out, int width) = 0;
+
+  // True while buffered levels remain or the reader reports more data.
+  bool HasNext() {
+    if (level_offset_ < levels_buffered_) { return true; }
+    return reader_->HasNext();
+  }
+
+  const ColumnDescriptor* descr() const { return reader_->descr(); }
+
+  int64_t batch_size() const { return batch_size_; }
+
+  // Records the number of levels to request per read. This method itself does
+  // not resize any buffer.
+  void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; }
+
+ protected:
+  int64_t batch_size_;
+
+  std::vector<int16_t> def_levels_;
+  std::vector<int16_t> rep_levels_;
+  int level_offset_;
+  int levels_buffered_;
+
+  std::shared_ptr<PoolBuffer> value_buffer_;
+  int value_offset_;
+  int64_t values_buffered_;
+
+ private:
+  std::shared_ptr<ColumnReader> reader_;
+};
+
+// Scanner for physical type DType. Reads batches of up to batch_size_ levels
+// from the typed column reader and hands them out one entry at a time through
+// Next(), NextValue() and PrintNext().
+template <typename DType>
+class PARQUET_EXPORT TypedScanner : public Scanner {
+ public:
+  typedef typename DType::c_type T;
+
+  explicit TypedScanner(std::shared_ptr<ColumnReader> reader,
+      int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+      : Scanner(reader, batch_size, pool) {
+    typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader.get());
+    int value_byte_size = type_traits<DType::type_num>::value_byte_size;
+    PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size));
+    values_ = reinterpret_cast<T*>(value_buffer_->mutable_data());
+    values_capacity_ = batch_size_;
+  }
+
+  virtual ~TypedScanner() {}
+
+  // Produces the next definition/repetition level pair, refilling the buffers
+  // from the reader once every buffered level has been consumed. A level the
+  // column cannot have is reported as 0. Returns false if the reader yields no
+  // more levels.
+  bool NextLevels(int16_t* def_level, int16_t* rep_level) {
+    if (level_offset_ == levels_buffered_) {
+      // SetBatchSize() may have raised batch_size_ above the capacity allocated
+      // at construction; grow the buffers so ReadBatch cannot write past them.
+      ReserveBatch();
+      levels_buffered_ =
+          static_cast<int>(typed_reader_->ReadBatch(static_cast<int>(batch_size_),
+              def_levels_.data(), rep_levels_.data(), values_, &values_buffered_));
+
+      value_offset_ = 0;
+      level_offset_ = 0;
+      if (!levels_buffered_) { return false; }
+    }
+    *def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0;
+    *rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0;
+    level_offset_++;
+    return true;
+  }
+
+  // Reads the next entry. Sets *is_null when its definition level is below the
+  // column maximum; otherwise stores the value in *val. Returns false when no
+  // data remains. Throws ParquetException if a non-null level has no buffered
+  // value.
+  bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) {
+    if (level_offset_ == levels_buffered_ && !HasNext()) {
+      // Out of data pages
+      return false;
+    }
+
+    // If no levels could be read, *def_level would be consumed without ever
+    // having been written, so stop here instead.
+    if (!NextLevels(def_level, rep_level)) { return false; }
+    *is_null = *def_level < descr()->max_definition_level();
+
+    if (*is_null) { return true; }
+
+    if (value_offset_ == values_buffered_) {
+      throw ParquetException("Value was non-null, but has not been buffered");
+    }
+    *val = values_[value_offset_++];
+    return true;
+  }
+
+  // Returns true if there is a next value; same as Next() with the levels discarded.
+  bool NextValue(T* val, bool* is_null) {
+    int16_t def_level = -1;
+    int16_t rep_level = -1;
+    return Next(val, &def_level, &rep_level, is_null);
+  }
+
+  // Prints the next entry (or "NULL") to `out` using the fixed-width format for
+  // `width`. Output is truncated to fit the 25-byte local buffer.
+  virtual void PrintNext(std::ostream& out, int width) {
+    T val;
+    bool is_null = false;
+    char buffer[25];
+
+    if (!NextValue(&val, &is_null)) { throw ParquetException("No more values buffered"); }
+
+    if (is_null) {
+      std::string null_fmt = format_fwf<ByteArrayType>(width);
+      snprintf(buffer, sizeof(buffer), null_fmt.c_str(), "NULL");
+    } else {
+      FormatValue(&val, buffer, sizeof(buffer), width);
+    }
+    out << buffer;
+  }
+
+ private:
+  // Grows the level and value buffers so each holds at least batch_size_
+  // entries. Only called once every buffered level has been consumed, so no
+  // pending data is lost. values_ is re-read because the buffer's data pointer
+  // may change on Resize.
+  void ReserveBatch() {
+    if (descr()->max_definition_level() > 0 &&
+        static_cast<int64_t>(def_levels_.size()) < batch_size_) {
+      def_levels_.resize(batch_size_);
+    }
+    if (descr()->max_repetition_level() > 0 &&
+        static_cast<int64_t>(rep_levels_.size()) < batch_size_) {
+      rep_levels_.resize(batch_size_);
+    }
+    if (values_capacity_ < batch_size_) {
+      int value_byte_size = type_traits<DType::type_num>::value_byte_size;
+      PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size));
+      values_ = reinterpret_cast<T*>(value_buffer_->mutable_data());
+      values_capacity_ = batch_size_;
+    }
+  }
+
+  // The ownership of this object is expressed through the reader_ variable in the base
+  TypedColumnReader<DType>* typed_reader_;
+
+  inline void FormatValue(void* val, char* buffer, int bufsize, int width);
+
+  T* values_;
+  // Number of T slots value_buffer_ currently holds.
+  int64_t values_capacity_ = 0;
+};
+
+// Default formatting: print the value directly with DType's fixed-width format.
+template <typename DType>
+inline void TypedScanner<DType>::FormatValue(
+    void* val, char* buffer, int bufsize, int width) {
+  const T& value = *static_cast<T*>(val);
+  snprintf(buffer, bufsize, format_fwf<DType>(width).c_str(), value);
+}
+
+// INT96 values are first converted to text with Int96ToString.
+template <>
+inline void TypedScanner<Int96Type>::FormatValue(
+    void* val, char* buffer, int bufsize, int width) {
+  const std::string text = Int96ToString(*static_cast<Int96*>(val));
+  snprintf(buffer, bufsize, format_fwf<Int96Type>(width).c_str(), text.c_str());
+}
+
+// BYTE_ARRAY values are first converted to text with ByteArrayToString.
+template <>
+inline void TypedScanner<ByteArrayType>::FormatValue(
+    void* val, char* buffer, int bufsize, int width) {
+  const std::string text = ByteArrayToString(*static_cast<ByteArray*>(val));
+  snprintf(buffer, bufsize, format_fwf<ByteArrayType>(width).c_str(), text.c_str());
+}
+
+// FIXED_LEN_BYTE_ARRAY values need the column's type_length to be rendered.
+template <>
+inline void TypedScanner<FLBAType>::FormatValue(
+    void* val, char* buffer, int bufsize, int width) {
+  const std::string text = FixedLenByteArrayToString(
+      *static_cast<FixedLenByteArray*>(val), descr()->type_length());
+  snprintf(buffer, bufsize, format_fwf<FLBAType>(width).c_str(), text.c_str());
+}
+
+// Short aliases for the TypedScanner instantiation of each physical type.
+using BoolScanner = TypedScanner<BooleanType>;
+using Int32Scanner = TypedScanner<Int32Type>;
+using Int64Scanner = TypedScanner<Int64Type>;
+using Int96Scanner = TypedScanner<Int96Type>;
+using FloatScanner = TypedScanner<FloatType>;
+using DoubleScanner = TypedScanner<DoubleType>;
+using ByteArrayScanner = TypedScanner<ByteArrayType>;
+using FixedLenByteArrayScanner = TypedScanner<FLBAType>;
+
+// Reads one batch from `reader`, which must actually be an RType, into the
+// caller's level arrays and into `values`, reinterpreted as an array of
+// RType::T. Returns whatever RType::ReadBatch returns (used elsewhere in this
+// header as the number of levels read).
+template <typename RType>
+int64_t ScanAll(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
+    uint8_t* values, int64_t* values_buffered, parquet::ColumnReader* reader) {
+  // Named ValueType rather than Type so it does not shadow parquet::Type.
+  typedef typename RType::T ValueType;
+  RType* typed_reader = static_cast<RType*>(reader);
+  ValueType* typed_values = reinterpret_cast<ValueType*>(values);
+  return typed_reader->ReadBatch(
+      batch_size, def_levels, rep_levels, typed_values, values_buffered);
+}
+
+// Type-erased wrapper over ScanAll: dispatches on reader->type() to the
+// matching typed reader (see column_scanner.cc). Unsupported physical types are
+// reported through ParquetException::NYI.
+int64_t PARQUET_EXPORT ScanAllValues(int32_t batch_size, int16_t* def_levels,
+    int16_t* rep_levels, uint8_t* values, int64_t* values_buffered,
+    parquet::ColumnReader* reader);
+
+} // namespace parquet
+
+#endif // PARQUET_COLUMN_SCANNER_H