You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@parquet.apache.org by no...@apache.org on 2016/01/27 01:53:20 UTC
parquet-cpp git commit: PARQUET-418: Refactored parquet_reader utility for printing file contents.
Repository: parquet-cpp
Updated Branches:
refs/heads/master 9a1fd892f -> 8fc24f861
PARQUET-418: Refactored parquet_reader utility for printing file contents.
This pull request contains the following changes:
* Refactored the parquet_reader utility, fixed its memory leaks, and merged the compute_stats utility into it to remove code duplication (compute_stats.cc and its CMake target are deleted).
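* Added an --only-stats flag to parquet_reader that prints only the file statistics (schema, total row count, and per-row-group column metadata statistics) without reading the column values.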
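* Added ScopedInMemoryInputStream, a wrapper around InMemoryInputStream that owns its buffer (a std::vector<uint8_t>). ColumnReader now takes ownership of its input stream and deletes it in its destructor.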
The code repetition that still remains in parquet_reader (a separate switch case for each physical type) highlights the need for type-specialized ColumnReader classes. I will create a new JIRA for this improvement.
Author: Aliaksei Sandryhaila <al...@hp.com>
Closes #18 from asandryh/PARQUET-418 and squashes the following commits:
a378a1e [Aliaksei Sandryhaila] Changed the buffer in ScopedInMemoryInputStream to std::vector.
7f6f533 [Aliaksei Sandryhaila] [PARQUET-418]: Added/modified a utility for printing a file's contents.
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/8fc24f86
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/8fc24f86
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/8fc24f86
Branch: refs/heads/master
Commit: 8fc24f861a9768b3be9d9c2eb0eeefb114164008
Parents: 9a1fd89
Author: Aliaksei Sandryhaila <al...@hp.com>
Authored: Tue Jan 26 16:53:17 2016 -0800
Committer: Nong Li <no...@gmail.com>
Committed: Tue Jan 26 16:53:17 2016 -0800
----------------------------------------------------------------------
CMakeLists.txt | 2 +-
example/CMakeLists.txt | 3 -
example/compute_stats.cc | 222 ----------------------
example/example_util.h | 23 ++-
example/parquet_reader.cc | 419 +++++++++++++++++++----------------------
src/parquet.cc | 28 ++-
src/parquet/parquet.h | 14 ++
7 files changed, 259 insertions(+), 452 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8fc24f86/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2554e6c..a2f7e6a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -133,7 +133,7 @@ message(STATUS "Boost libraries: " ${Boost_LIBRARIES})
# find thrift headers and libs
find_package(Thrift REQUIRED)
-include_directories(SYSTEM ${THRIFT_INCLUDE_DIR})
+include_directories(SYSTEM ${THRIFT_INCLUDE_DIR} ${THRIFT_INCLUDE_DIR}/thrift)
set(LIBS ${LIBS} ${THRIFT_LIBS})
message(STATUS "Thrift include dir: ${THRIFT_INCLUDE_DIR}")
message(STATUS "Thrift contrib dir: ${THRIFT_CONTRIB_DIR}")
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8fc24f86/example/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt
index a9f4fa3..a020184 100644
--- a/example/CMakeLists.txt
+++ b/example/CMakeLists.txt
@@ -23,9 +23,6 @@ SET(LINK_LIBS
thriftstatic
Example)
-add_executable(compute_stats compute_stats.cc)
-target_link_libraries(compute_stats ${LINK_LIBS})
-
add_executable(decode_benchmark decode_benchmark.cc)
target_link_libraries(decode_benchmark ${LINK_LIBS})
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8fc24f86/example/compute_stats.cc
----------------------------------------------------------------------
diff --git a/example/compute_stats.cc b/example/compute_stats.cc
deleted file mode 100644
index 82a574f..0000000
--- a/example/compute_stats.cc
+++ /dev/null
@@ -1,222 +0,0 @@
-// Copyright 2012 Cloudera Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include <parquet/parquet.h>
-#include <iostream>
-#include <stdio.h>
-
-#include "example_util.h"
-
-using namespace parquet;
-using namespace parquet_cpp;
-using namespace std;
-
-struct AnyType {
- union {
- bool bool_val;
- int32_t int32_val;
- int64_t int64_val;
- float float_val;
- double double_val;
- ByteArray byte_array_val;
- };
-};
-
-static string ByteArrayToString(const ByteArray& a) {
- return string(reinterpret_cast<const char*>(a.ptr), a.len);
-}
-
-int ByteCompare(const ByteArray& x1, const ByteArray& x2) {
- int len = ::min(x1.len, x2.len);
- int cmp = memcmp(x1.ptr, x2.ptr, len);
- if (cmp != 0) return cmp;
- if (len < x1.len) return 1;
- if (len < x2.len) return -1;
- return 0;
-}
-
-// Simple example which reads all the values in the file and outputs the number of
-// values, number of nulls and min/max for each column.
-int main(int argc, char** argv) {
- int col_idx = -1;
- if (argc < 2) {
- cerr << "Usage: compute_stats <file> [col_idx]" << endl;
- return -1;
- }
- if (argc == 3) col_idx = atoi(argv[2]);
- FileMetaData metadata;
- if (!GetFileMetadata(argv[1], &metadata)) return -1;
-
- FILE* file = fopen(argv[1], "r");
- if (file == NULL) {
- cerr << "Could not open file: " << argv[1] << endl;
- return -1;
- }
-
- for (int i = 0; i < metadata.row_groups.size(); ++i) {
- const RowGroup& row_group = metadata.row_groups[i];
- for (int c = 0; c < row_group.columns.size(); ++c) {
- if (col_idx != -1 && col_idx != c) continue;
- const ColumnChunk& col = row_group.columns[c];
- cout << "Reading column " << metadata.schema[c + 1].name << " (idx=" << c << ")\n";
- if (col.meta_data.type == Type::INT96) {
- cout << " Skipping unsupported column" << endl;
- continue;
- }
-
- size_t col_start = col.meta_data.data_page_offset;
- if (col.meta_data.__isset.dictionary_page_offset) {
- if (col_start > col.meta_data.dictionary_page_offset) {
- col_start = col.meta_data.dictionary_page_offset;
- }
- }
- fseek(file, col_start, SEEK_SET);
- vector<uint8_t> column_buffer;
- column_buffer.resize(col.meta_data.total_compressed_size);
- size_t num_read = fread(&column_buffer[0], 1, column_buffer.size(), file);
- if (num_read != column_buffer.size()) {
- cerr << "Could not read column data." << endl;
- continue;
- }
-
- InMemoryInputStream input(&column_buffer[0], column_buffer.size());
- ColumnReader reader(&col.meta_data, &metadata.schema[c + 1], &input);
-
- bool first_val = true;
- AnyType min, max;
- int num_values = 0;
- int num_nulls = 0;
-
- int def_level, rep_level;
- while (reader.HasNext()) {
- switch (col.meta_data.type) {
- case Type::BOOLEAN: {
- bool val = reader.GetBool(&def_level, &rep_level);
- if (def_level < rep_level) break;
- if (first_val) {
- min.bool_val = max.bool_val = val;
- first_val = false;
- } else {
- min.bool_val = ::min(val, min.bool_val);
- max.bool_val = ::max(val, max.bool_val);
- }
- break;
- }
- case Type::INT32: {
- int32_t val = reader.GetInt32(&def_level, &rep_level);;
- if (def_level < rep_level) break;
- if (first_val) {
- min.int32_val = max.int32_val = val;
- first_val = false;
- } else {
- min.int32_val = ::min(val, min.int32_val);
- max.int32_val = ::max(val, max.int32_val);
- }
- break;
- }
- case Type::INT64: {
- int64_t val = reader.GetInt64(&def_level, &rep_level);;
- if (def_level < rep_level) break;
- if (first_val) {
- min.int64_val = max.int64_val = val;
- first_val = false;
- } else {
- min.int64_val = ::min(val, min.int64_val);
- max.int64_val = ::max(val, max.int64_val);
- }
- break;
- }
- case Type::FLOAT: {
- float val = reader.GetFloat(&def_level, &rep_level);;
- if (def_level < rep_level) break;
- if (first_val) {
- min.float_val = max.float_val = val;
- first_val = false;
- } else {
- min.float_val = ::min(val, min.float_val);
- max.float_val = ::max(val, max.float_val);
- }
- break;
- }
- case Type::DOUBLE: {
- double val = reader.GetDouble(&def_level, &rep_level);;
- if (def_level < rep_level) break;
- if (first_val) {
- min.double_val = max.double_val = val;
- first_val = false;
- } else {
- min.double_val = ::min(val, min.double_val);
- max.double_val = ::max(val, max.double_val);
- }
- break;
- }
- case Type::BYTE_ARRAY: {
- ByteArray val = reader.GetByteArray(&def_level, &rep_level);;
- if (def_level < rep_level) break;
- if (first_val) {
- min.byte_array_val = max.byte_array_val = val;
- first_val = false;
- } else {
- if (ByteCompare(val, min.byte_array_val) < 0) {
- min.byte_array_val = val;
- }
- if (ByteCompare(val, max.byte_array_val) > 0) {
- max.byte_array_val = val;
- }
- }
- break;
- }
- default:
- continue;
- }
-
- if (def_level < rep_level) ++num_nulls;
- ++num_values;
- }
-
- cout << " Num Values: " << num_values << endl;
- cout << " Num Nulls: " << num_nulls << endl;
- switch (col.meta_data.type) {
- case Type::BOOLEAN:
- cout << " Min: " << min.bool_val << endl;
- cout << " Max: " << max.bool_val << endl;
- break;
- case Type::INT32:
- cout << " Min: " << min.int32_val << endl;
- cout << " Max: " << max.int32_val << endl;
- break;
- case Type::INT64:
- cout << " Min: " << min.int64_val << endl;
- cout << " Max: " << max.int64_val << endl;
- break;
- case Type::FLOAT:
- cout << " Min: " << min.float_val << endl;
- cout << " Max: " << max.float_val << endl;
- break;
- case Type::DOUBLE:
- cout << " Min: " << min.double_val << endl;
- cout << " Max: " << max.double_val << endl;
- break;
- case Type::BYTE_ARRAY:
- cout << " Min: " << ByteArrayToString(min.byte_array_val) << endl;
- cout << " Max: " << ByteArrayToString(max.byte_array_val) << endl;
- break;
- default:
- continue;
- }
- }
- }
- fclose(file);
- return 0;
-}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8fc24f86/example/example_util.h
----------------------------------------------------------------------
diff --git a/example/example_util.h b/example/example_util.h
index 4c523e7..a8b58fc 100644
--- a/example/example_util.h
+++ b/example/example_util.h
@@ -17,7 +17,28 @@
#include <string>
#include <parquet/parquet.h>
+#include <stdio.h>
bool GetFileMetadata(const std::string& path, parquet::FileMetaData* metadata);
-#endif
+class InputFile {
+private:
+ FILE* file;
+ std::string filename;
+
+public:
+ InputFile(const std::string& _filename): filename(_filename) {
+ file = fopen(_filename.c_str(), "r");
+ }
+ ~InputFile() {
+ if (file != NULL) {
+ fclose(file);
+ }
+ }
+
+ FILE* getFileHandle() { return file; }
+ bool isOpen() { return file != NULL; }
+ std::string getFilename() { return filename; }
+};
+
+#endif // PARQUET_EXAMPLE_UTIL_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8fc24f86/example/parquet_reader.cc
----------------------------------------------------------------------
diff --git a/example/parquet_reader.cc b/example/parquet_reader.cc
index 55895ce..5379c5e 100644
--- a/example/parquet_reader.cc
+++ b/example/parquet_reader.cc
@@ -13,14 +13,12 @@
// limitations under the License.
#include <parquet/parquet.h>
-#include <iostream>
-#include <stdio.h>
-
#include "example_util.h"
+#include <iostream>
+
// the fixed initial size is just for an example
-#define INIT_SIZE 100
-#define COL_WIDTH "17"
+#define COL_WIDTH "17"
using namespace parquet;
using namespace parquet_cpp;
@@ -41,254 +39,229 @@ static string ByteArrayToString(const ByteArray& a) {
return string(reinterpret_cast<const char*>(a.ptr), a.len);
}
-void* read_parquet(char* filename);
-
-// Simple example which prints out the content of the Parquet file
-int main(int argc, char** argv) {
-
- if (argc < 2) {
- cerr << "Usage: parquet_reader <file>" << endl;
- return -1;
- }
-
- void *column_ptr = read_parquet(argv[1]);
-
- // an example to use the returned column_ptr
- // printf("%-"COL_WIDTH"d\n",((int32_t *)(((int32_t **)column_ptr)[0]))[0]);
-
+int ByteCompare(const ByteArray& x1, const ByteArray& x2) {
+ int len = ::min(x1.len, x2.len);
+ int cmp = memcmp(x1.ptr, x2.ptr, len);
+ if (cmp != 0) return cmp;
+ if (len < x1.len) return 1;
+ if (len < x2.len) return -1;
return 0;
}
+string type2String(Type::type t) {
+ switch(t) {
+ case Type::BOOLEAN:
+ return "BOOLEAN";
+ break;
+ case Type::INT32:
+ return "INT32";
+ break;
+ case Type::INT64:
+ return "INT64";
+ break;
+ case Type::FLOAT:
+ return "FLOAT";
+ break;
+ case Type::DOUBLE:
+ return "DOUBLE";
+ break;
+ case Type::BYTE_ARRAY:
+ return "BYTE_ARRAY";
+ break;
+ case Type::INT96:
+ return "INT96";
+ break;
+ case Type::FIXED_LEN_BYTE_ARRAY:
+ return "FIXED_LEN_BYTE_ARRAY";
+ break;
+ default:
+ return "UNKNOWN";
+ break;
+ }
+}
-void* read_parquet(char* filename) {
-
- unsigned int total_row_number = 0;
+void readParquet(const string& filename, const bool printValues) {
+ InputFile file(filename);
+ if (!file.isOpen()) {
+ cerr << "Could not open file " << file.getFilename() << endl;
+ return;
+ }
FileMetaData metadata;
- if (!GetFileMetadata(filename, &metadata)) return NULL;
+ if (!GetFileMetadata(file.getFilename().c_str(), &metadata)) {
+ cerr << "Could not read metadata from file " << file.getFilename() << endl;
+ return;
+ }
- FILE* file = fopen(filename, "r");
- if (file == NULL) {
- cerr << "Could not open file: " << filename << endl;
- return NULL;
+ cout << "File statistics:\n" ;
+ cout << "Total rows: " << metadata.num_rows << "\n";
+ for (int c = 1; c < metadata.schema.size(); ++c) {
+ cout << "Column " << c-1 << ": " << metadata.schema[c].name << " ("
+ << type2String(metadata.schema[c].type);
+ if (metadata.schema[c].type == Type::INT96 ||
+ metadata.schema[c].type == Type::FIXED_LEN_BYTE_ARRAY) {
+ cout << " - not supported";
+ }
+ cout << ")\n";
}
for (int i = 0; i < metadata.row_groups.size(); ++i) {
- const RowGroup& row_group = metadata.row_groups[i];
+ cout << "--- Row Group " << i << " ---\n";
- Type::type* type_array = (Type::type*)malloc(
- row_group.columns.size() * sizeof(Type::type));
- assert(type_array);
+ // Print column metadata
+ const RowGroup& row_group = metadata.row_groups[i];
+ size_t nColumns = row_group.columns.size();
+
+ for (int c = 0; c < nColumns; ++c) {
+ const ColumnMetaData& meta_data = row_group.columns[c].meta_data;
+ cout << "Column " << c
+ << ": " << meta_data.num_values << " rows, "
+ << meta_data.statistics.null_count << " null values, "
+ << meta_data.statistics.distinct_count << " distinct values, "
+ << "min value: " << (meta_data.statistics.min.length()>0 ?
+ meta_data.statistics.min : "N/A")
+ << ", max value: " << (meta_data.statistics.max.length()>0 ?
+ meta_data.statistics.max : "N/A") << ".\n";
+ }
- void* column_ptr = (void*)malloc(row_group.columns.size() * sizeof(void*));
- assert(column_ptr);
+ if (!printValues) {
+ continue;
+ }
- for (int c = 0; c < row_group.columns.size(); ++c) {
+ // Create readers for all columns and print contents
+ vector<ColumnReader*> readers(nColumns, NULL);
+ try {
+ for (int c = 0; c < nColumns; ++c) {
+ const ColumnChunk& col = row_group.columns[c];
+ printf("%-" COL_WIDTH"s", metadata.schema[c+1].name.c_str());
- const ColumnChunk& col = row_group.columns[c];
- if (col.meta_data.type == Type::INT96 ||
- col.meta_data.type == Type::FIXED_LEN_BYTE_ARRAY) {
- cout << " Skipping unsupported column" << endl;
- continue;
- }
-
- size_t col_start = col.meta_data.data_page_offset;
- if (col.meta_data.__isset.dictionary_page_offset) {
- if (col_start > col.meta_data.dictionary_page_offset) {
- col_start = col.meta_data.dictionary_page_offset;
+ if (col.meta_data.type == Type::INT96 ||
+ col.meta_data.type == Type::FIXED_LEN_BYTE_ARRAY) {
+ continue;
}
- }
- fseek(file, col_start, SEEK_SET);
- vector<uint8_t> column_buffer;
- column_buffer.resize(col.meta_data.total_compressed_size);
- size_t num_read = fread(&column_buffer[0], 1, column_buffer.size(), file);
- if (num_read != column_buffer.size()) {
- cerr << "Could not read column data." << endl;
- continue;
- }
-
- InMemoryInputStream input(&column_buffer[0], column_buffer.size());
- ColumnReader reader(&col.meta_data, &metadata.schema[c + 1], &input);
- AnyType min, max;
- int num_values = 0;
- int num_nulls = 0;
-
- switch (col.meta_data.type) {
- case Type::BOOLEAN: {
- ((bool**)column_ptr)[c] = (bool*)malloc(sizeof(bool) * INIT_SIZE);
- type_array[c] = Type::BOOLEAN;
- break;
- }
- case Type::INT32: {
- ((int32_t**)column_ptr)[c] = (int32_t*)malloc(sizeof(int32_t) * INIT_SIZE);
- type_array[c] = Type::INT32;
- break;
- }
- case Type::INT64: {
- ((int64_t**)column_ptr)[c] = (int64_t*)malloc(sizeof(int64_t) * INIT_SIZE);
- type_array[c] = Type::INT64;
- break;
+ size_t col_start = col.meta_data.data_page_offset;
+ if (col.meta_data.__isset.dictionary_page_offset &&
+ col_start > col.meta_data.dictionary_page_offset) {
+ col_start = col.meta_data.dictionary_page_offset;
}
- case Type::FLOAT: {
- ((float**)column_ptr)[c] = (float*)malloc(sizeof(float) * INIT_SIZE);
- type_array[c] = Type::FLOAT;
- break;
- }
- case Type::DOUBLE: {
- ((double**)column_ptr)[c] = (double*)malloc(sizeof(double) * INIT_SIZE);
- type_array[c] = Type::DOUBLE;
- break;
- }
- case Type::BYTE_ARRAY: {
- ((ByteArray**)column_ptr)[c] =
- (ByteArray*)malloc(sizeof(ByteArray) * INIT_SIZE);
- type_array[c] = Type::BYTE_ARRAY;
- break;
- }
- case Type::FIXED_LEN_BYTE_ARRAY:
- case Type::INT96:
- assert(false);
- break;
+ std::unique_ptr<ScopedInMemoryInputStream> input(
+ new ScopedInMemoryInputStream(col.meta_data.total_compressed_size));
+ fseek(file.getFileHandle(), col_start, SEEK_SET);
+ size_t num_read = fread(input->data(),
+ 1,
+ input->size(),
+ file.getFileHandle());
+ if (num_read != input->size()) {
+ cerr << "Could not read column data." << endl;
+ continue;
+ }
+
+ readers[c] = new ColumnReader(&col.meta_data,
+ &metadata.schema[c+1],
+ input.release());
}
+ cout << "\n";
- int def_level = 0, rep_level = 0;
- while (reader.HasNext()) {
- switch (col.meta_data.type) {
- case Type::BOOLEAN: {
- bool val = reader.GetBool(&def_level, &rep_level);
- if (def_level < rep_level) break;
- ((bool*)(((bool**)column_ptr)[c]))[num_values] = val;
- break;
- }
- case Type::INT32: {
- int32_t val = reader.GetInt32(&def_level, &rep_level);;
- if (def_level < rep_level) break;
- ((int32_t*)(((int32_t**)column_ptr)[c]))[num_values] = val;
- break;
- }
- case Type::INT64: {
- int64_t val = reader.GetInt64(&def_level, &rep_level);;
- if (def_level < rep_level) break;
- ((int64_t *)(((int64_t**)column_ptr)[c]))[num_values] = val;
- break;
- }
- case Type::FLOAT: {
- float val = reader.GetFloat(&def_level, &rep_level);;
- if (def_level < rep_level) break;
- ((float*)(((float**)column_ptr)[c]))[num_values] = val;
- break;
- }
- case Type::DOUBLE: {
- double val = reader.GetDouble(&def_level, &rep_level);;
- if (def_level < rep_level) break;
- ((double*)(((double**)column_ptr)[c]))[num_values] = val;
- break;
- }
- case Type::BYTE_ARRAY: {
- ByteArray val = reader.GetByteArray(&def_level, &rep_level);;
- if (def_level < rep_level) break;
- ((ByteArray*)(((ByteArray**)column_ptr)[c]))[num_values] = val;
- break;
- }
+ vector<int> def_level(nColumns, 0);
+ vector<int> rep_level(nColumns, 0);
- default:
+ bool hasRow;
+ do {
+ hasRow = false;
+ for (int c = 0; c < nColumns; ++c) {
+ if (readers[c] == NULL) {
+ printf("%-" COL_WIDTH"s", " ");
continue;
+ }
+ const ColumnChunk& col = row_group.columns[c];
+ if (readers[c]->HasNext()) {
+ hasRow = true;
+ switch (col.meta_data.type) {
+ case Type::BOOLEAN: {
+ bool val = readers[c]->GetBool(&def_level[c], &rep_level[c]);
+ if (def_level[c] >= rep_level[c]) {
+ printf("%-" COL_WIDTH"d",val);
+ }
+ break;
+ }
+ case Type::INT32: {
+ int32_t val = readers[c]->GetInt32(&def_level[c], &rep_level[c]);
+ if (def_level[c] >= rep_level[c]) {
+ printf("%-" COL_WIDTH"d",val);
+ }
+ break;
+ }
+ case Type::INT64: {
+ int64_t val = readers[c]->GetInt64(&def_level[c], &rep_level[c]);
+ if (def_level[c] >= rep_level[c]) {
+ printf("%-" COL_WIDTH"ld",val);
+ }
+ break;
+ }
+ case Type::FLOAT: {
+ float val = readers[c]->GetFloat(&def_level[c], &rep_level[c]);
+ if (def_level[c] >= rep_level[c]) {
+ printf("%-" COL_WIDTH"f",val);
+ }
+ break;
+ }
+ case Type::DOUBLE: {
+ double val = readers[c]->GetDouble(&def_level[c], &rep_level[c]);
+ if (def_level[c] >= rep_level[c]) {
+ printf("%-" COL_WIDTH"lf",val);
+ }
+ break;
+ }
+ case Type::BYTE_ARRAY: {
+ ByteArray val = readers[c]->GetByteArray(&def_level[c], &rep_level[c]);
+ if (def_level[c] >= rep_level[c]) {
+ string result = ByteArrayToString(val);
+ printf("%-" COL_WIDTH"s", result.c_str());
+ }
+ break;
+ }
+ default:
+ continue;
+ }
+ }
}
-
- if (def_level < rep_level) ++num_nulls;
- ++num_values;
- }
-
- total_row_number = num_values;
+ cout << "\n";
+ } while (hasRow);
+ } catch (exception& e) {
+ cout << "Caught an exception: " << e.what() << "\n";
+ } catch (...) {
+ cout << "Caught an exception.\n";
}
- // prints out the table
- cout << "=========================================================================\n";
-
- // j is the row, k is the column
- int k = 0, j = 0;
-
- // prints column name
- for (j = 0; j < row_group.columns.size(); ++j) {
- char *str = (char*)malloc(50);
- assert(str);
- strcpy(str, metadata.schema[j+1].name.c_str());
- printf("%-" COL_WIDTH"s", str);
- free(str);
+ for(vector<ColumnReader*>::iterator it = readers.begin(); it != readers.end(); it++) {
+ delete *it;
}
+ }
+}
- cout << "\n";
-
-
- for (j = 0;j < row_group.columns.size(); ++j)
- switch(type_array[j]) {
- case Type::BOOLEAN:
- printf("%-" COL_WIDTH"s","BOOLEAN");
- break;
- case Type::INT32:
- printf("%-" COL_WIDTH"s","INT32");
- break;
- case Type::INT64:
- printf("%-" COL_WIDTH"s","INT64");
- break;
- case Type::FLOAT:
- printf("%-" COL_WIDTH"s","FLOAT");
- break;
- case Type::DOUBLE:
- printf("%-" COL_WIDTH"s","DOUBLE");
- break;
- case Type::BYTE_ARRAY:
- printf("%-" COL_WIDTH"s","BYTE_ARRAY");
- break;
- default:
- continue;
- }
-
- cout << "\n";
+int main(int argc, char** argv) {
+ if (argc > 3) {
+ cerr << "Usage: parquet_reader [--only-stats] <file>" << endl;
+ return -1;
+ }
- static string result;
- char* str1;
+ string filename;
+ bool printContents = true;
- for (k = 0; k < total_row_number; ++k) {
- for (j = 0; j < row_group.columns.size(); ++j) {
- switch(type_array[j]) {
- case Type::BOOLEAN:
- printf("%-" COL_WIDTH"d",((bool*)(((bool**)column_ptr)[j]))[k]);
- break;
- case Type::INT32:
- printf("%-" COL_WIDTH"d",((int32_t *)(((int32_t **)column_ptr)[j]))[k]);
- break;
- case Type::INT64:
- printf("%-" COL_WIDTH"ld",((int64_t *)(((int64_t **)column_ptr)[j]))[k]);
- break;
- case Type::FLOAT:
- printf("%-" COL_WIDTH"f",((float*)(((float**)column_ptr)[j]))[k]);
- break;
- case Type::DOUBLE:
- printf("%-" COL_WIDTH"lf",((double*)(((double**)column_ptr)[j]))[k]);
- break;
- case Type::BYTE_ARRAY:
- result = ByteArrayToString( ((ByteArray*)(((ByteArray**)column_ptr)[j]))[k] );
- str1 = (char*)malloc(result.size());
- assert(str1);
- strcpy(str1, result.c_str());
- printf("%-" COL_WIDTH"s", str1);
- free(str1);
- break;
- default:
- continue;
- }
- }
- cout << "\n";
-
- // print ends
+ // Read command-line options
+ char *param, *value;
+ for (int i = 1; i < argc; i++) {
+ if ( (param = std::strstr(argv[i], "--only-stats")) ) {
+ printContents = false;
+ } else {
+ filename = argv[i];
}
-
- return column_ptr;
}
- fclose(file);
- return NULL;
+ readParquet(filename, printContents);
+
+ return 0;
}
+
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8fc24f86/src/parquet.cc
----------------------------------------------------------------------
diff --git a/src/parquet.cc b/src/parquet.cc
index 5a0f8f4..6b6adaa 100644
--- a/src/parquet.cc
+++ b/src/parquet.cc
@@ -36,8 +36,7 @@ using parquet::SchemaElement;
using parquet::Type;
InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) :
- buffer_(buffer), len_(len), offset_(0) {
-}
+ buffer_(buffer), len_(len), offset_(0) {}
const uint8_t* InMemoryInputStream::Peek(int num_to_peek, int* num_bytes) {
*num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_);
@@ -50,7 +49,32 @@ const uint8_t* InMemoryInputStream::Read(int num_to_read, int* num_bytes) {
return result;
}
+ScopedInMemoryInputStream::ScopedInMemoryInputStream(int64_t len) {
+ buffer_.resize(len);
+ stream_.reset(new InMemoryInputStream(buffer_.data(), buffer_.size()));
+}
+
+uint8_t* ScopedInMemoryInputStream::data() {
+ return buffer_.data();
+}
+
+int64_t ScopedInMemoryInputStream::size() {
+ return buffer_.size();
+}
+
+const uint8_t* ScopedInMemoryInputStream::Peek(int num_to_peek,
+ int* num_bytes) {
+ return stream_->Peek(num_to_peek, num_bytes);
+}
+
+const uint8_t* ScopedInMemoryInputStream::Read(int num_to_read,
+ int* num_bytes) {
+ return stream_->Read(num_to_read, num_bytes);
+}
+
+
ColumnReader::~ColumnReader() {
+ delete stream_;
}
ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata,
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8fc24f86/src/parquet/parquet.h
----------------------------------------------------------------------
diff --git a/src/parquet/parquet.h b/src/parquet/parquet.h
index a1af6b7..4469a82 100644
--- a/src/parquet/parquet.h
+++ b/src/parquet/parquet.h
@@ -88,6 +88,20 @@ class InMemoryInputStream : public InputStream {
int64_t offset_;
};
+// A wrapper for InMemoryInputStream to manage the memory.
+class ScopedInMemoryInputStream : public InputStream {
+ public:
+ ScopedInMemoryInputStream(int64_t len);
+ uint8_t* data();
+ int64_t size();
+ virtual const uint8_t* Peek(int num_to_peek, int* num_bytes);
+ virtual const uint8_t* Read(int num_to_read, int* num_bytes);
+
+ private:
+ std::vector<uint8_t> buffer_;
+ std::unique_ptr<InMemoryInputStream> stream_;
+};
+
// API to read values from a single column. This is the main client facing API.
class ColumnReader {
public: