You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by no...@apache.org on 2016/01/28 06:13:21 UTC
[2/2] parquet-cpp git commit: PARQUET-451: Add RowGroupReader helper
class and refactor parquet_reader.cc into DebugPrint
PARQUET-451: Add RowGroupReader helper class and refactor parquet_reader.cc into DebugPrint
This also addresses PARQUET-433 and PARQUET-453.
Author: Wes McKinney <we...@cloudera.com>
Closes #23 from wesm/PARQUET-451 and squashes the following commits:
748ee0c [Wes McKinney] Turn MakeColumnReader into static ColumnReader::Make
6528497 [Wes McKinney] Incorporate code review comments
4b5575d [Wes McKinney] [PARQUET-451/453]: Implement RowGroupReader class and refactor parquet_reader.cc into ParquetFileReader::DebugPrint
2985e2e [Wes McKinney] [PARQUET-433]: Templatize decoders and column readers and remove most switch-on-type statements. Add parquet::SchemaElement* member to Decoder<T>, for FLBA metadata.
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/1f24e765
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/1f24e765
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/1f24e765
Branch: refs/heads/master
Commit: 1f24e7658b9e9d41f95e6ce3a0d7a2fe3ace1abf
Parents: 8fc24f8
Author: Wes McKinney <we...@cloudera.com>
Authored: Wed Jan 27 21:13:15 2016 -0800
Committer: Nong Li <no...@gmail.com>
Committed: Wed Jan 27 21:13:15 2016 -0800
----------------------------------------------------------------------
CMakeLists.txt | 3 +-
example/CMakeLists.txt | 8 +-
example/decode_benchmark.cc | 19 +-
example/example_util.cc | 84 ------
example/example_util.h | 44 ---
example/parquet_reader.cc | 256 ++---------------
setup_build_env.sh | 3 +-
src/parquet.cc | 272 ------------------
src/parquet/CMakeLists.txt | 2 +
src/parquet/column_reader.cc | 194 +++++++++++++
src/parquet/column_reader.h | 183 ++++++++++++
src/parquet/compression/codec.h | 6 +-
src/parquet/encodings/CMakeLists.txt | 1 -
src/parquet/encodings/bool-encoding.h | 48 ----
src/parquet/encodings/delta-bit-pack-encoding.h | 20 +-
.../encodings/delta-byte-array-encoding.h | 20 +-
.../delta-length-byte-array-encoding.h | 16 +-
src/parquet/encodings/dictionary-encoding.h | 131 ++++-----
src/parquet/encodings/encodings.h | 34 +--
src/parquet/encodings/plain-encoding.h | 82 +++---
src/parquet/parquet.h | 197 +------------
src/parquet/reader-test.cc | 14 +-
src/parquet/reader.cc | 283 ++++++++++++++++++-
src/parquet/reader.h | 65 ++++-
src/parquet/types.h | 112 ++++++++
src/parquet/util/CMakeLists.txt | 5 +
src/parquet/util/input_stream.cc | 63 +++++
src/parquet/util/input_stream.h | 80 ++++++
28 files changed, 1171 insertions(+), 1074 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a2f7e6a..b66e296 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -213,13 +213,14 @@ set(PARQUET_TEST_LINK_LIBS ${PARQUET_MIN_TEST_LIBS})
# Library config
set(LIBPARQUET_SRCS
- src/parquet.cc
+ src/parquet/column_reader.cc
src/parquet/reader.cc
)
set(LIBPARQUET_LINK_LIBS
parquet_compression
parquet_thrift
+ parquet_util
thriftstatic
)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/example/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt
index a020184..05c541a 100644
--- a/example/CMakeLists.txt
+++ b/example/CMakeLists.txt
@@ -12,16 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
-add_library(Example STATIC
- example_util.cc
-)
-
SET(LINK_LIBS
parquet
snappystatic
- thriftstatic
- Example)
+ thriftstatic)
add_executable(decode_benchmark decode_benchmark.cc)
target_link_libraries(decode_benchmark ${LINK_LIBS})
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/example/decode_benchmark.cc
----------------------------------------------------------------------
diff --git a/example/decode_benchmark.cc b/example/decode_benchmark.cc
index ed4077a..f33232d 100644
--- a/example/decode_benchmark.cc
+++ b/example/decode_benchmark.cc
@@ -16,7 +16,6 @@
#include <iostream>
#include <stdio.h>
-#include "example_util.h"
#include "parquet/compression/codec.h"
#include "parquet/encodings/encodings.h"
#include "parquet/util/stopwatch.h"
@@ -198,11 +197,11 @@ class DeltaByteArrayEncoder {
uint64_t TestPlainIntEncoding(const uint8_t* data, int num_values, int batch_size) {
uint64_t result = 0;
- PlainDecoder decoder(Type::INT64);
+ PlainDecoder<Type::INT64> decoder(nullptr);
decoder.SetData(num_values, data, num_values * sizeof(int64_t));
int64_t values[batch_size];
for (int i = 0; i < num_values;) {
- int n = decoder.GetInt64(values, batch_size);
+ int n = decoder.Decode(values, batch_size);
for (int j = 0; j < n; ++j) {
result += values[j];
}
@@ -221,7 +220,7 @@ uint64_t TestBinaryPackedEncoding(const char* name, const vector<int64_t>& value
} else {
mini_block_size = 32;
}
- DeltaBitPackDecoder decoder(Type::INT64);
+ DeltaBitPackDecoder<Type::INT64> decoder(nullptr);
DeltaBitPackEncoder encoder(mini_block_size);
for (int i = 0; i < values.size(); ++i) {
encoder.Add(values[i]);
@@ -238,7 +237,7 @@ uint64_t TestBinaryPackedEncoding(const char* name, const vector<int64_t>& value
decoder.SetData(encoder.num_values(), buffer, len);
for (int i = 0; i < encoder.num_values(); ++i) {
int64_t x = 0;
- decoder.GetInt64(&x, 1);
+ decoder.Decode(&x, 1);
if (values[i] != x) {
cerr << "Bad: " << i << endl;
cerr << " " << x << " != " << values[i] << endl;
@@ -258,7 +257,7 @@ uint64_t TestBinaryPackedEncoding(const char* name, const vector<int64_t>& value
for (int k = 0; k < benchmark_iters; ++k) {
decoder.SetData(encoder.num_values(), buffer, len);
for (int i = 0; i < values.size();) {
- int n = decoder.GetInt64(buf, benchmark_batch_size);
+ int n = decoder.Decode(buf, benchmark_batch_size);
for (int j = 0; j < n; ++j) {
result += buf[j];
}
@@ -349,7 +348,7 @@ void TestBinaryPacking() {
}
void TestDeltaLengthByteArray() {
- DeltaLengthByteArrayDecoder decoder;
+ DeltaLengthByteArrayDecoder decoder(nullptr);
DeltaLengthByteArrayEncoder encoder;
vector<string> values;
@@ -369,7 +368,7 @@ void TestDeltaLengthByteArray() {
decoder.SetData(encoder.num_values(), buffer, len);
for (int i = 0; i < encoder.num_values(); ++i) {
ByteArray v;
- decoder.GetByteArray(&v, 1);
+ decoder.Decode(&v, 1);
string r = string((char*)v.ptr, v.len);
if (r != values[i]) {
cout << "Bad " << r << " != " << values[i] << endl;
@@ -378,7 +377,7 @@ void TestDeltaLengthByteArray() {
}
void TestDeltaByteArray() {
- DeltaByteArrayDecoder decoder;
+ DeltaByteArrayDecoder decoder(nullptr);
DeltaByteArrayEncoder encoder;
vector<string> values;
@@ -407,7 +406,7 @@ void TestDeltaByteArray() {
decoder.SetData(encoder.num_values(), buffer, len);
for (int i = 0; i < encoder.num_values(); ++i) {
ByteArray v;
- decoder.GetByteArray(&v, 1);
+ decoder.Decode(&v, 1);
string r = string((char*)v.ptr, v.len);
if (r != values[i]) {
cout << "Bad " << r << " != " << values[i] << endl;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/example/example_util.cc
----------------------------------------------------------------------
diff --git a/example/example_util.cc b/example/example_util.cc
deleted file mode 100644
index 07d8129..0000000
--- a/example/example_util.cc
+++ /dev/null
@@ -1,84 +0,0 @@
-// Copyright 2012 Cloudera Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "example_util.h"
-#include <iostream>
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-
-#include "parquet/thrift/util.h"
-
-using namespace parquet;
-using namespace parquet_cpp;
-using namespace std;
-
-// 4 byte constant + 4 byte metadata len
-const uint32_t FOOTER_SIZE = 8;
-const uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
-
-struct ScopedFile {
- public:
- ScopedFile(FILE* f) : file_(f) { }
- ~ScopedFile() { fclose(file_); }
-
- private:
- FILE* file_;
-};
-
-bool GetFileMetadata(const string& path, FileMetaData* metadata) {
- FILE* file = fopen(path.c_str(), "r");
- if (!file) {
- cerr << "Could not open file: " << path << endl;
- return false;
- }
- ScopedFile cleanup(file);
- fseek(file, 0L, SEEK_END);
- size_t file_len = ftell(file);
- if (file_len < FOOTER_SIZE) {
- cerr << "Invalid parquet file. Corrupt footer." << endl;
- return false;
- }
-
- uint8_t footer_buffer[FOOTER_SIZE];
- fseek(file, file_len - FOOTER_SIZE, SEEK_SET);
- size_t bytes_read = fread(footer_buffer, 1, FOOTER_SIZE, file);
- if (bytes_read != FOOTER_SIZE) {
- cerr << "Invalid parquet file. Corrupt footer." << endl;
- return false;
- }
- if (memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) {
- cerr << "Invalid parquet file. Corrupt footer." << endl;
- return false;
- }
-
- uint32_t metadata_len = *reinterpret_cast<uint32_t*>(footer_buffer);
- size_t metadata_start = file_len - FOOTER_SIZE - metadata_len;
- if (metadata_start < 0) {
- cerr << "Invalid parquet file. File is less than file metadata size." << endl;
- return false;
- }
-
- fseek(file, metadata_start, SEEK_SET);
- uint8_t metadata_buffer[metadata_len];
- bytes_read = fread(metadata_buffer, 1, metadata_len, file);
- if (bytes_read != metadata_len) {
- cerr << "Invalid parquet file. Could not read metadata bytes." << endl;
- return false;
- }
-
- DeserializeThriftMsg(metadata_buffer, &metadata_len, metadata);
- return true;
-
-}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/example/example_util.h
----------------------------------------------------------------------
diff --git a/example/example_util.h b/example/example_util.h
deleted file mode 100644
index a8b58fc..0000000
--- a/example/example_util.h
+++ /dev/null
@@ -1,44 +0,0 @@
-// Copyright 2012 Cloudera Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#ifndef PARQUET_EXAMPLE_UTIL_H
-#define PARQUET_EXAMPLE_UTIL_H
-
-#include <string>
-#include <parquet/parquet.h>
-#include <stdio.h>
-
-bool GetFileMetadata(const std::string& path, parquet::FileMetaData* metadata);
-
-class InputFile {
-private:
- FILE* file;
- std::string filename;
-
-public:
- InputFile(const std::string& _filename): filename(_filename) {
- file = fopen(_filename.c_str(), "r");
- }
- ~InputFile() {
- if (file != NULL) {
- fclose(file);
- }
- }
-
- FILE* getFileHandle() { return file; }
- bool isOpen() { return file != NULL; }
- std::string getFilename() { return filename; }
-};
-
-#endif // PARQUET_EXAMPLE_UTIL_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/example/parquet_reader.cc
----------------------------------------------------------------------
diff --git a/example/parquet_reader.cc b/example/parquet_reader.cc
index 5379c5e..7b476b5 100644
--- a/example/parquet_reader.cc
+++ b/example/parquet_reader.cc
@@ -13,255 +13,49 @@
// limitations under the License.
#include <parquet/parquet.h>
-#include "example_util.h"
#include <iostream>
-// the fixed initial size is just for an example
-#define COL_WIDTH "17"
-
-using namespace parquet;
-using namespace parquet_cpp;
-using namespace std;
-
-struct AnyType {
- union {
- bool bool_val;
- int32_t int32_val;
- int64_t int64_val;
- float float_val;
- double double_val;
- ByteArray byte_array_val;
- };
-};
-
-static string ByteArrayToString(const ByteArray& a) {
- return string(reinterpret_cast<const char*>(a.ptr), a.len);
-}
-
-int ByteCompare(const ByteArray& x1, const ByteArray& x2) {
- int len = ::min(x1.len, x2.len);
- int cmp = memcmp(x1.ptr, x2.ptr, len);
- if (cmp != 0) return cmp;
- if (len < x1.len) return 1;
- if (len < x2.len) return -1;
- return 0;
-}
-
-string type2String(Type::type t) {
- switch(t) {
- case Type::BOOLEAN:
- return "BOOLEAN";
- break;
- case Type::INT32:
- return "INT32";
- break;
- case Type::INT64:
- return "INT64";
- break;
- case Type::FLOAT:
- return "FLOAT";
- break;
- case Type::DOUBLE:
- return "DOUBLE";
- break;
- case Type::BYTE_ARRAY:
- return "BYTE_ARRAY";
- break;
- case Type::INT96:
- return "INT96";
- break;
- case Type::FIXED_LEN_BYTE_ARRAY:
- return "FIXED_LEN_BYTE_ARRAY";
- break;
- default:
- return "UNKNOWN";
- break;
- }
-}
-
-void readParquet(const string& filename, const bool printValues) {
- InputFile file(filename);
- if (!file.isOpen()) {
- cerr << "Could not open file " << file.getFilename() << endl;
- return;
- }
-
- FileMetaData metadata;
- if (!GetFileMetadata(file.getFilename().c_str(), &metadata)) {
- cerr << "Could not read metadata from file " << file.getFilename() << endl;
- return;
- }
-
- cout << "File statistics:\n" ;
- cout << "Total rows: " << metadata.num_rows << "\n";
- for (int c = 1; c < metadata.schema.size(); ++c) {
- cout << "Column " << c-1 << ": " << metadata.schema[c].name << " ("
- << type2String(metadata.schema[c].type);
- if (metadata.schema[c].type == Type::INT96 ||
- metadata.schema[c].type == Type::FIXED_LEN_BYTE_ARRAY) {
- cout << " - not supported";
- }
- cout << ")\n";
- }
-
- for (int i = 0; i < metadata.row_groups.size(); ++i) {
- cout << "--- Row Group " << i << " ---\n";
-
- // Print column metadata
- const RowGroup& row_group = metadata.row_groups[i];
- size_t nColumns = row_group.columns.size();
-
- for (int c = 0; c < nColumns; ++c) {
- const ColumnMetaData& meta_data = row_group.columns[c].meta_data;
- cout << "Column " << c
- << ": " << meta_data.num_values << " rows, "
- << meta_data.statistics.null_count << " null values, "
- << meta_data.statistics.distinct_count << " distinct values, "
- << "min value: " << (meta_data.statistics.min.length()>0 ?
- meta_data.statistics.min : "N/A")
- << ", max value: " << (meta_data.statistics.max.length()>0 ?
- meta_data.statistics.max : "N/A") << ".\n";
- }
-
- if (!printValues) {
- continue;
- }
-
- // Create readers for all columns and print contents
- vector<ColumnReader*> readers(nColumns, NULL);
- try {
- for (int c = 0; c < nColumns; ++c) {
- const ColumnChunk& col = row_group.columns[c];
- printf("%-" COL_WIDTH"s", metadata.schema[c+1].name.c_str());
-
- if (col.meta_data.type == Type::INT96 ||
- col.meta_data.type == Type::FIXED_LEN_BYTE_ARRAY) {
- continue;
- }
-
- size_t col_start = col.meta_data.data_page_offset;
- if (col.meta_data.__isset.dictionary_page_offset &&
- col_start > col.meta_data.dictionary_page_offset) {
- col_start = col.meta_data.dictionary_page_offset;
- }
-
- std::unique_ptr<ScopedInMemoryInputStream> input(
- new ScopedInMemoryInputStream(col.meta_data.total_compressed_size));
- fseek(file.getFileHandle(), col_start, SEEK_SET);
- size_t num_read = fread(input->data(),
- 1,
- input->size(),
- file.getFileHandle());
- if (num_read != input->size()) {
- cerr << "Could not read column data." << endl;
- continue;
- }
-
- readers[c] = new ColumnReader(&col.meta_data,
- &metadata.schema[c+1],
- input.release());
- }
- cout << "\n";
-
- vector<int> def_level(nColumns, 0);
- vector<int> rep_level(nColumns, 0);
-
- bool hasRow;
- do {
- hasRow = false;
- for (int c = 0; c < nColumns; ++c) {
- if (readers[c] == NULL) {
- printf("%-" COL_WIDTH"s", " ");
- continue;
- }
- const ColumnChunk& col = row_group.columns[c];
- if (readers[c]->HasNext()) {
- hasRow = true;
- switch (col.meta_data.type) {
- case Type::BOOLEAN: {
- bool val = readers[c]->GetBool(&def_level[c], &rep_level[c]);
- if (def_level[c] >= rep_level[c]) {
- printf("%-" COL_WIDTH"d",val);
- }
- break;
- }
- case Type::INT32: {
- int32_t val = readers[c]->GetInt32(&def_level[c], &rep_level[c]);
- if (def_level[c] >= rep_level[c]) {
- printf("%-" COL_WIDTH"d",val);
- }
- break;
- }
- case Type::INT64: {
- int64_t val = readers[c]->GetInt64(&def_level[c], &rep_level[c]);
- if (def_level[c] >= rep_level[c]) {
- printf("%-" COL_WIDTH"ld",val);
- }
- break;
- }
- case Type::FLOAT: {
- float val = readers[c]->GetFloat(&def_level[c], &rep_level[c]);
- if (def_level[c] >= rep_level[c]) {
- printf("%-" COL_WIDTH"f",val);
- }
- break;
- }
- case Type::DOUBLE: {
- double val = readers[c]->GetDouble(&def_level[c], &rep_level[c]);
- if (def_level[c] >= rep_level[c]) {
- printf("%-" COL_WIDTH"lf",val);
- }
- break;
- }
- case Type::BYTE_ARRAY: {
- ByteArray val = readers[c]->GetByteArray(&def_level[c], &rep_level[c]);
- if (def_level[c] >= rep_level[c]) {
- string result = ByteArrayToString(val);
- printf("%-" COL_WIDTH"s", result.c_str());
- }
- break;
- }
- default:
- continue;
- }
- }
- }
- cout << "\n";
- } while (hasRow);
- } catch (exception& e) {
- cout << "Caught an exception: " << e.what() << "\n";
- } catch (...) {
- cout << "Caught an exception.\n";
- }
-
- for(vector<ColumnReader*>::iterator it = readers.begin(); it != readers.end(); it++) {
- delete *it;
- }
- }
-}
-
int main(int argc, char** argv) {
if (argc > 3) {
- cerr << "Usage: parquet_reader [--only-stats] <file>" << endl;
+ std::cerr << "Usage: parquet_reader [--only-stats] <file>"
+ << std::endl;
return -1;
}
- string filename;
- bool printContents = true;
+ std::string filename;
+ bool print_values = true;
// Read command-line options
char *param, *value;
for (int i = 1; i < argc; i++) {
if ( (param = std::strstr(argv[i], "--only-stats")) ) {
- printContents = false;
+ print_values = false;
} else {
filename = argv[i];
}
}
- readParquet(filename, printContents);
+ parquet_cpp::ParquetFileReader reader;
+ parquet_cpp::LocalFile file;
+
+ file.Open(filename);
+ if (!file.is_open()) {
+ std::cerr << "Could not open file " << file.path()
+ << std::endl;
+ return -1;
+ }
+
+ try {
+ reader.Open(&file);
+ reader.ParseMetaData();
+ reader.DebugPrint(std::cout, print_values);
+ } catch (const std::exception& e) {
+ std::cerr << "Parquet error: "
+ << e.what()
+ << std::endl;
+ return -1;
+ }
return 0;
}
-
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/setup_build_env.sh
----------------------------------------------------------------------
diff --git a/setup_build_env.sh b/setup_build_env.sh
index 1cd7bb2..c95b889 100755
--- a/setup_build_env.sh
+++ b/setup_build_env.sh
@@ -17,9 +17,8 @@ if [ "$(uname)" != "Darwin" ]; then
export THRIFT_HOME=$BUILD_DIR/thirdparty/installed
fi
-export GTEST_HOME=$BUILD_DIR/thirdparty/$GTEST_BASEDIR
-
export PARQUET_TEST_DATA=$SOURCE_DIR/data
+export GTEST_HOME=$BUILD_DIR/thirdparty/$GTEST_BASEDIR
cmake $SOURCE_DIR
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet.cc
----------------------------------------------------------------------
diff --git a/src/parquet.cc b/src/parquet.cc
deleted file mode 100644
index 6b6adaa..0000000
--- a/src/parquet.cc
+++ /dev/null
@@ -1,272 +0,0 @@
-// Copyright 2012 Cloudera Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "parquet/parquet.h"
-
-#include <algorithm>
-#include <string>
-#include <string.h>
-
-#include <thrift/protocol/TDebugProtocol.h>
-
-#include "parquet/encodings/encodings.h"
-#include "parquet/compression/codec.h"
-#include "parquet/thrift/util.h"
-
-const int DATA_PAGE_SIZE = 64 * 1024;
-
-namespace parquet_cpp {
-
-using parquet::CompressionCodec;
-using parquet::Encoding;
-using parquet::FieldRepetitionType;
-using parquet::PageType;
-using parquet::SchemaElement;
-using parquet::Type;
-
-InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) :
- buffer_(buffer), len_(len), offset_(0) {}
-
-const uint8_t* InMemoryInputStream::Peek(int num_to_peek, int* num_bytes) {
- *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_);
- return buffer_ + offset_;
-}
-
-const uint8_t* InMemoryInputStream::Read(int num_to_read, int* num_bytes) {
- const uint8_t* result = Peek(num_to_read, num_bytes);
- offset_ += *num_bytes;
- return result;
-}
-
-ScopedInMemoryInputStream::ScopedInMemoryInputStream(int64_t len) {
- buffer_.resize(len);
- stream_.reset(new InMemoryInputStream(buffer_.data(), buffer_.size()));
-}
-
-uint8_t* ScopedInMemoryInputStream::data() {
- return buffer_.data();
-}
-
-int64_t ScopedInMemoryInputStream::size() {
- return buffer_.size();
-}
-
-const uint8_t* ScopedInMemoryInputStream::Peek(int num_to_peek,
- int* num_bytes) {
- return stream_->Peek(num_to_peek, num_bytes);
-}
-
-const uint8_t* ScopedInMemoryInputStream::Read(int num_to_read,
- int* num_bytes) {
- return stream_->Read(num_to_read, num_bytes);
-}
-
-
-ColumnReader::~ColumnReader() {
- delete stream_;
-}
-
-ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata,
- const SchemaElement* schema, InputStream* stream)
- : metadata_(metadata),
- schema_(schema),
- stream_(stream),
- current_decoder_(NULL),
- num_buffered_values_(0),
- num_decoded_values_(0),
- buffered_values_offset_(0) {
- int value_byte_size;
- switch (metadata->type) {
- case parquet::Type::BOOLEAN:
- value_byte_size = 1;
- break;
- case parquet::Type::INT32:
- value_byte_size = sizeof(int32_t);
- break;
- case parquet::Type::INT64:
- value_byte_size = sizeof(int64_t);
- break;
- case parquet::Type::FLOAT:
- value_byte_size = sizeof(float);
- break;
- case parquet::Type::DOUBLE:
- value_byte_size = sizeof(double);
- break;
- case parquet::Type::BYTE_ARRAY:
- value_byte_size = sizeof(ByteArray);
- break;
- default:
- ParquetException::NYI("Unsupported type");
- }
-
- switch (metadata->codec) {
- case CompressionCodec::UNCOMPRESSED:
- break;
- case CompressionCodec::SNAPPY:
- decompressor_.reset(new SnappyCodec());
- break;
- default:
- ParquetException::NYI("Reading compressed data");
- }
-
- config_ = Config::DefaultConfig();
- values_buffer_.resize(config_.batch_size * value_byte_size);
-}
-
-void ColumnReader::BatchDecode() {
- buffered_values_offset_ = 0;
- uint8_t* buf = &values_buffer_[0];
- int batch_size = config_.batch_size;
- switch (metadata_->type) {
- case parquet::Type::BOOLEAN:
- num_decoded_values_ =
- current_decoder_->GetBool(reinterpret_cast<bool*>(buf), batch_size);
- break;
- case parquet::Type::INT32:
- num_decoded_values_ =
- current_decoder_->GetInt32(reinterpret_cast<int32_t*>(buf), batch_size);
- break;
- case parquet::Type::INT64:
- num_decoded_values_ =
- current_decoder_->GetInt64(reinterpret_cast<int64_t*>(buf), batch_size);
- break;
- case parquet::Type::FLOAT:
- num_decoded_values_ =
- current_decoder_->GetFloat(reinterpret_cast<float*>(buf), batch_size);
- break;
- case parquet::Type::DOUBLE:
- num_decoded_values_ =
- current_decoder_->GetDouble(reinterpret_cast<double*>(buf), batch_size);
- break;
- case parquet::Type::BYTE_ARRAY:
- num_decoded_values_ =
- current_decoder_->GetByteArray(reinterpret_cast<ByteArray*>(buf), batch_size);
- break;
- default:
- ParquetException::NYI("Unsupported type.");
- }
-}
-
-// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
-// encoding.
-static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
- return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
-}
-
-bool ColumnReader::ReadNewPage() {
- // Loop until we find the next data page.
-
- while (true) {
- int bytes_read = 0;
- const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
- if (bytes_read == 0) return false;
- uint32_t header_size = bytes_read;
- DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
- stream_->Read(header_size, &bytes_read);
-
- int compressed_len = current_page_header_.compressed_page_size;
- int uncompressed_len = current_page_header_.uncompressed_page_size;
-
- // Read the compressed data page.
- buffer = stream_->Read(compressed_len, &bytes_read);
- if (bytes_read != compressed_len) ParquetException::EofException();
-
- // Uncompress it if we need to
- if (decompressor_ != NULL) {
- // Grow the uncompressed buffer if we need to.
- if (uncompressed_len > decompression_buffer_.size()) {
- decompression_buffer_.resize(uncompressed_len);
- }
- decompressor_->Decompress(
- compressed_len, buffer, uncompressed_len, &decompression_buffer_[0]);
- buffer = &decompression_buffer_[0];
- }
-
- if (current_page_header_.type == PageType::DICTIONARY_PAGE) {
- std::unordered_map<Encoding::type, std::shared_ptr<Decoder> >::iterator it =
- decoders_.find(Encoding::RLE_DICTIONARY);
- if (it != decoders_.end()) {
- throw ParquetException("Column cannot have more than one dictionary.");
- }
-
- PlainDecoder dictionary(schema_->type);
- dictionary.SetData(current_page_header_.dictionary_page_header.num_values,
- buffer, uncompressed_len);
- std::shared_ptr<Decoder> decoder(
- new DictionaryDecoder(schema_->type, &dictionary));
- decoders_[Encoding::RLE_DICTIONARY] = decoder;
- current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get();
- continue;
- } else if (current_page_header_.type == PageType::DATA_PAGE) {
- // Read a data page.
- num_buffered_values_ = current_page_header_.data_page_header.num_values;
-
- // Read definition levels.
- if (schema_->repetition_type != FieldRepetitionType::REQUIRED) {
- int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer);
- buffer += sizeof(uint32_t);
- definition_level_decoder_.reset(
- new RleDecoder(buffer, num_definition_bytes, 1));
- buffer += num_definition_bytes;
- uncompressed_len -= sizeof(uint32_t);
- uncompressed_len -= num_definition_bytes;
- }
-
- // TODO: repetition levels
-
- // Get a decoder object for this page or create a new decoder if this is the
- // first page with this encoding.
- Encoding::type encoding = current_page_header_.data_page_header.encoding;
- if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY;
-
- std::unordered_map<Encoding::type, std::shared_ptr<Decoder> >::iterator it =
- decoders_.find(encoding);
- if (it != decoders_.end()) {
- current_decoder_ = it->second.get();
- } else {
- switch (encoding) {
- case Encoding::PLAIN: {
- std::shared_ptr<Decoder> decoder;
- if (schema_->type == Type::BOOLEAN) {
- decoder.reset(new BoolDecoder());
- } else {
- decoder.reset(new PlainDecoder(schema_->type));
- }
- decoders_[encoding] = decoder;
- current_decoder_ = decoder.get();
- break;
- }
- case Encoding::RLE_DICTIONARY:
- throw ParquetException("Dictionary page must be before data page.");
-
- case Encoding::DELTA_BINARY_PACKED:
- case Encoding::DELTA_LENGTH_BYTE_ARRAY:
- case Encoding::DELTA_BYTE_ARRAY:
- ParquetException::NYI("Unsupported encoding");
-
- default:
- throw ParquetException("Unknown encoding type.");
- }
- }
- current_decoder_->SetData(num_buffered_values_, buffer, uncompressed_len);
- return true;
- } else {
- // We don't know what this page type is. We're allowed to skip non-data pages.
- continue;
- }
- }
- return true;
-}
-
-} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt
index f08901e..1809ea1 100644
--- a/src/parquet/CMakeLists.txt
+++ b/src/parquet/CMakeLists.txt
@@ -18,8 +18,10 @@
# Headers: top level
install(FILES
parquet.h
+ column_reader.h
reader.h
exception.h
+ types.h
DESTINATION include/parquet)
ADD_PARQUET_TEST(reader-test)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/column_reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
new file mode 100644
index 0000000..b7ececb
--- /dev/null
+++ b/src/parquet/column_reader.cc
@@ -0,0 +1,194 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "parquet/column_reader.h"
+
+#include <algorithm>
+#include <string>
+#include <string.h>
+
+#include "parquet/encodings/encodings.h"
+#include "parquet/compression/codec.h"
+#include "parquet/thrift/util.h"
+#include "parquet/util/input_stream.h"
+
+const int DATA_PAGE_SIZE = 64 * 1024;
+
+namespace parquet_cpp {
+
+using parquet::CompressionCodec;
+using parquet::Encoding;
+using parquet::FieldRepetitionType;
+using parquet::PageType;
+using parquet::Type;
+
+
+ColumnReader::~ColumnReader() {
+ delete stream_;
+}
+
+ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata,
+ const parquet::SchemaElement* schema, InputStream* stream)
+ : metadata_(metadata),
+ schema_(schema),
+ stream_(stream),
+ num_buffered_values_(0),
+ num_decoded_values_(0),
+ buffered_values_offset_(0) {
+
+ switch (metadata->codec) {
+ case CompressionCodec::UNCOMPRESSED:
+ break;
+ case CompressionCodec::SNAPPY:
+ decompressor_.reset(new SnappyCodec());
+ break;
+ default:
+ ParquetException::NYI("Reading compressed data");
+ }
+
+ config_ = Config::DefaultConfig();
+}
+
+
+// PLAIN_DICTIONARY is deprecated, but it was formerly used as the dictionary
+// index encoding, so it is treated the same as RLE_DICTIONARY.
+static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
+ return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
+}
+
+template <int TYPE>
+bool TypedColumnReader<TYPE>::ReadNewPage() {
+ // Loop until we find the next data page.
+
+
+ while (true) {
+ int bytes_read = 0;
+ const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
+ if (bytes_read == 0) return false;
+ uint32_t header_size = bytes_read;
+ DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
+ stream_->Read(header_size, &bytes_read);
+
+ int compressed_len = current_page_header_.compressed_page_size;
+ int uncompressed_len = current_page_header_.uncompressed_page_size;
+
+ // Read the compressed data page.
+ buffer = stream_->Read(compressed_len, &bytes_read);
+ if (bytes_read != compressed_len) ParquetException::EofException();
+
+ // Uncompress it if we need to
+ if (decompressor_ != NULL) {
+ // Grow the uncompressed buffer if we need to.
+ if (uncompressed_len > decompression_buffer_.size()) {
+ decompression_buffer_.resize(uncompressed_len);
+ }
+ decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
+ &decompression_buffer_[0]);
+ buffer = &decompression_buffer_[0];
+ }
+
+ if (current_page_header_.type == PageType::DICTIONARY_PAGE) {
+ auto it = decoders_.find(Encoding::RLE_DICTIONARY);
+ if (it != decoders_.end()) {
+ throw ParquetException("Column cannot have more than one dictionary.");
+ }
+
+ PlainDecoder<TYPE> dictionary(schema_);
+ dictionary.SetData(current_page_header_.dictionary_page_header.num_values,
+ buffer, uncompressed_len);
+ std::shared_ptr<DecoderType> decoder(new DictionaryDecoder<TYPE>(schema_, &dictionary));
+
+ decoders_[Encoding::RLE_DICTIONARY] = decoder;
+ current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get();
+ continue;
+ } else if (current_page_header_.type == PageType::DATA_PAGE) {
+ // Read a data page.
+ num_buffered_values_ = current_page_header_.data_page_header.num_values;
+
+ // Read definition levels.
+ if (schema_->repetition_type != FieldRepetitionType::REQUIRED) {
+ int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer);
+ buffer += sizeof(uint32_t);
+ definition_level_decoder_.reset(
+ new RleDecoder(buffer, num_definition_bytes, 1));
+ buffer += num_definition_bytes;
+ uncompressed_len -= sizeof(uint32_t);
+ uncompressed_len -= num_definition_bytes;
+ }
+
+ // TODO: repetition levels
+
+ // Get a decoder object for this page or create a new decoder if this is the
+ // first page with this encoding.
+ Encoding::type encoding = current_page_header_.data_page_header.encoding;
+ if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY;
+
+ auto it = decoders_.find(encoding);
+ if (it != decoders_.end()) {
+ current_decoder_ = it->second.get();
+ } else {
+ switch (encoding) {
+ case Encoding::PLAIN: {
+ std::shared_ptr<DecoderType> decoder(new PlainDecoder<TYPE>(schema_));
+ decoders_[encoding] = decoder;
+ current_decoder_ = decoder.get();
+ break;
+ }
+ case Encoding::RLE_DICTIONARY:
+ throw ParquetException("Dictionary page must be before data page.");
+
+ case Encoding::DELTA_BINARY_PACKED:
+ case Encoding::DELTA_LENGTH_BYTE_ARRAY:
+ case Encoding::DELTA_BYTE_ARRAY:
+ ParquetException::NYI("Unsupported encoding");
+
+ default:
+ throw ParquetException("Unknown encoding type.");
+ }
+ }
+ current_decoder_->SetData(num_buffered_values_, buffer, uncompressed_len);
+ return true;
+ } else {
+ // We don't know what this page type is. We're allowed to skip non-data pages.
+ continue;
+ }
+ }
+ return true;
+}
+
+std::shared_ptr<ColumnReader> ColumnReader::Make(const parquet::ColumnMetaData* metadata,
+ const parquet::SchemaElement* element, InputStream* stream) {
+ switch (metadata->type) {
+ case Type::BOOLEAN:
+ return std::make_shared<BoolReader>(metadata, element, stream);
+ case Type::INT32:
+ return std::make_shared<Int32Reader>(metadata, element, stream);
+ case Type::INT64:
+ return std::make_shared<Int64Reader>(metadata, element, stream);
+ case Type::INT96:
+ return std::make_shared<Int96Reader>(metadata, element, stream);
+ case Type::FLOAT:
+ return std::make_shared<FloatReader>(metadata, element, stream);
+ case Type::DOUBLE:
+ return std::make_shared<DoubleReader>(metadata, element, stream);
+ case Type::BYTE_ARRAY:
+ return std::make_shared<ByteArrayReader>(metadata, element, stream);
+ default:
+ ParquetException::NYI("type reader not implemented");
+ }
+ // Unreachable code, but suppress compiler warning
+ return std::shared_ptr<ColumnReader>(nullptr);
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/column_reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
new file mode 100644
index 0000000..cd6cc02
--- /dev/null
+++ b/src/parquet/column_reader.h
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_READER_H
+#define PARQUET_COLUMN_READER_H
+
+#include <exception>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "parquet/exception.h"
+#include "parquet/types.h"
+#include "parquet/thrift/parquet_constants.h"
+#include "parquet/thrift/parquet_types.h"
+#include "parquet/util/input_stream.h"
+#include "parquet/encodings/encodings.h"
+#include "parquet/util/rle-encoding.h"
+
+namespace std {
+
+template <>
+struct hash<parquet::Encoding::type> {
+ std::size_t operator()(const parquet::Encoding::type& k) const {
+ return hash<int>()(static_cast<int>(k));
+ }
+};
+
+} // namespace std
+
+namespace parquet_cpp {
+
+class Codec;
+
+class ColumnReader {
+ public:
+
+ struct Config {
+ int batch_size;
+
+ static Config DefaultConfig() {
+ Config config;
+ config.batch_size = 128;
+ return config;
+ }
+ };
+
+ ColumnReader(const parquet::ColumnMetaData*,
+ const parquet::SchemaElement*, InputStream* stream);
+
+ virtual ~ColumnReader();
+
+ static std::shared_ptr<ColumnReader> Make(const parquet::ColumnMetaData*,
+ const parquet::SchemaElement*, InputStream* stream);
+
+ virtual bool ReadNewPage() = 0;
+
+ // Returns true if there are still values in this column.
+ bool HasNext() {
+ if (num_buffered_values_ == 0) {
+ ReadNewPage();
+ if (num_buffered_values_ == 0) return false;
+ }
+ return true;
+ }
+
+ parquet::Type::type type() const {
+ return metadata_->type;
+ }
+
+ const parquet::ColumnMetaData* metadata() const {
+ return metadata_;
+ }
+
+ protected:
+ // Reads the next definition and repetition level. Returns true if the value is NULL.
+ bool ReadDefinitionRepetitionLevels(int* def_level, int* rep_level);
+
+ Config config_;
+
+ const parquet::ColumnMetaData* metadata_;
+ const parquet::SchemaElement* schema_;
+ InputStream* stream_;
+
+ // Compression codec to use.
+ std::unique_ptr<Codec> decompressor_;
+ std::vector<uint8_t> decompression_buffer_;
+
+ parquet::PageHeader current_page_header_;
+
+ // Not set if field is required.
+ std::unique_ptr<RleDecoder> definition_level_decoder_;
+ // Not set for flat schemas.
+ std::unique_ptr<RleDecoder> repetition_level_decoder_;
+ int num_buffered_values_;
+
+ int num_decoded_values_;
+ int buffered_values_offset_;
+};
+
+
+// API to read values from a single column. This is the main client facing API.
+template <int TYPE>
+class TypedColumnReader : public ColumnReader {
+ public:
+ typedef typename type_traits<TYPE>::value_type T;
+
+ TypedColumnReader(const parquet::ColumnMetaData* metadata,
+ const parquet::SchemaElement* schema, InputStream* stream) :
+ ColumnReader(metadata, schema, stream),
+ current_decoder_(NULL) {
+ size_t value_byte_size = type_traits<TYPE>::value_byte_size;
+ values_buffer_.resize(config_.batch_size * value_byte_size);
+ }
+
+ // Returns the next value of this type.
+ // TODO: batchify this interface.
+ T NextValue(int* def_level, int* rep_level) {
+ if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return T();
+ if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
+ return reinterpret_cast<T*>(&values_buffer_[0])[buffered_values_offset_++];
+ }
+
+ private:
+ void BatchDecode();
+
+ virtual bool ReadNewPage();
+
+ typedef Decoder<TYPE> DecoderType;
+
+ // Map of encoding type to decoder object.
+ std::unordered_map<parquet::Encoding::type, std::shared_ptr<DecoderType> > decoders_;
+
+ DecoderType* current_decoder_;
+ std::vector<uint8_t> values_buffer_;
+};
+
+typedef TypedColumnReader<parquet::Type::BOOLEAN> BoolReader;
+typedef TypedColumnReader<parquet::Type::INT32> Int32Reader;
+typedef TypedColumnReader<parquet::Type::INT64> Int64Reader;
+typedef TypedColumnReader<parquet::Type::INT96> Int96Reader;
+typedef TypedColumnReader<parquet::Type::FLOAT> FloatReader;
+typedef TypedColumnReader<parquet::Type::DOUBLE> DoubleReader;
+typedef TypedColumnReader<parquet::Type::BYTE_ARRAY> ByteArrayReader;
+
+
+template <int TYPE>
+void TypedColumnReader<TYPE>::BatchDecode() {
+ buffered_values_offset_ = 0;
+ T* buf = reinterpret_cast<T*>(&values_buffer_[0]);
+ int batch_size = config_.batch_size;
+ num_decoded_values_ = current_decoder_->Decode(buf, batch_size);
+}
+
+inline bool ColumnReader::ReadDefinitionRepetitionLevels(int* def_level, int* rep_level) {
+ *rep_level = 1;
+ if (definition_level_decoder_ && !definition_level_decoder_->Get(def_level)) {
+ ParquetException::EofException();
+ }
+ --num_buffered_values_;
+ return *def_level == 0;
+}
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_COLUMN_READER_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/compression/codec.h
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec.h b/src/parquet/compression/codec.h
index 8166847..07648d7 100644
--- a/src/parquet/compression/codec.h
+++ b/src/parquet/compression/codec.h
@@ -15,11 +15,9 @@
#ifndef PARQUET_COMPRESSION_CODEC_H
#define PARQUET_COMPRESSION_CODEC_H
-#include "parquet/parquet.h"
-
#include <cstdint>
-#include "parquet/thrift/parquet_constants.h"
-#include "parquet/thrift/parquet_types.h"
+
+#include "parquet/exception.h"
namespace parquet_cpp {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/CMakeLists.txt b/src/parquet/encodings/CMakeLists.txt
index 72baf48..544b1e1 100644
--- a/src/parquet/encodings/CMakeLists.txt
+++ b/src/parquet/encodings/CMakeLists.txt
@@ -15,7 +15,6 @@
# Headers: encodings
install(FILES
encodings.h
- bool-encoding.h
delta-bit-pack-encoding.h
delta-byte-array-encoding.h
delta-length-byte-array-encoding.h
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/bool-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/bool-encoding.h b/src/parquet/encodings/bool-encoding.h
deleted file mode 100644
index 8eb55bc..0000000
--- a/src/parquet/encodings/bool-encoding.h
+++ /dev/null
@@ -1,48 +0,0 @@
-// Copyright 2012 Cloudera Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#ifndef PARQUET_BOOL_ENCODING_H
-#define PARQUET_BOOL_ENCODING_H
-
-#include "parquet/encodings/encodings.h"
-
-#include <algorithm>
-
-namespace parquet_cpp {
-
-class BoolDecoder : public Decoder {
- public:
- BoolDecoder() : Decoder(parquet::Type::BOOLEAN, parquet::Encoding::PLAIN) { }
-
- virtual void SetData(int num_values, const uint8_t* data, int len) {
- num_values_ = num_values;
- decoder_ = RleDecoder(data, len, 1);
- }
-
- virtual int GetBool(bool* buffer, int max_values) {
- max_values = std::min(max_values, num_values_);
- for (int i = 0; i < max_values; ++i) {
- if (!decoder_.Get(&buffer[i])) ParquetException::EofException();
- }
- num_values_ -= max_values;
- return max_values;
- }
-
- private:
- RleDecoder decoder_;
-};
-
-} // namespace parquet_cpp
-
-#endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/delta-bit-pack-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-bit-pack-encoding.h b/src/parquet/encodings/delta-bit-pack-encoding.h
index 77a3b26..b437734 100644
--- a/src/parquet/encodings/delta-bit-pack-encoding.h
+++ b/src/parquet/encodings/delta-bit-pack-encoding.h
@@ -22,10 +22,16 @@
namespace parquet_cpp {
-class DeltaBitPackDecoder : public Decoder {
+template <int TYPE>
+class DeltaBitPackDecoder : public Decoder<TYPE> {
public:
- explicit DeltaBitPackDecoder(const parquet::Type::type& type)
- : Decoder(type, parquet::Encoding::DELTA_BINARY_PACKED) {
+ typedef typename type_traits<TYPE>::value_type T;
+
+ explicit DeltaBitPackDecoder(const parquet::SchemaElement* schema)
+ : Decoder<TYPE>(schema, parquet::Encoding::DELTA_BINARY_PACKED) {
+
+ parquet::Type::type type = type_traits<TYPE>::parquet_type;
+
if (type != parquet::Type::INT32 && type != parquet::Type::INT64) {
throw ParquetException("Delta bit pack encoding should only be for integer data.");
}
@@ -38,15 +44,13 @@ class DeltaBitPackDecoder : public Decoder {
values_current_mini_block_ = 0;
}
- virtual int GetInt32(int32_t* buffer, int max_values) {
- return GetInternal(buffer, max_values);
- }
-
- virtual int GetInt64(int64_t* buffer, int max_values) {
+ virtual int Decode(T* buffer, int max_values) {
return GetInternal(buffer, max_values);
}
private:
+ using Decoder<TYPE>::num_values_;
+
void InitBlock() {
uint64_t block_size;
if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException();
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/delta-byte-array-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-byte-array-encoding.h b/src/parquet/encodings/delta-byte-array-encoding.h
index 3396586..a1b5b48 100644
--- a/src/parquet/encodings/delta-byte-array-encoding.h
+++ b/src/parquet/encodings/delta-byte-array-encoding.h
@@ -21,12 +21,12 @@
namespace parquet_cpp {
-class DeltaByteArrayDecoder : public Decoder {
+class DeltaByteArrayDecoder : public Decoder<parquet::Type::BYTE_ARRAY> {
public:
- DeltaByteArrayDecoder()
- : Decoder(parquet::Type::BYTE_ARRAY, parquet::Encoding::DELTA_BYTE_ARRAY),
- prefix_len_decoder_(parquet::Type::INT32),
- suffix_decoder_() {
+ explicit DeltaByteArrayDecoder(const parquet::SchemaElement* schema)
+ : Decoder<parquet::Type::BYTE_ARRAY>(schema, parquet::Encoding::DELTA_BYTE_ARRAY),
+ prefix_len_decoder_(nullptr),
+ suffix_decoder_(nullptr) {
}
virtual void SetData(int num_values, const uint8_t* data, int len) {
@@ -43,13 +43,13 @@ class DeltaByteArrayDecoder : public Decoder {
// TODO: this doesn't work and requires memory management. We need to allocate
// new strings to store the results.
- virtual int GetByteArray(ByteArray* buffer, int max_values) {
+ virtual int Decode(ByteArray* buffer, int max_values) {
max_values = std::min(max_values, num_values_);
for (int i = 0; i < max_values; ++i) {
int prefix_len = 0;
- prefix_len_decoder_.GetInt32(&prefix_len, 1);
+ prefix_len_decoder_.Decode(&prefix_len, 1);
ByteArray suffix;
- suffix_decoder_.GetByteArray(&suffix, 1);
+ suffix_decoder_.Decode(&suffix, 1);
buffer[i].len = prefix_len + suffix.len;
uint8_t* result = reinterpret_cast<uint8_t*>(malloc(buffer[i].len));
@@ -64,7 +64,9 @@ class DeltaByteArrayDecoder : public Decoder {
}
private:
- DeltaBitPackDecoder prefix_len_decoder_;
+ using Decoder<parquet::Type::BYTE_ARRAY>::num_values_;
+
+ DeltaBitPackDecoder<parquet::Type::INT32> prefix_len_decoder_;
DeltaLengthByteArrayDecoder suffix_decoder_;
ByteArray last_value_;
};
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/delta-length-byte-array-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-length-byte-array-encoding.h b/src/parquet/encodings/delta-length-byte-array-encoding.h
index 06bf39d..a6e4c58 100644
--- a/src/parquet/encodings/delta-length-byte-array-encoding.h
+++ b/src/parquet/encodings/delta-length-byte-array-encoding.h
@@ -21,11 +21,12 @@
namespace parquet_cpp {
-class DeltaLengthByteArrayDecoder : public Decoder {
+class DeltaLengthByteArrayDecoder : public Decoder<parquet::Type::BYTE_ARRAY> {
public:
- DeltaLengthByteArrayDecoder()
- : Decoder(parquet::Type::BYTE_ARRAY, parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY),
- len_decoder_(parquet::Type::INT32) {
+ explicit DeltaLengthByteArrayDecoder(const parquet::SchemaElement* schema)
+ : Decoder<parquet::Type::BYTE_ARRAY>(
+ schema, parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY),
+ len_decoder_(nullptr) {
}
virtual void SetData(int num_values, const uint8_t* data, int len) {
@@ -38,10 +39,10 @@ class DeltaLengthByteArrayDecoder : public Decoder {
len_ = len - 4 - total_lengths_len;
}
- virtual int GetByteArray(ByteArray* buffer, int max_values) {
+ virtual int Decode(ByteArray* buffer, int max_values) {
max_values = std::min(max_values, num_values_);
int lengths[max_values];
- len_decoder_.GetInt32(lengths, max_values);
+ len_decoder_.Decode(lengths, max_values);
for (int i = 0; i < max_values; ++i) {
buffer[i].len = lengths[i];
buffer[i].ptr = data_;
@@ -53,7 +54,8 @@ class DeltaLengthByteArrayDecoder : public Decoder {
}
private:
- DeltaBitPackDecoder len_decoder_;
+ using Decoder<parquet::Type::BYTE_ARRAY>::num_values_;
+ DeltaBitPackDecoder<parquet::Type::INT32> len_decoder_;
const uint8_t* data_;
int len_;
};
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
index 2501b2a..cb8fb30 100644
--- a/src/parquet/encodings/dictionary-encoding.h
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -22,56 +22,22 @@
namespace parquet_cpp {
-class DictionaryDecoder : public Decoder {
+template <int TYPE>
+class DictionaryDecoder : public Decoder<TYPE> {
public:
+ typedef typename type_traits<TYPE>::value_type T;
+
// Initializes the dictionary with values from 'dictionary'. The data in dictionary
// is not guaranteed to persist in memory after this call so the dictionary decoder
// needs to copy the data out if necessary.
- DictionaryDecoder(const parquet::Type::type& type, Decoder* dictionary)
- : Decoder(type, parquet::Encoding::RLE_DICTIONARY) {
- int num_dictionary_values = dictionary->values_left();
- switch (type) {
- case parquet::Type::BOOLEAN:
- throw ParquetException("Boolean cols should not be dictionary encoded.");
-
- case parquet::Type::INT32:
- int32_dictionary_.resize(num_dictionary_values);
- dictionary->GetInt32(&int32_dictionary_[0], num_dictionary_values);
- break;
- case parquet::Type::INT64:
- int64_dictionary_.resize(num_dictionary_values);
- dictionary->GetInt64(&int64_dictionary_[0], num_dictionary_values);
- break;
- case parquet::Type::FLOAT:
- float_dictionary_.resize(num_dictionary_values);
- dictionary->GetFloat(&float_dictionary_[0], num_dictionary_values);
- break;
- case parquet::Type::DOUBLE:
- double_dictionary_.resize(num_dictionary_values);
- dictionary->GetDouble(&double_dictionary_[0], num_dictionary_values);
- break;
- case parquet::Type::BYTE_ARRAY: {
- byte_array_dictionary_.resize(num_dictionary_values);
- dictionary->GetByteArray(&byte_array_dictionary_[0], num_dictionary_values);
- int total_size = 0;
- for (int i = 0; i < num_dictionary_values; ++i) {
- total_size += byte_array_dictionary_[i].len;
- }
- byte_array_data_.resize(total_size);
- int offset = 0;
- for (int i = 0; i < num_dictionary_values; ++i) {
- memcpy(&byte_array_data_[offset],
- byte_array_dictionary_[i].ptr, byte_array_dictionary_[i].len);
- byte_array_dictionary_[i].ptr = &byte_array_data_[offset];
- offset += byte_array_dictionary_[i].len;
- }
- break;
- }
- default:
- ParquetException::NYI("Unsupported dictionary type");
- }
+ DictionaryDecoder(const parquet::SchemaElement* schema, Decoder<TYPE>* dictionary)
+ : Decoder<TYPE>(schema, parquet::Encoding::RLE_DICTIONARY) {
+ Init(dictionary);
}
+ // Perform type-specific initialization
+ void Init(Decoder<TYPE>* dictionary);
+
virtual void SetData(int num_values, const uint8_t* data, int len) {
num_values_ = num_values;
if (len == 0) return;
@@ -81,47 +47,17 @@ class DictionaryDecoder : public Decoder {
idx_decoder_ = RleDecoder(data, len, bit_width);
}
- virtual int GetInt32(int32_t* buffer, int max_values) {
- max_values = std::min(max_values, num_values_);
- for (int i = 0; i < max_values; ++i) {
- buffer[i] = int32_dictionary_[index()];
- }
- return max_values;
- }
-
- virtual int GetInt64(int64_t* buffer, int max_values) {
- max_values = std::min(max_values, num_values_);
- for (int i = 0; i < max_values; ++i) {
- buffer[i] = int64_dictionary_[index()];
- }
- return max_values;
- }
-
- virtual int GetFloat(float* buffer, int max_values) {
+ virtual int Decode(T* buffer, int max_values) {
max_values = std::min(max_values, num_values_);
for (int i = 0; i < max_values; ++i) {
- buffer[i] = float_dictionary_[index()];
- }
- return max_values;
- }
-
- virtual int GetDouble(double* buffer, int max_values) {
- max_values = std::min(max_values, num_values_);
- for (int i = 0; i < max_values; ++i) {
- buffer[i] = double_dictionary_[index()];
- }
- return max_values;
- }
-
- virtual int GetByteArray(ByteArray* buffer, int max_values) {
- max_values = std::min(max_values, num_values_);
- for (int i = 0; i < max_values; ++i) {
- buffer[i] = byte_array_dictionary_[index()];
+ buffer[i] = dictionary_[index()];
}
return max_values;
}
private:
+ using Decoder<TYPE>::num_values_;
+
int index() {
int idx = 0;
if (!idx_decoder_.Get(&idx)) ParquetException::EofException();
@@ -130,11 +66,7 @@ class DictionaryDecoder : public Decoder {
}
// Only one is set.
- std::vector<int32_t> int32_dictionary_;
- std::vector<int64_t> int64_dictionary_;
- std::vector<float> float_dictionary_;
- std::vector<double> double_dictionary_;
- std::vector<ByteArray> byte_array_dictionary_;
+ std::vector<T> dictionary_;
// Data that contains the byte array data (byte_array_dictionary_ just has the
// pointers).
@@ -143,6 +75,39 @@ class DictionaryDecoder : public Decoder {
RleDecoder idx_decoder_;
};
+template <int TYPE>
+inline void DictionaryDecoder<TYPE>::Init(Decoder<TYPE>* dictionary) {
+ int num_dictionary_values = dictionary->values_left();
+ dictionary_.resize(num_dictionary_values);
+ dictionary->Decode(&dictionary_[0], num_dictionary_values);
+}
+
+template <>
+inline void DictionaryDecoder<parquet::Type::BOOLEAN>::Init(
+ Decoder<parquet::Type::BOOLEAN>* dictionary) {
+ ParquetException::NYI("Dictionary encoding is not implemented for boolean values");
+}
+
+template <>
+inline void DictionaryDecoder<parquet::Type::BYTE_ARRAY>::Init(
+ Decoder<parquet::Type::BYTE_ARRAY>* dictionary) {
+ int num_dictionary_values = dictionary->values_left();
+ dictionary_.resize(num_dictionary_values);
+ dictionary->Decode(&dictionary_[0], num_dictionary_values);
+
+ int total_size = 0;
+ for (int i = 0; i < num_dictionary_values; ++i) {
+ total_size += dictionary_[i].len;
+ }
+ byte_array_data_.resize(total_size);
+ int offset = 0;
+ for (int i = 0; i < num_dictionary_values; ++i) {
+ memcpy(&byte_array_data_[offset], dictionary_[i].ptr, dictionary_[i].len);
+ dictionary_[i].ptr = &byte_array_data_[offset];
+ offset += dictionary_[i].len;
+ }
+}
+
} // namespace parquet_cpp
#endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/encodings.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encodings.h b/src/parquet/encodings/encodings.h
index 9211bf8..2017fca 100644
--- a/src/parquet/encodings/encodings.h
+++ b/src/parquet/encodings/encodings.h
@@ -17,6 +17,8 @@
#include <cstdint>
+#include "parquet/types.h"
+
#include "parquet/thrift/parquet_constants.h"
#include "parquet/thrift/parquet_types.h"
#include "parquet/util/rle-encoding.h"
@@ -24,8 +26,12 @@
namespace parquet_cpp {
+// The Decoder template is parameterized on parquet::Type::type
+template <int TYPE>
class Decoder {
public:
+ typedef typename type_traits<TYPE>::value_type T;
+
virtual ~Decoder() {}
// Sets the data for a new page. This will be called multiple times on the same
@@ -36,22 +42,7 @@ class Decoder {
// the decoder would decode put to 'max_values', storing the result in 'buffer'.
// The function returns the number of values decoded, which should be max_values
// except for end of the current data page.
- virtual int GetBool(bool* buffer, int max_values) {
- throw ParquetException("Decoder does not implement this type.");
- }
- virtual int GetInt32(int32_t* buffer, int max_values) {
- throw ParquetException("Decoder does not implement this type.");
- }
- virtual int GetInt64(int64_t* buffer, int max_values) {
- throw ParquetException("Decoder does not implement this type.");
- }
- virtual int GetFloat(float* buffer, int max_values) {
- throw ParquetException("Decoder does not implement this type.");
- }
- virtual int GetDouble(double* buffer, int max_values) {
- throw ParquetException("Decoder does not implement this type.");
- }
- virtual int GetByteArray(ByteArray* buffer, int max_values) {
+ virtual int Decode(T* buffer, int max_values) {
throw ParquetException("Decoder does not implement this type.");
}
@@ -62,19 +53,22 @@ class Decoder {
const parquet::Encoding::type encoding() const { return encoding_; }
protected:
- Decoder(const parquet::Type::type& type, const parquet::Encoding::type& encoding)
- : type_(type), encoding_(encoding), num_values_(0) {}
+ explicit Decoder(const parquet::SchemaElement* schema,
+ const parquet::Encoding::type& encoding)
+ : schema_(schema), encoding_(encoding), num_values_(0) {}
+
+ // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
+ const parquet::SchemaElement* schema_;
- const parquet::Type::type type_;
const parquet::Encoding::type encoding_;
int num_values_;
};
} // namespace parquet_cpp
-#include "parquet/encodings/bool-encoding.h"
#include "parquet/encodings/plain-encoding.h"
#include "parquet/encodings/dictionary-encoding.h"
+
#include "parquet/encodings/delta-bit-pack-encoding.h"
#include "parquet/encodings/delta-length-byte-array-encoding.h"
#include "parquet/encodings/delta-byte-array-encoding.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index b094cdb..5fb460e 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -21,11 +21,15 @@
namespace parquet_cpp {
-class PlainDecoder : public Decoder {
+template <int TYPE>
+class PlainDecoder : public Decoder<TYPE> {
public:
- explicit PlainDecoder(const parquet::Type::type& type)
- : Decoder(type, parquet::Encoding::PLAIN), data_(NULL), len_(0) {
- }
+ typedef typename type_traits<TYPE>::value_type T;
+ using Decoder<TYPE>::num_values_;
+
+ explicit PlainDecoder(const parquet::SchemaElement* schema) :
+ Decoder<TYPE>(schema, parquet::Encoding::PLAIN),
+ data_(NULL), len_(0) {}
virtual void SetData(int num_values, const uint8_t* data, int len) {
num_values_ = num_values;
@@ -33,49 +37,61 @@ class PlainDecoder : public Decoder {
len_ = len;
}
- int GetValues(void* buffer, int max_values, int byte_size) {
- max_values = std::min(max_values, num_values_);
- int size = max_values * byte_size;
- if (len_ < size) ParquetException::EofException();
- memcpy(buffer, data_, size);
- data_ += size;
- len_ -= size;
- num_values_ -= max_values;
- return max_values;
- }
+ virtual int Decode(T* buffer, int max_values);
+ private:
+ const uint8_t* data_;
+ int len_;
+};
- virtual int GetInt32(int32_t* buffer, int max_values) {
- return GetValues(buffer, max_values, sizeof(int32_t));
- }
+template <int TYPE>
+inline int PlainDecoder<TYPE>::Decode(T* buffer, int max_values) {
+ max_values = std::min(max_values, num_values_);
+ int size = max_values * sizeof(T);
+ if (len_ < size) ParquetException::EofException();
+ memcpy(buffer, data_, size);
+ data_ += size;
+ len_ -= size;
+ num_values_ -= max_values;
+ return max_values;
+}
- virtual int GetInt64(int64_t* buffer, int max_values) {
- return GetValues(buffer, max_values, sizeof(int64_t));
+// Template specialization for BYTE_ARRAY
+template <>
+inline int PlainDecoder<parquet::Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
+ int max_values) {
+ max_values = std::min(max_values, num_values_);
+ for (int i = 0; i < max_values; ++i) {
+ buffer[i].len = *reinterpret_cast<const uint32_t*>(data_);
+ if (len_ < sizeof(uint32_t) + buffer[i].len) ParquetException::EofException();
+ buffer[i].ptr = data_ + sizeof(uint32_t);
+ data_ += sizeof(uint32_t) + buffer[i].len;
+ len_ -= sizeof(uint32_t) + buffer[i].len;
}
+ num_values_ -= max_values;
+ return max_values;
+}
- virtual int GetFloat(float* buffer, int max_values) {
- return GetValues(buffer, max_values, sizeof(float));
- }
+template <>
+class PlainDecoder<parquet::Type::BOOLEAN> : public Decoder<parquet::Type::BOOLEAN> {
+ public:
+ explicit PlainDecoder(const parquet::SchemaElement* schema) :
+ Decoder<parquet::Type::BOOLEAN>(schema, parquet::Encoding::PLAIN) {}
- virtual int GetDouble(double* buffer, int max_values) {
- return GetValues(buffer, max_values, sizeof(double));
+ virtual void SetData(int num_values, const uint8_t* data, int len) {
+ num_values_ = num_values;
+ decoder_ = RleDecoder(data, len, 1);
}
- virtual int GetByteArray(ByteArray* buffer, int max_values) {
+ virtual int Decode(bool* buffer, int max_values) {
max_values = std::min(max_values, num_values_);
for (int i = 0; i < max_values; ++i) {
- buffer[i].len = *reinterpret_cast<const uint32_t*>(data_);
- if (len_ < sizeof(uint32_t) + buffer[i].len) ParquetException::EofException();
- buffer[i].ptr = data_ + sizeof(uint32_t);
- data_ += sizeof(uint32_t) + buffer[i].len;
- len_ -= sizeof(uint32_t) + buffer[i].len;
+ if (!decoder_.Get(&buffer[i])) ParquetException::EofException();
}
num_values_ -= max_values;
return max_values;
}
-
private:
- const uint8_t* data_;
- int len_;
+ RleDecoder decoder_;
};
} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/parquet.h
----------------------------------------------------------------------
diff --git a/src/parquet/parquet.h b/src/parquet/parquet.h
index 4469a82..0fd3e97 100644
--- a/src/parquet/parquet.h
+++ b/src/parquet/parquet.h
@@ -27,199 +27,8 @@
#include <vector>
#include "parquet/exception.h"
-#include "parquet/thrift/parquet_constants.h"
-#include "parquet/thrift/parquet_types.h"
-#include "parquet/util/rle-encoding.h"
-
-namespace std {
-
-template <>
-struct hash<parquet::Encoding::type> {
- std::size_t operator()(const parquet::Encoding::type& k) const {
- return hash<int>()(static_cast<int>(k));
- }
-};
-
-} // namespace std
-
-namespace parquet_cpp {
-
-class Codec;
-class Decoder;
-
-struct ByteArray {
- uint32_t len;
- const uint8_t* ptr;
-};
-
-// Interface for the column reader to get the bytes. The interface is a stream
-// interface, meaning the bytes in order and once a byte is read, it does not
-// need to be read again.
-class InputStream {
- public:
- // Returns the next 'num_to_peek' without advancing the current position.
- // *num_bytes will contain the number of bytes returned which can only be
- // less than num_to_peek at end of stream cases.
- // Since the position is not advanced, calls to this function are idempotent.
- // The buffer returned to the caller is still owned by the input stream and must
- // stay valid until the next call to Peek() or Read().
- virtual const uint8_t* Peek(int num_to_peek, int* num_bytes) = 0;
-
- // Identical to Peek(), except the current position in the stream is advanced by
- // *num_bytes.
- virtual const uint8_t* Read(int num_to_read, int* num_bytes) = 0;
-
- virtual ~InputStream() {}
-
- protected:
- InputStream() {}
-};
-
-// Implementation of an InputStream when all the bytes are in memory.
-class InMemoryInputStream : public InputStream {
- public:
- InMemoryInputStream(const uint8_t* buffer, int64_t len);
- virtual const uint8_t* Peek(int num_to_peek, int* num_bytes);
- virtual const uint8_t* Read(int num_to_read, int* num_bytes);
-
- private:
- const uint8_t* buffer_;
- int64_t len_;
- int64_t offset_;
-};
-
-// A wrapper for InMemoryInputStream to manage the memory.
-class ScopedInMemoryInputStream : public InputStream {
- public:
- ScopedInMemoryInputStream(int64_t len);
- uint8_t* data();
- int64_t size();
- virtual const uint8_t* Peek(int num_to_peek, int* num_bytes);
- virtual const uint8_t* Read(int num_to_read, int* num_bytes);
-
- private:
- std::vector<uint8_t> buffer_;
- std::unique_ptr<InMemoryInputStream> stream_;
-};
-
-// API to read values from a single column. This is the main client facing API.
-class ColumnReader {
- public:
- struct Config {
- int batch_size;
-
- static Config DefaultConfig() {
- Config config;
- config.batch_size = 128;
- return config;
- }
- };
-
- ColumnReader(const parquet::ColumnMetaData*,
- const parquet::SchemaElement*, InputStream* stream);
-
- ~ColumnReader();
-
- // Returns true if there are still values in this column.
- bool HasNext();
-
- // Returns the next value of this type.
- // TODO: batchify this interface.
- bool GetBool(int* definition_level, int* repetition_level);
- int32_t GetInt32(int* definition_level, int* repetition_level);
- int64_t GetInt64(int* definition_level, int* repetition_level);
- float GetFloat(int* definition_level, int* repetition_level);
- double GetDouble(int* definition_level, int* repetition_level);
- ByteArray GetByteArray(int* definition_level, int* repetition_level);
-
- private:
- bool ReadNewPage();
- // Reads the next definition and repetition level. Returns true if the value is NULL.
- bool ReadDefinitionRepetitionLevels(int* def_level, int* rep_level);
-
- void BatchDecode();
-
- Config config_;
-
- const parquet::ColumnMetaData* metadata_;
- const parquet::SchemaElement* schema_;
- InputStream* stream_;
-
- // Compression codec to use.
- std::unique_ptr<Codec> decompressor_;
- std::vector<uint8_t> decompression_buffer_;
-
- // Map of compression type to decompressor object.
- std::unordered_map<parquet::Encoding::type, std::shared_ptr<Decoder> > decoders_;
-
- parquet::PageHeader current_page_header_;
-
- // Not set if field is required.
- std::unique_ptr<RleDecoder> definition_level_decoder_;
- // Not set for flat schemas.
- std::unique_ptr<RleDecoder> repetition_level_decoder_;
- Decoder* current_decoder_;
- int num_buffered_values_;
-
- std::vector<uint8_t> values_buffer_;
- int num_decoded_values_;
- int buffered_values_offset_;
-};
-
-
-inline bool ColumnReader::HasNext() {
- if (num_buffered_values_ == 0) {
- ReadNewPage();
- if (num_buffered_values_ == 0) return false;
- }
- return true;
-}
-
-inline bool ColumnReader::GetBool(int* def_level, int* rep_level) {
- if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return bool();
- if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
- return reinterpret_cast<bool*>(&values_buffer_[0])[buffered_values_offset_++];
-}
-
-inline int32_t ColumnReader::GetInt32(int* def_level, int* rep_level) {
- if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return int32_t();
- if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
- return reinterpret_cast<int32_t*>(&values_buffer_[0])[buffered_values_offset_++];
-}
-
-inline int64_t ColumnReader::GetInt64(int* def_level, int* rep_level) {
- if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return int64_t();
- if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
- return reinterpret_cast<int64_t*>(&values_buffer_[0])[buffered_values_offset_++];
-}
-
-inline float ColumnReader::GetFloat(int* def_level, int* rep_level) {
- if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return float();
- if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
- return reinterpret_cast<float*>(&values_buffer_[0])[buffered_values_offset_++];
-}
-
-inline double ColumnReader::GetDouble(int* def_level, int* rep_level) {
- if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return double();
- if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
- return reinterpret_cast<double*>(&values_buffer_[0])[buffered_values_offset_++];
-}
-
-inline ByteArray ColumnReader::GetByteArray(int* def_level, int* rep_level) {
- if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return ByteArray();
- if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
- return reinterpret_cast<ByteArray*>(&values_buffer_[0])[buffered_values_offset_++];
-}
-
-inline bool ColumnReader::ReadDefinitionRepetitionLevels(int* def_level, int* rep_level) {
- *rep_level = 1;
- if (definition_level_decoder_ && !definition_level_decoder_->Get(def_level)) {
- ParquetException::EofException();
- }
- --num_buffered_values_;
- return *def_level == 0;
-}
-
-} // namespace parquet_cpp
+#include "parquet/reader.h"
+#include "parquet/column_reader.h"
+#include "parquet/util/input_stream.h"
#endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index 0f06f3f..1459afc 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -42,9 +42,7 @@ class TestAllTypesPlain : public ::testing::Test {
reader_.Open(&file_);
}
- void TearDown() {
- reader_.Close();
- }
+ void TearDown() {}
protected:
LocalFile file_;
@@ -56,4 +54,14 @@ TEST_F(TestAllTypesPlain, ParseMetaData) {
reader_.ParseMetaData();
}
+TEST_F(TestAllTypesPlain, DebugPrintWorks) {
+ std::stringstream ss;
+
+ // Automatically parses metadata
+ reader_.DebugPrint(ss);
+
+ std::string result = ss.str();
+ ASSERT_TRUE(result.size() > 0);
+}
+
} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc
index 7ccd98c..7c727ba 100644
--- a/src/parquet/reader.cc
+++ b/src/parquet/reader.cc
@@ -18,18 +18,30 @@
#include "parquet/reader.h"
#include <cstdio>
+#include <cstring>
+#include <memory>
+#include <sstream>
+#include <string>
#include <vector>
+#include "parquet/column_reader.h"
#include "parquet/exception.h"
+
#include "parquet/thrift/util.h"
+#include "parquet/util/input_stream.h"
+
+using std::string;
+using std::vector;
+using parquet::Type;
+
namespace parquet_cpp {
// ----------------------------------------------------------------------
// LocalFile methods
LocalFile::~LocalFile() {
- // You must explicitly call Close
+ CloseFile();
}
void LocalFile::Open(const std::string& path) {
@@ -39,6 +51,11 @@ void LocalFile::Open(const std::string& path) {
}
void LocalFile::Close() {
+ // Pure virtual
+ CloseFile();
+}
+
+void LocalFile::CloseFile() {
if (is_open_) {
fclose(file_);
is_open_ = false;
@@ -58,9 +75,51 @@ size_t LocalFile::Tell() {
return ftell(file_);
}
-void LocalFile::Read(size_t nbytes, uint8_t* buffer,
- size_t* bytes_read) {
- *bytes_read = fread(buffer, 1, nbytes, file_);
+size_t LocalFile::Read(size_t nbytes, uint8_t* buffer) {
+ return fread(buffer, 1, nbytes, file_);
+}
+
+// ----------------------------------------------------------------------
+// RowGroupReader
+
+ColumnReader* RowGroupReader::Column(size_t i) {
+ // TODO: boundschecking
+ auto it = column_readers_.find(i);
+ if (it != column_readers_.end()) {
+ // Already have constructed the ColumnReader
+ return it->second.get();
+ }
+
+ const parquet::ColumnChunk& col = row_group_->columns[i];
+
+ size_t col_start = col.meta_data.data_page_offset;
+ if (col.meta_data.__isset.dictionary_page_offset &&
+ col_start > col.meta_data.dictionary_page_offset) {
+ col_start = col.meta_data.dictionary_page_offset;
+ }
+
+ std::unique_ptr<ScopedInMemoryInputStream> input(
+ new ScopedInMemoryInputStream(col.meta_data.total_compressed_size));
+
+ FileLike* source = this->parent_->buffer_;
+
+ source->Seek(col_start);
+
+ // TODO(wesm): Law of demeter violation
+ size_t bytes_read = source->Read(input->size(), input->data());
+
+ if (bytes_read != input->size()) {
+ std::cout << "Bytes needed: " << col.meta_data.total_compressed_size << std::endl;
+ std::cout << "Bytes read: " << bytes_read << std::endl;
+ throw ParquetException("Unable to read column chunk data");
+ }
+
+ // TODO(wesm): This presumes a flat schema
+ std::shared_ptr<ColumnReader> reader = ColumnReader::Make(&col.meta_data,
+ &this->parent_->metadata_.schema[i + 1], input.release());
+ column_readers_[i] = reader;
+
+ return reader.get();
}
// ----------------------------------------------------------------------
@@ -70,6 +129,12 @@ void LocalFile::Read(size_t nbytes, uint8_t* buffer,
static constexpr uint32_t FOOTER_SIZE = 8;
static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
+ParquetFileReader::ParquetFileReader() :
+ parsed_metadata_(false),
+ buffer_(nullptr) {}
+
+ParquetFileReader::~ParquetFileReader() {}
+
void ParquetFileReader::Open(FileLike* buffer) {
buffer_ = buffer;
}
@@ -78,6 +143,29 @@ void ParquetFileReader::Close() {
buffer_->Close();
}
+RowGroupReader* ParquetFileReader::RowGroup(size_t i) {
+ if (i >= num_row_groups()) {
+ std::stringstream ss;
+ ss << "The file only has " << num_row_groups()
+ << "row groups, requested reader for: "
+ << i;
+ throw ParquetException(ss.str());
+ }
+
+ auto it = row_group_readers_.find(i);
+ if (it != row_group_readers_.end()) {
+ // Constructed the RowGroupReader already
+ return it->second.get();
+ }
+ if (!parsed_metadata_) {
+ ParseMetaData();
+ }
+
+ // Construct the RowGroupReader
+ row_group_readers_[i] = std::make_shared<RowGroupReader>(this, &metadata_.row_groups[i]);
+ return row_group_readers_[i].get();
+}
+
void ParquetFileReader::ParseMetaData() {
size_t filesize = buffer_->Size();
@@ -85,11 +173,11 @@ void ParquetFileReader::ParseMetaData() {
throw ParquetException("Corrupted file, smaller than file footer");
}
- size_t bytes_read;
uint8_t footer_buffer[FOOTER_SIZE];
buffer_->Seek(filesize - FOOTER_SIZE);
- buffer_->Read(FOOTER_SIZE, footer_buffer, &bytes_read);
+
+ size_t bytes_read = buffer_->Read(FOOTER_SIZE, footer_buffer);
if (bytes_read != FOOTER_SIZE) {
throw ParquetException("Invalid parquet file. Corrupt footer.");
@@ -107,11 +195,192 @@ void ParquetFileReader::ParseMetaData() {
buffer_->Seek(metadata_start);
std::vector<uint8_t> metadata_buffer(metadata_len);
- buffer_->Read(metadata_len, &metadata_buffer[0], &bytes_read);
+ bytes_read = buffer_->Read(metadata_len, &metadata_buffer[0]);
if (bytes_read != metadata_len) {
throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
}
DeserializeThriftMsg(&metadata_buffer[0], &metadata_len, &metadata_);
+ parsed_metadata_ = true;
+}
+
+// ----------------------------------------------------------------------
+// ParquetFileReader::DebugPrint
+
+static string parquet_type_to_string(Type::type t) {
+ switch (t) {
+ case Type::BOOLEAN:
+ return "BOOLEAN";
+ break;
+ case Type::INT32:
+ return "INT32";
+ break;
+ case Type::INT64:
+ return "INT64";
+ break;
+ case Type::FLOAT:
+ return "FLOAT";
+ break;
+ case Type::DOUBLE:
+ return "DOUBLE";
+ break;
+ case Type::BYTE_ARRAY:
+ return "BYTE_ARRAY";
+ break;
+ case Type::INT96:
+ return "INT96";
+ break;
+ case Type::FIXED_LEN_BYTE_ARRAY:
+ return "FIXED_LEN_BYTE_ARRAY";
+ break;
+ default:
+ return "UNKNOWN";
+ break;
+ }
+}
+
+// the fixed initial size is just for an example
+#define COL_WIDTH "17"
+
+void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
+ if (!parsed_metadata_) {
+ ParseMetaData();
+ }
+
+ stream << "File statistics:\n";
+ stream << "Total rows: " << metadata_.num_rows << "\n";
+ for (int c = 1; c < metadata_.schema.size(); ++c) {
+ stream << "Column " << c-1 << ": " << metadata_.schema[c].name << " ("
+ << parquet_type_to_string(metadata_.schema[c].type);
+ if (metadata_.schema[c].type == Type::INT96 ||
+ metadata_.schema[c].type == Type::FIXED_LEN_BYTE_ARRAY) {
+ stream << " - not supported";
+ }
+ stream << ")\n";
+ }
+
+ for (int i = 0; i < metadata_.row_groups.size(); ++i) {
+ stream << "--- Row Group " << i << " ---\n";
+
+ RowGroupReader* group_reader = RowGroup(i);
+
+ // Print column metadata
+ size_t nColumns = group_reader->num_columns();
+
+ for (int c = 0; c < group_reader->num_columns(); ++c) {
+ const parquet::ColumnMetaData* meta_data = group_reader->Column(c)->metadata();
+ stream << "Column " << c
+ << ": " << meta_data->num_values << " rows, "
+ << meta_data->statistics.null_count << " null values, "
+ << meta_data->statistics.distinct_count << " distinct values, "
+ << "min value: " << (meta_data->statistics.min.length()>0 ?
+ meta_data->statistics.min : "N/A")
+ << ", max value: " << (meta_data->statistics.max.length()>0 ?
+ meta_data->statistics.max : "N/A") << ".\n";
+ }
+
+ if (!print_values) {
+ continue;
+ }
+
+ // Create readers for all columns and print contents
+ vector<ColumnReader*> readers(nColumns, NULL);
+ for (int c = 0; c < nColumns; ++c) {
+ ColumnReader* col_reader = group_reader->Column(c);
+
+ Type::type col_type = col_reader->type();
+
+ printf("%-" COL_WIDTH"s", metadata_.schema[c+1].name.c_str());
+
+ if (col_type == Type::INT96 || col_type == Type::FIXED_LEN_BYTE_ARRAY) {
+ continue;
+ }
+
+ // This is OK in this method as long as the RowGroupReader does not get deleted
+ readers[c] = col_reader;
+ }
+ stream << "\n";
+
+ vector<int> def_level(nColumns, 0);
+ vector<int> rep_level(nColumns, 0);
+
+ static constexpr size_t bufsize = 25;
+ char buffer[bufsize];
+
+ bool hasRow;
+ do {
+ hasRow = false;
+ for (int c = 0; c < nColumns; ++c) {
+ if (readers[c] == NULL) {
+ snprintf(buffer, bufsize, "%-" COL_WIDTH"s", " ");
+ stream << buffer;
+ continue;
+ }
+ if (readers[c]->HasNext()) {
+ hasRow = true;
+ switch (readers[c]->type()) {
+ case Type::BOOLEAN: {
+ bool val = reinterpret_cast<BoolReader*>(readers[c])->NextValue(
+ &def_level[c], &rep_level[c]);
+ if (def_level[c] >= rep_level[c]) {
+ snprintf(buffer, bufsize, "%-" COL_WIDTH"d",val);
+ stream << buffer;
+ }
+ break;
+ }
+ case Type::INT32: {
+ int32_t val = reinterpret_cast<Int32Reader*>(readers[c])->NextValue(
+ &def_level[c], &rep_level[c]);
+ if (def_level[c] >= rep_level[c]) {
+ snprintf(buffer, bufsize, "%-" COL_WIDTH"d",val);
+ stream << buffer;
+ }
+ break;
+ }
+ case Type::INT64: {
+ int64_t val = reinterpret_cast<Int64Reader*>(readers[c])->NextValue(
+ &def_level[c], &rep_level[c]);
+ if (def_level[c] >= rep_level[c]) {
+ snprintf(buffer, bufsize, "%-" COL_WIDTH"ld",val);
+ stream << buffer;
+ }
+ break;
+ }
+ case Type::FLOAT: {
+ float val = reinterpret_cast<FloatReader*>(readers[c])->NextValue(
+ &def_level[c], &rep_level[c]);
+ if (def_level[c] >= rep_level[c]) {
+ snprintf(buffer, bufsize, "%-" COL_WIDTH"f",val);
+ stream << buffer;
+ }
+ break;
+ }
+ case Type::DOUBLE: {
+ double val = reinterpret_cast<DoubleReader*>(readers[c])->NextValue(
+ &def_level[c], &rep_level[c]);
+ if (def_level[c] >= rep_level[c]) {
+ snprintf(buffer, bufsize, "%-" COL_WIDTH"lf",val);
+ stream << buffer;
+ }
+ break;
+ }
+ case Type::BYTE_ARRAY: {
+ ByteArray val = reinterpret_cast<ByteArrayReader*>(readers[c])->NextValue(
+ &def_level[c], &rep_level[c]);
+ if (def_level[c] >= rep_level[c]) {
+ string result = ByteArrayToString(val);
+ snprintf(buffer, bufsize, "%-" COL_WIDTH"s", result.c_str());
+ stream << buffer;
+ }
+ break;
+ }
+ default:
+ continue;
+ }
+ }
+ }
+ stream << "\n";
+ } while (hasRow);
+ }
}
} // namespace parquet_cpp