You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/06/10 22:08:31 UTC
arrow git commit: ARROW-203: Python: Basic filename based Parquet read/write
Repository: arrow
Updated Branches:
refs/heads/master 8197f246d -> ec66ddd1f
ARROW-203: Python: Basic filename based Parquet read/write
Author: Uwe L. Korn <uw...@xhochy.com>
Closes #83 from xhochy/arrow-203 and squashes the following commits:
405f85d [Uwe L. Korn] Remove FindParquet duplication
38d786c [Uwe L. Korn] Make code more readable by using C++ `using` declarations
ec07768 [Uwe L. Korn] Set LD_LIBRARY_PATH in python build
8d90d3f [Uwe L. Korn] Do not set LD_LIBRARY_PATH in python build
000e1e3 [Uwe L. Korn] Use unique_ptr and shared_ptr from Cython
8f6010a [Uwe L. Korn] Linter fixes
0514d01 [Uwe L. Korn] Handle exceptions on RowGroupWriter::Close better
77bd21a [Uwe L. Korn] Add pandas roundtrip to tests
f583b61 [Uwe L. Korn] Fix rpath for libarrow_parquet
00c1461 [Uwe L. Korn] Also ensure correct OSX compiler flags in PyArrow
4a80116 [Uwe L. Korn] Handle Python3 strings correctly
066c08a [Uwe L. Korn] Add missing functions to smart pointers
5706db2 [Uwe L. Korn] Use length and offset instead of slicing
443de8b [Uwe L. Korn] Add miniconda to the LD_LIBRARY_PATH
2dffc14 [Uwe L. Korn] Fix min mistake, use equals instead of ==
2006e70 [Uwe L. Korn] Rewrite test py.test style
9520c39 [Uwe L. Korn] Use PARQUET from miniconda path
cd3b9a9 [Uwe L. Korn] Also search for Parquet in PyArrow
6a41d23 [Uwe L. Korn] Re-use conda installation from C++
81f501e [Uwe L. Korn] No need to install conda in travis_script_python anymore
b505feb [Uwe L. Korn] Install parquet-cpp via conda
5d4929a [Uwe L. Korn] Add test-util.h
9b06e41 [Uwe L. Korn] Make tests templated
be6415c [Uwe L. Korn] Incorporate review comments
0fbed3f [Uwe L. Korn] Remove obsolete parquet files
081db5f [Uwe L. Korn] Limit and document chunk_size
7192cfb [Uwe L. Korn] Add const to slicing parameters
0463995 [Uwe L. Korn] ARROW-203: Python: Basic filename based Parquet read/write
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ec66ddd1
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ec66ddd1
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ec66ddd1
Branch: refs/heads/master
Commit: ec66ddd1fd4954b78967bfa1893480473e4d380c
Parents: 8197f24
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Fri Jun 10 15:08:23 2016 -0700
Committer: Wes McKinney <we...@apache.org>
Committed: Fri Jun 10 15:08:23 2016 -0700
----------------------------------------------------------------------
ci/travis_before_script_cpp.sh | 6 +-
ci/travis_conda_build.sh | 22 +--
ci/travis_install_conda.sh | 26 +++
ci/travis_script_python.sh | 21 +--
cpp/src/arrow/column.h | 2 +
cpp/src/arrow/parquet/CMakeLists.txt | 7 +
cpp/src/arrow/parquet/parquet-io-test.cc | 256 +++++++++++++++++++-------
cpp/src/arrow/parquet/reader.cc | 25 +++
cpp/src/arrow/parquet/reader.h | 3 +
cpp/src/arrow/parquet/test-util.h | 77 ++++++++
cpp/src/arrow/parquet/utils.h | 5 +
cpp/src/arrow/parquet/writer.cc | 99 +++++++---
cpp/src/arrow/parquet/writer.h | 12 +-
cpp/src/arrow/util/status.h | 9 +
python/CMakeLists.txt | 8 +
python/cmake_modules/FindArrow.cmake | 14 +-
python/conda.recipe/build.sh | 13 ++
python/pyarrow/array.pyx | 3 +
python/pyarrow/error.pxd | 2 +
python/pyarrow/error.pyx | 8 +
python/pyarrow/includes/common.pxd | 9 +-
python/pyarrow/includes/libarrow.pxd | 3 +
python/pyarrow/includes/parquet.pxd | 46 +++++
python/pyarrow/parquet.pyx | 50 ++++-
python/pyarrow/schema.pyx | 9 +-
python/pyarrow/tests/test_parquet.py | 59 ++++++
python/setup.py | 4 +-
27 files changed, 654 insertions(+), 144 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/ci/travis_before_script_cpp.sh
----------------------------------------------------------------------
diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh
index 193c76f..6159f67 100755
--- a/ci/travis_before_script_cpp.sh
+++ b/ci/travis_before_script_cpp.sh
@@ -2,6 +2,10 @@
set -e
+source $TRAVIS_BUILD_DIR/ci/travis_install_conda.sh
+conda install -y --channel apache/channel/dev parquet-cpp
+export PARQUET_HOME=$MINICONDA
+
: ${CPP_BUILD_DIR=$TRAVIS_BUILD_DIR/cpp-build}
mkdir $CPP_BUILD_DIR
@@ -19,7 +23,7 @@ echo $GTEST_HOME
: ${ARROW_CPP_INSTALL=$TRAVIS_BUILD_DIR/cpp-install}
-CMAKE_COMMON_FLAGS="-DARROW_BUILD_BENCHMARKS=ON -DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL"
+CMAKE_COMMON_FLAGS="-DARROW_BUILD_BENCHMARKS=ON -DARROW_PARQUET=ON -DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL"
if [ $TRAVIS_OS_NAME == "linux" ]; then
cmake -DARROW_TEST_MEMCHECK=on $CMAKE_COMMON_FLAGS -DCMAKE_CXX_FLAGS="-Werror" $CPP_DIR
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/ci/travis_conda_build.sh
----------------------------------------------------------------------
diff --git a/ci/travis_conda_build.sh b/ci/travis_conda_build.sh
index afa531d..c43a851 100755
--- a/ci/travis_conda_build.sh
+++ b/ci/travis_conda_build.sh
@@ -2,27 +2,7 @@
set -e
-if [ $TRAVIS_OS_NAME == "linux" ]; then
- MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh"
-else
- MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-MacOSX-x86_64.sh"
-fi
-
-wget -O miniconda.sh $MINICONDA_URL
-MINICONDA=$TRAVIS_BUILD_DIR/miniconda
-bash miniconda.sh -b -p $MINICONDA
-export PATH="$MINICONDA/bin:$PATH"
-conda update -y -q conda
-conda info -a
-
-conda config --set show_channel_urls yes
-conda config --add channels conda-forge
-conda config --add channels apache
-
-conda install --yes conda-build jinja2 anaconda-client
-
-# faster builds, please
-conda install -y nomkl
+source $TRAVIS_BUILD_DIR/ci/travis_install_conda.sh
# Build libarrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/ci/travis_install_conda.sh
----------------------------------------------------------------------
diff --git a/ci/travis_install_conda.sh b/ci/travis_install_conda.sh
new file mode 100644
index 0000000..bef667d
--- /dev/null
+++ b/ci/travis_install_conda.sh
@@ -0,0 +1,26 @@
+#!/usr/bin/env bash
+
+set -e
+
+if [ $TRAVIS_OS_NAME == "linux" ]; then
+ MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh"
+else
+ MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-MacOSX-x86_64.sh"
+fi
+
+wget -O miniconda.sh $MINICONDA_URL
+export MINICONDA=$TRAVIS_BUILD_DIR/miniconda
+bash miniconda.sh -b -p $MINICONDA
+export PATH="$MINICONDA/bin:$PATH"
+conda update -y -q conda
+conda info -a
+
+conda config --set show_channel_urls yes
+conda config --add channels conda-forge
+conda config --add channels apache
+
+conda install --yes conda-build jinja2 anaconda-client
+
+# faster builds, please
+conda install -y nomkl
+
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/ci/travis_script_python.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh
index d45b895..6d35785 100755
--- a/ci/travis_script_python.sh
+++ b/ci/travis_script_python.sh
@@ -4,6 +4,12 @@ set -e
PYTHON_DIR=$TRAVIS_BUILD_DIR/python
+# Re-use conda installation from C++
+export MINICONDA=$TRAVIS_BUILD_DIR/miniconda
+export PATH="$MINICONDA/bin:$PATH"
+export LD_LIBRARY_PATH="$MINICONDA/lib:$LD_LIBRARY_PATH"
+export PARQUET_HOME=$MINICONDA
+
# Share environment with C++
pushd $CPP_BUILD_DIR
source setup_build_env.sh
@@ -11,21 +17,6 @@ popd
pushd $PYTHON_DIR
-# Bootstrap a Conda Python environment
-
-if [ $TRAVIS_OS_NAME == "linux" ]; then
- MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh"
-else
- MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda-latest-MacOSX-x86_64.sh"
-fi
-
-curl $MINICONDA_URL > miniconda.sh
-MINICONDA=$TRAVIS_BUILD_DIR/miniconda
-bash miniconda.sh -b -p $MINICONDA
-export PATH="$MINICONDA/bin:$PATH"
-conda update -y -q conda
-conda info -a
-
python_version_tests() {
PYTHON_VERSION=$1
CONDA_ENV_NAME="pyarrow-test-${PYTHON_VERSION}"
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/column.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/column.h b/cpp/src/arrow/column.h
index 22becc3..e409566 100644
--- a/cpp/src/arrow/column.h
+++ b/cpp/src/arrow/column.h
@@ -67,6 +67,8 @@ class Column {
int64_t null_count() const { return data_->null_count(); }
+ const std::shared_ptr<Field>& field() const { return field_; }
+
// @returns: the column's name in the passed metadata
const std::string& name() const { return field_->name; }
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/CMakeLists.txt b/cpp/src/arrow/parquet/CMakeLists.txt
index c00cc9f..f00bb53 100644
--- a/cpp/src/arrow/parquet/CMakeLists.txt
+++ b/cpp/src/arrow/parquet/CMakeLists.txt
@@ -35,6 +35,13 @@ add_library(arrow_parquet SHARED
target_link_libraries(arrow_parquet ${PARQUET_LIBS})
SET_TARGET_PROPERTIES(arrow_parquet PROPERTIES LINKER_LANGUAGE CXX)
+if (APPLE)
+ set_target_properties(arrow_parquet
+ PROPERTIES
+ BUILD_WITH_INSTALL_RPATH ON
+ INSTALL_NAME_DIR "@rpath")
+endif()
+
ADD_ARROW_TEST(parquet-schema-test)
ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet)
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/parquet-io-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc
index 845574d..db779d8 100644
--- a/cpp/src/arrow/parquet/parquet-io-test.cc
+++ b/cpp/src/arrow/parquet/parquet-io-test.cc
@@ -18,6 +18,7 @@
#include "gtest/gtest.h"
#include "arrow/test-util.h"
+#include "arrow/parquet/test-util.h"
#include "arrow/parquet/reader.h"
#include "arrow/parquet/writer.h"
#include "arrow/types/primitive.h"
@@ -44,36 +45,45 @@ namespace arrow {
namespace parquet {
-template <typename ArrowType>
-std::shared_ptr<PrimitiveArray> NonNullArray(
- size_t size, typename ArrowType::c_type value) {
- std::vector<typename ArrowType::c_type> values(size, value);
- NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
- builder.Append(values.data(), values.size());
- return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
-}
+const int SMALL_SIZE = 100;
+const int LARGE_SIZE = 10000;
-// This helper function only supports (size/2) nulls yet.
-template <typename ArrowType>
-std::shared_ptr<PrimitiveArray> NullableArray(
- size_t size, typename ArrowType::c_type value, size_t num_nulls) {
- std::vector<typename ArrowType::c_type> values(size, value);
- std::vector<uint8_t> valid_bytes(size, 1);
+template <typename TestType>
+struct test_traits {};
- for (size_t i = 0; i < num_nulls; i++) {
- valid_bytes[i * 2] = 0;
- }
+template <>
+struct test_traits<Int32Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+};
- NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
- builder.Append(values.data(), values.size(), valid_bytes.data());
- return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
-}
+template <>
+struct test_traits<Int64Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+};
+
+template <>
+struct test_traits<FloatType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
+};
+
+template <>
+struct test_traits<DoubleType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
+};
+
+template <typename T>
+using ParquetDataType = ::parquet::DataType<test_traits<T>::parquet_enum>;
+template <typename T>
+using ParquetWriter = ::parquet::TypedColumnWriter<ParquetDataType<T>>;
+
+template <typename TestType>
class TestParquetIO : public ::testing::Test {
public:
+ typedef typename TestType::c_type T;
virtual void SetUp() {}
- std::shared_ptr<GroupNode> Schema(
+ std::shared_ptr<GroupNode> MakeSchema(
ParquetType::type parquet_type, Repetition::type repetition) {
auto pnode = PrimitiveNode::Make("column1", repetition, parquet_type);
NodePtr node_ =
@@ -98,20 +108,27 @@ class TestParquetIO : public ::testing::Test {
std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader;
ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader)));
ASSERT_NE(nullptr, column_reader.get());
- ASSERT_OK(column_reader->NextBatch(100, out));
+ ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out));
+ ASSERT_NE(nullptr, out->get());
+ }
+
+ void ReadTableFromFile(
+ std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Table>* out) {
+ arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
+ ASSERT_NO_THROW(ASSERT_OK(reader.ReadFlatTable(out)));
ASSERT_NE(nullptr, out->get());
}
- std::unique_ptr<ParquetFileReader> Int64File(
- std::vector<int64_t>& values, int num_chunks) {
- std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
+ std::unique_ptr<ParquetFileReader> TestFile(std::vector<T>& values, int num_chunks) {
+ std::shared_ptr<GroupNode> schema =
+ MakeSchema(test_traits<TestType>::parquet_enum, Repetition::REQUIRED);
std::unique_ptr<ParquetFileWriter> file_writer = MakeWriter(schema);
size_t chunk_size = values.size() / num_chunks;
for (int i = 0; i < num_chunks; i++) {
auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
- auto column_writer =
- static_cast<::parquet::Int64Writer*>(row_group_writer->NextColumn());
- int64_t* data = values.data() + i * chunk_size;
+ auto column_writer = static_cast<ParquetWriter<TestType>*>(
+ row_group_writer->NextColumn());
+ T* data = values.data() + i * chunk_size;
column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
column_writer->Close();
row_group_writer->Close();
@@ -120,71 +137,135 @@ class TestParquetIO : public ::testing::Test {
return ReaderFromSink();
}
- private:
std::shared_ptr<InMemoryOutputStream> sink_;
};
-TEST_F(TestParquetIO, SingleColumnInt64Read) {
- std::vector<int64_t> values(100, 128);
- std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 1);
+typedef ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType> TestTypes;
+
+TYPED_TEST_CASE(TestParquetIO, TestTypes);
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredRead) {
+ std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
+ std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 1);
std::shared_ptr<Array> out;
- ReadSingleColumnFile(std::move(file_reader), &out);
+ this->ReadSingleColumnFile(std::move(file_reader), &out);
- Int64Array* out_array = static_cast<Int64Array*>(out.get());
- for (size_t i = 0; i < values.size(); i++) {
- EXPECT_EQ(values[i], out_array->raw_data()[i]);
- }
+ ExpectArray<typename TypeParam::c_type>(values.data(), out.get());
}
-TEST_F(TestParquetIO, SingleColumnInt64ChunkedRead) {
- std::vector<int64_t> values(100, 128);
- std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 4);
+TYPED_TEST(TestParquetIO, SingleColumnRequiredTableRead) {
+ std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
+ std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 1);
+
+ std::shared_ptr<Table> out;
+ this->ReadTableFromFile(std::move(file_reader), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ExpectArray<typename TypeParam::c_type>(values.data(), chunked_array->chunk(0).get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedRead) {
+ std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
+ std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 4);
std::shared_ptr<Array> out;
- ReadSingleColumnFile(std::move(file_reader), &out);
+ this->ReadSingleColumnFile(std::move(file_reader), &out);
- Int64Array* out_array = static_cast<Int64Array*>(out.get());
- for (size_t i = 0; i < values.size(); i++) {
- EXPECT_EQ(values[i], out_array->raw_data()[i]);
- }
+ ExpectArray<typename TypeParam::c_type>(values.data(), out.get());
}
-TEST_F(TestParquetIO, SingleColumnInt64Write) {
- std::shared_ptr<PrimitiveArray> values = NonNullArray<Int64Type>(100, 128);
+TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedTableRead) {
+ std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128);
+ std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 4);
+
+ std::shared_ptr<Table> out;
+ this->ReadTableFromFile(std::move(file_reader), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(SMALL_SIZE, out->num_rows());
- std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
- FileWriter writer(default_memory_pool(), MakeWriter(schema));
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ExpectArray<typename TypeParam::c_type>(values.data(), chunked_array->chunk(0).get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
+ std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128);
+
+ std::shared_ptr<GroupNode> schema =
+ this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::REQUIRED);
+ FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
std::shared_ptr<Array> out;
- ReadSingleColumnFile(ReaderFromSink(), &out);
+ this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
ASSERT_TRUE(values->Equals(out));
}
-TEST_F(TestParquetIO, SingleColumnDoubleReadWrite) {
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
+ std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ ASSERT_NO_THROW(ASSERT_OK(
+ WriteFlatTable(table.get(), default_memory_pool(), this->sink_, values->length())));
+
+ std::shared_ptr<Table> out;
+ this->ReadTableFromFile(this->ReaderFromSink(), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(100, out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
// This also tests max_definition_level = 1
- std::shared_ptr<PrimitiveArray> values = NullableArray<DoubleType>(100, 128, 10);
+ std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10);
- std::shared_ptr<GroupNode> schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL);
- FileWriter writer(default_memory_pool(), MakeWriter(schema));
+ std::shared_ptr<GroupNode> schema =
+ this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::OPTIONAL);
+ FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
std::shared_ptr<Array> out;
- ReadSingleColumnFile(ReaderFromSink(), &out);
+ this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
ASSERT_TRUE(values->Equals(out));
}
-TEST_F(TestParquetIO, SingleColumnInt64ChunkedWrite) {
- std::shared_ptr<PrimitiveArray> values = NonNullArray<Int64Type>(100, 128);
- std::shared_ptr<PrimitiveArray> values_chunk = NonNullArray<Int64Type>(25, 128);
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
+ // This also tests max_definition_level = 1
+ std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ ASSERT_NO_THROW(ASSERT_OK(
+ WriteFlatTable(table.get(), default_memory_pool(), this->sink_, values->length())));
+
+ std::shared_ptr<Table> out;
+ this->ReadTableFromFile(this->ReaderFromSink(), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
- std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
- FileWriter writer(default_memory_pool(), MakeWriter(schema));
+TYPED_TEST(TestParquetIO, SingleColumnIntRequiredChunkedWrite) {
+ std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128);
+ std::shared_ptr<PrimitiveArray> values_chunk =
+ NonNullArray<TypeParam>(SMALL_SIZE / 4, 128);
+
+ std::shared_ptr<GroupNode> schema =
+ this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::REQUIRED);
+ FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
for (int i = 0; i < 4; i++) {
ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length())));
ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get())));
@@ -192,18 +273,37 @@ TEST_F(TestParquetIO, SingleColumnInt64ChunkedWrite) {
ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
std::shared_ptr<Array> out;
- ReadSingleColumnFile(ReaderFromSink(), &out);
+ this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
ASSERT_TRUE(values->Equals(out));
}
-TEST_F(TestParquetIO, SingleColumnDoubleChunkedWrite) {
- std::shared_ptr<PrimitiveArray> values = NullableArray<DoubleType>(100, 128, 10);
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
+ std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(LARGE_SIZE, 128);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ ASSERT_NO_THROW(
+ ASSERT_OK(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512)));
+
+ std::shared_ptr<Table> out;
+ this->ReadTableFromFile(this->ReaderFromSink(), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(LARGE_SIZE, out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
+ std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10);
std::shared_ptr<PrimitiveArray> values_chunk_nulls =
- NullableArray<DoubleType>(25, 128, 10);
- std::shared_ptr<PrimitiveArray> values_chunk = NullableArray<DoubleType>(25, 128, 0);
+ NullableArray<TypeParam>(SMALL_SIZE / 4, 128, 10);
+ std::shared_ptr<PrimitiveArray> values_chunk =
+ NullableArray<TypeParam>(SMALL_SIZE / 4, 128, 0);
- std::shared_ptr<GroupNode> schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL);
- FileWriter writer(default_memory_pool(), MakeWriter(schema));
+ std::shared_ptr<GroupNode> schema =
+ this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::OPTIONAL);
+ FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk_nulls->length())));
ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk_nulls.get())));
for (int i = 0; i < 3; i++) {
@@ -213,10 +313,28 @@ TEST_F(TestParquetIO, SingleColumnDoubleChunkedWrite) {
ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
std::shared_ptr<Array> out;
- ReadSingleColumnFile(ReaderFromSink(), &out);
+ this->ReadSingleColumnFile(this->ReaderFromSink(), &out);
ASSERT_TRUE(values->Equals(out));
}
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
+ // This also tests max_definition_level = 1
+ std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(LARGE_SIZE, 128, 100);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ ASSERT_NO_THROW(
+ ASSERT_OK(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512)));
+
+ std::shared_ptr<Table> out;
+ this->ReadTableFromFile(this->ReaderFromSink(), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(LARGE_SIZE, out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+}
+
} // namespace parquet
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc
index 346de25..3b4882d 100644
--- a/cpp/src/arrow/parquet/reader.cc
+++ b/cpp/src/arrow/parquet/reader.cc
@@ -18,10 +18,14 @@
#include "arrow/parquet/reader.h"
#include <queue>
+#include <string>
+#include <vector>
+#include "arrow/column.h"
#include "arrow/parquet/schema.h"
#include "arrow/parquet/utils.h"
#include "arrow/schema.h"
+#include "arrow/table.h"
#include "arrow/types/primitive.h"
#include "arrow/util/status.h"
@@ -40,6 +44,7 @@ class FileReader::Impl {
bool CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr);
Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);
+ Status ReadFlatTable(std::shared_ptr<Table>* out);
private:
MemoryPool* pool_;
@@ -103,6 +108,22 @@ Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
return flat_column_reader->NextBatch(reader_->num_rows(), out);
}
+Status FileReader::Impl::ReadFlatTable(std::shared_ptr<Table>* table) {
+ const std::string& name = reader_->descr()->schema()->name();
+ std::shared_ptr<Schema> schema;
+ RETURN_NOT_OK(FromParquetSchema(reader_->descr(), &schema));
+
+ std::vector<std::shared_ptr<Column>> columns(reader_->num_columns());
+ for (int i = 0; i < reader_->num_columns(); i++) {
+ std::shared_ptr<Array> array;
+ RETURN_NOT_OK(ReadFlatColumn(i, &array));
+ columns[i] = std::make_shared<Column>(schema->field(i), array);
+ }
+
+ *table = std::make_shared<Table>(name, schema, columns);
+ return Status::OK();
+}
+
FileReader::FileReader(
MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader)
: impl_(new FileReader::Impl(pool, std::move(reader))) {}
@@ -117,6 +138,10 @@ Status FileReader::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
return impl_->ReadFlatColumn(i, out);
}
+Status FileReader::ReadFlatTable(std::shared_ptr<Table>* out) {
+ return impl_->ReadFlatTable(out);
+}
+
FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor* descr,
::parquet::ParquetFileReader* reader, int column_index)
: pool_(pool),
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/reader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h
index 41ca7eb..db7a157 100644
--- a/cpp/src/arrow/parquet/reader.h
+++ b/cpp/src/arrow/parquet/reader.h
@@ -29,6 +29,7 @@ class Array;
class MemoryPool;
class RowBatch;
class Status;
+class Table;
namespace parquet {
@@ -90,6 +91,8 @@ class FileReader {
Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
// Read column as a whole into an Array.
Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);
+ // Read a table of flat columns into a Table.
+ Status ReadFlatTable(std::shared_ptr<Table>* out);
virtual ~FileReader();
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/test-util.h b/cpp/src/arrow/parquet/test-util.h
new file mode 100644
index 0000000..1496082
--- /dev/null
+++ b/cpp/src/arrow/parquet/test-util.h
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <vector>
+
+#include "arrow/types/primitive.h"
+
+namespace arrow {
+
+namespace parquet {
+
+template <typename ArrowType>
+std::shared_ptr<PrimitiveArray> NonNullArray(
+ size_t size, typename ArrowType::c_type value) {
+ std::vector<typename ArrowType::c_type> values(size, value);
+ NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
+ builder.Append(values.data(), values.size());
+ return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
+}
+
+// This helper function only supports (size/2) nulls yet.
+template <typename ArrowType>
+std::shared_ptr<PrimitiveArray> NullableArray(
+ size_t size, typename ArrowType::c_type value, size_t num_nulls) {
+ std::vector<typename ArrowType::c_type> values(size, value);
+ std::vector<uint8_t> valid_bytes(size, 1);
+
+ for (size_t i = 0; i < num_nulls; i++) {
+ valid_bytes[i * 2] = 0;
+ }
+
+ NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
+ builder.Append(values.data(), values.size(), valid_bytes.data());
+ return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
+}
+
+std::shared_ptr<Column> MakeColumn(const std::string& name,
+ const std::shared_ptr<PrimitiveArray>& array, bool nullable) {
+ auto field = std::make_shared<Field>(name, array->type(), nullable);
+ return std::make_shared<Column>(field, array);
+}
+
+std::shared_ptr<Table> MakeSimpleTable(
+ const std::shared_ptr<PrimitiveArray>& values, bool nullable) {
+ std::shared_ptr<Column> column = MakeColumn("col", values, nullable);
+ std::vector<std::shared_ptr<Column>> columns({column});
+ std::vector<std::shared_ptr<Field>> fields({column->field()});
+ auto schema = std::make_shared<Schema>(fields);
+ return std::make_shared<Table>("table", schema, columns);
+}
+
+template <typename T>
+void ExpectArray(T* expected, Array* result) {
+ PrimitiveArray* p_array = static_cast<PrimitiveArray*>(result);
+ for (size_t i = 0; i < result->length(); i++) {
+ EXPECT_EQ(expected[i], reinterpret_cast<const T*>(p_array->data()->data())[i]);
+ }
+}
+
+} // namespace parquet
+
+} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/utils.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/utils.h b/cpp/src/arrow/parquet/utils.h
index b32792f..409bcd9 100644
--- a/cpp/src/arrow/parquet/utils.h
+++ b/cpp/src/arrow/parquet/utils.h
@@ -31,6 +31,11 @@ namespace parquet {
(s); \
} catch (const ::parquet::ParquetException& e) { return Status::Invalid(e.what()); }
+#define PARQUET_IGNORE_NOT_OK(s) \
+ try { \
+ (s); \
+ } catch (const ::parquet::ParquetException& e) {}
+
} // namespace parquet
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc
index 3ad2c5b..1223901 100644
--- a/cpp/src/arrow/parquet/writer.cc
+++ b/cpp/src/arrow/parquet/writer.cc
@@ -17,11 +17,21 @@
#include "arrow/parquet/writer.h"
+#include <algorithm>
+#include <vector>
+
#include "arrow/array.h"
+#include "arrow/column.h"
+#include "arrow/table.h"
+#include "arrow/types/construct.h"
#include "arrow/types/primitive.h"
+#include "arrow/parquet/schema.h"
#include "arrow/parquet/utils.h"
#include "arrow/util/status.h"
+using parquet::ParquetFileWriter;
+using parquet::schema::GroupNode;
+
namespace arrow {
namespace parquet {
@@ -32,8 +42,9 @@ class FileWriter::Impl {
Status NewRowGroup(int64_t chunk_size);
template <typename ParquetType>
- Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data);
- Status WriteFlatColumnChunk(const PrimitiveArray* data);
+ Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data,
+ int64_t offset, int64_t length);
+ Status WriteFlatColumnChunk(const PrimitiveArray* data, int64_t offset, int64_t length);
Status Close();
virtual ~Impl() {}
@@ -60,31 +71,31 @@ Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
}
template <typename ParquetType>
-Status FileWriter::Impl::TypedWriteBatch(
- ::parquet::ColumnWriter* column_writer, const PrimitiveArray* data) {
+Status FileWriter::Impl::TypedWriteBatch(::parquet::ColumnWriter* column_writer,
+ const PrimitiveArray* data, int64_t offset, int64_t length) {
+ // TODO: DCHECK((offset + length) <= data->length());
auto data_ptr =
- reinterpret_cast<const typename ParquetType::c_type*>(data->data()->data());
+ reinterpret_cast<const typename ParquetType::c_type*>(data->data()->data()) +
+ offset;
auto writer =
reinterpret_cast<::parquet::TypedColumnWriter<ParquetType>*>(column_writer);
if (writer->descr()->max_definition_level() == 0) {
// no nulls, just dump the data
- PARQUET_CATCH_NOT_OK(writer->WriteBatch(data->length(), nullptr, nullptr, data_ptr));
+ PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_ptr));
} else if (writer->descr()->max_definition_level() == 1) {
- RETURN_NOT_OK(def_levels_buffer_.Resize(data->length() * sizeof(int16_t)));
+ RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
int16_t* def_levels_ptr =
reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
if (data->null_count() == 0) {
- std::fill(def_levels_ptr, def_levels_ptr + data->length(), 1);
- PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(data->length(), def_levels_ptr, nullptr, data_ptr));
+ std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+ PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, data_ptr));
} else {
- RETURN_NOT_OK(data_buffer_.Resize(
- (data->length() - data->null_count()) * sizeof(typename ParquetType::c_type)));
+ RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(typename ParquetType::c_type)));
auto buffer_ptr =
reinterpret_cast<typename ParquetType::c_type*>(data_buffer_.mutable_data());
int buffer_idx = 0;
- for (size_t i = 0; i < data->length(); i++) {
- if (data->IsNull(i)) {
+ for (size_t i = 0; i < length; i++) {
+ if (data->IsNull(offset + i)) {
def_levels_ptr[i] = 0;
} else {
def_levels_ptr[i] = 1;
@@ -92,7 +103,7 @@ Status FileWriter::Impl::TypedWriteBatch(
}
}
PARQUET_CATCH_NOT_OK(
- writer->WriteBatch(data->length(), def_levels_ptr, nullptr, buffer_ptr));
+ writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
}
} else {
return Status::NotImplemented("no support for max definition level > 1 yet");
@@ -107,12 +118,13 @@ Status FileWriter::Impl::Close() {
return Status::OK();
}
-#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \
- case Type::ENUM: \
- return TypedWriteBatch<ParquetType>(writer, data); \
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \
+ case Type::ENUM: \
+ return TypedWriteBatch<ParquetType>(writer, data, offset, length); \
break;
-Status FileWriter::Impl::WriteFlatColumnChunk(const PrimitiveArray* data) {
+Status FileWriter::Impl::WriteFlatColumnChunk(
+ const PrimitiveArray* data, int64_t offset, int64_t length) {
::parquet::ColumnWriter* writer;
PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn());
switch (data->type_enum()) {
@@ -133,8 +145,11 @@ Status FileWriter::NewRowGroup(int64_t chunk_size) {
return impl_->NewRowGroup(chunk_size);
}
-Status FileWriter::WriteFlatColumnChunk(const PrimitiveArray* data) {
- return impl_->WriteFlatColumnChunk(data);
+Status FileWriter::WriteFlatColumnChunk(  // Write rows [offset, offset + length) of `data` into the row group writer's next column.
+ const PrimitiveArray* data, int64_t offset, int64_t length) {
+ int64_t real_length = length;
+ if (length == -1) { real_length = data->length(); }  // -1 (the header default) means "whole array"; NOTE(review): this ignores `offset`, so offset > 0 with length == -1 reads past the end -- data->length() - offset looks intended
+ return impl_->WriteFlatColumnChunk(data, offset, real_length);
}
Status FileWriter::Close() {
@@ -143,6 +158,48 @@ Status FileWriter::Close() {
FileWriter::~FileWriter() {}
+Status WriteFlatTable(const Table* table, MemoryPool* pool,  // Write every column of `table` to `sink`, using row groups of at most chunk_size rows.
+ std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size) {
+ std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema;
+ RETURN_NOT_OK(ToParquetSchema(table->schema().get(), &parquet_schema));  // derive the Parquet schema from the Arrow schema
+ auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema());  // root schema node is treated as a GroupNode
+ std::unique_ptr<ParquetFileWriter> parquet_writer =
+ ParquetFileWriter::Open(sink, schema_node);
+ FileWriter writer(pool, std::move(parquet_writer));  // FileWriter takes ownership of the ParquetFileWriter
+
+ // TODO: Support writing chunked arrays.
+ for (int i = 0; i < table->num_columns(); i++) {
+ if (table->column(i)->data()->num_chunks() != 1) {
+ return Status::NotImplemented("No support for writing chunked arrays yet.");  // NOTE(review): unlike the later error paths, `writer` is not Close()d here
+ }
+ }
+
+ // Cast to PrimitiveArray instances as we work with them.
+ std::vector<std::shared_ptr<PrimitiveArray>> arrays(table->num_columns());
+ for (int i = 0; i < table->num_columns(); i++) {
+ // num_chunks == 1 as per above loop
+ std::shared_ptr<Array> array = table->column(i)->data()->chunk(0);
+ auto primitive_array = std::dynamic_pointer_cast<PrimitiveArray>(array);  // null when the chunk is not a PrimitiveArray
+ if (!primitive_array) {
+ PARQUET_IGNORE_NOT_OK(writer.Close());  // best-effort close before reporting the error
+ return Status::NotImplemented("Table must consist of PrimitiveArray instances");
+ }
+ arrays[i] = primitive_array;
+ }
+
+ for (int chunk = 0; chunk * chunk_size < table->num_rows(); chunk++) {  // NOTE(review): with chunk_size <= 0 and num_rows() > 0 this condition never becomes false, so only an error return ends the loop; callers must pass a positive chunk_size
+ int64_t offset = chunk * chunk_size;
+ int64_t size = std::min(chunk_size, table->num_rows() - offset);  // the final row group may be shorter than chunk_size
+ RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close()));  // on failure: close best-effort, return the NewRowGroup error
+ for (int i = 0; i < table->num_columns(); i++) {
+ RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(arrays[i].get(), offset, size),  // columns are written in schema order within each row group
+ PARQUET_IGNORE_NOT_OK(writer.Close()));
+ }
+ }
+
+ return writer.Close();  // on success the Close() status is the overall result
+}
+
} // namespace parquet
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/parquet/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h
index 38f7d0b..83e799f 100644
--- a/cpp/src/arrow/parquet/writer.h
+++ b/cpp/src/arrow/parquet/writer.h
@@ -29,6 +29,7 @@ class MemoryPool;
class PrimitiveArray;
class RowBatch;
class Status;
+class Table;
namespace parquet {
@@ -42,7 +43,8 @@ class FileWriter {
FileWriter(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
Status NewRowGroup(int64_t chunk_size);
- Status WriteFlatColumnChunk(const PrimitiveArray* data);
+ Status WriteFlatColumnChunk(
+ const PrimitiveArray* data, int64_t offset = 0, int64_t length = -1);
Status Close();
virtual ~FileWriter();
@@ -52,6 +54,14 @@ class FileWriter {
std::unique_ptr<Impl> impl_;
};
+/**
+ * Write a flat Table to Parquet.
+ *
+ * The table shall only consist of nullable, non-repeated columns of primitive type.
+ */
+Status WriteFlatTable(const Table* table, MemoryPool* pool,
+ std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size);
+
} // namespace parquet
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/cpp/src/arrow/util/status.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/status.h b/cpp/src/arrow/util/status.h
index 6ddc177..d1a7425 100644
--- a/cpp/src/arrow/util/status.h
+++ b/cpp/src/arrow/util/status.h
@@ -63,6 +63,15 @@ namespace arrow {
if (!_s.ok()) { return _s; } \
} while (0);
+#define RETURN_NOT_OK_ELSE(s, else_) /* Evaluate s; if it is not OK, run else_ and then return that Status. */ \
+ do { \
+ Status _s = (s); \
+ if (!_s.ok()) { \
+ else_; /* cleanup hook; whatever it yields is discarded */ \
+ return _s; /* the original error from s is propagated, not else_'s */ \
+ } \
+ } while (0);
+
enum class StatusCode : char {
OK = 0,
OutOfMemory = 1,
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 2173232..f1becfc 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -339,11 +339,17 @@ if (PYARROW_BUILD_TESTS)
STATIC_LIB ${GTEST_STATIC_LIB})
endif()
+## Parquet
+find_package(Parquet REQUIRED)
+include_directories(SYSTEM ${PARQUET_INCLUDE_DIR})
+
## Arrow
find_package(Arrow REQUIRED)
include_directories(SYSTEM ${ARROW_INCLUDE_DIR})
ADD_THIRDPARTY_LIB(arrow
SHARED_LIB ${ARROW_SHARED_LIB})
+ADD_THIRDPARTY_LIB(arrow_parquet
+ SHARED_LIB ${ARROW_PARQUET_SHARED_LIB})
############################################################
# Linker setup
@@ -422,6 +428,7 @@ set(PYARROW_SRCS
set(LINK_LIBS
arrow
+ arrow_parquet
)
SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)
@@ -442,6 +449,7 @@ set(CYTHON_EXTENSIONS
array
config
error
+ parquet
scalar
schema
table
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/cmake_modules/FindArrow.cmake
----------------------------------------------------------------------
diff --git a/python/cmake_modules/FindArrow.cmake b/python/cmake_modules/FindArrow.cmake
index 3d99838..f0b258e 100644
--- a/python/cmake_modules/FindArrow.cmake
+++ b/python/cmake_modules/FindArrow.cmake
@@ -42,19 +42,27 @@ find_library(ARROW_LIB_PATH NAMES arrow
${ARROW_SEARCH_LIB_PATH}
NO_DEFAULT_PATH)
-if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH)
+find_library(ARROW_PARQUET_LIB_PATH NAMES arrow_parquet
+ PATHS
+ ${ARROW_SEARCH_LIB_PATH}
+ NO_DEFAULT_PATH)
+
+if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH AND ARROW_PARQUET_LIB_PATH)
set(ARROW_FOUND TRUE)
set(ARROW_LIB_NAME libarrow)
+ set(ARROW_PARQUET_LIB_NAME libarrow_parquet)
set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH})
set(ARROW_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_LIB_NAME}.a)
set(ARROW_SHARED_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+ set(ARROW_PARQUET_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_PARQUET_LIB_NAME}.a)
+ set(ARROW_PARQUET_SHARED_LIB ${ARROW_LIBS}/${ARROW_PARQUET_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
else ()
set(ARROW_FOUND FALSE)
endif ()
if (ARROW_FOUND)
if (NOT Arrow_FIND_QUIETLY)
- message(STATUS "Found the Arrow library: ${ARROW_LIB_PATH}")
+ message(STATUS "Found the Arrow library: ${ARROW_LIB_PATH}, ${ARROW_PARQUET_LIB_PATH}")
endif ()
else ()
if (NOT Arrow_FIND_QUIETLY)
@@ -74,4 +82,6 @@ mark_as_advanced(
ARROW_LIBS
ARROW_STATIC_LIB
ARROW_SHARED_LIB
+ ARROW_PARQUET_STATIC_LIB
+ ARROW_PARQUET_SHARED_LIB
)
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/conda.recipe/build.sh
----------------------------------------------------------------------
diff --git a/python/conda.recipe/build.sh b/python/conda.recipe/build.sh
index a9d9aed..a164c1a 100644
--- a/python/conda.recipe/build.sh
+++ b/python/conda.recipe/build.sh
@@ -6,6 +6,19 @@ export ARROW_HOME=$PREFIX
cd $RECIPE_DIR
+if [ "$(uname)" == "Darwin" ]; then
+ # C++11 finagling for Mac OSX
+ export CC=clang
+ export CXX=clang++
+ export MACOSX_VERSION_MIN="10.7"
+ CXXFLAGS="${CXXFLAGS} -mmacosx-version-min=${MACOSX_VERSION_MIN}"
+ CXXFLAGS="${CXXFLAGS} -stdlib=libc++ -std=c++11"
+ export LDFLAGS="${LDFLAGS} -mmacosx-version-min=${MACOSX_VERSION_MIN}"
+ export LDFLAGS="${LDFLAGS} -stdlib=libc++ -std=c++11"
+ export LINKFLAGS="${LDFLAGS}"
+ export MACOSX_DEPLOYMENT_TARGET=10.7
+fi
+
echo Setting the compiler...
if [ `uname` == Linux ]; then
EXTRA_CMAKE_ARGS=-DCMAKE_SHARED_LINKER_FLAGS=-static-libstdc++
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/array.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx
index a80b3ce..619e5ef 100644
--- a/python/pyarrow/array.pyx
+++ b/python/pyarrow/array.pyx
@@ -68,6 +68,9 @@ cdef class Array:
values = array_format(self, window=10)
return '{0}\n{1}'.format(type_format, values)
+ def equals(Array self, Array other):  # True when the wrapped C++ arrays compare equal via CArray::Equals.
+ return self.ap.Equals(other.sp_array)  # NOTE(review): Cython typed args accept None unless declared `not None`; equals(None) would access a None Array's sp_array
+
def __len__(self):
if self.sp_array.get():
return self.sp_array.get().length()
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/error.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/error.pxd b/python/pyarrow/error.pxd
index d226abe..97ba0ef 100644
--- a/python/pyarrow/error.pxd
+++ b/python/pyarrow/error.pxd
@@ -15,6 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+from pyarrow.includes.libarrow cimport CStatus
from pyarrow.includes.pyarrow cimport *
+cdef check_cstatus(const CStatus& status)
cdef check_status(const Status& status)
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/error.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/error.pyx b/python/pyarrow/error.pyx
index 3f8d7dd..5a6a038 100644
--- a/python/pyarrow/error.pyx
+++ b/python/pyarrow/error.pyx
@@ -15,12 +15,20 @@
# specific language governing permissions and limitations
# under the License.
+from pyarrow.includes.libarrow cimport CStatus
from pyarrow.includes.common cimport c_string
from pyarrow.compat import frombytes
class ArrowException(Exception):
pass
+cdef check_cstatus(const CStatus& status):  # Raise ArrowException with status.ToString() as the message unless `status` is OK.
+ if status.ok():
+ return
+
+ cdef c_string c_message = status.ToString()
+ raise ArrowException(frombytes(c_message))  # message converted via pyarrow.compat.frombytes
+
cdef check_status(const Status& status):
if status.ok():
return
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/includes/common.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd
index e86d5d7..1f6ecee 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -19,6 +19,7 @@
from libc.stdint cimport *
from libcpp cimport bool as c_bool
+from libcpp.memory cimport shared_ptr, unique_ptr
from libcpp.string cimport string as c_string
from libcpp.vector cimport vector
@@ -32,11 +33,3 @@ cdef extern from "<iostream>":
cdef extern from "<Python.h>":
void Py_XDECREF(PyObject* o)
-cdef extern from "<memory>" namespace "std" nogil:
-
- cdef cppclass shared_ptr[T]:
- shared_ptr()
- shared_ptr(T*)
- T* get()
- void reset()
- void reset(T* p)
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index b2ef45a..90414e3 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -72,6 +72,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
cdef cppclass MemoryPool" arrow::MemoryPool":
int64_t bytes_allocated()
+ cdef MemoryPool* default_memory_pool()
+
cdef cppclass CListType" arrow::ListType"(CDataType):
CListType(const shared_ptr[CDataType]& value_type)
@@ -103,6 +105,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
int32_t null_count()
Type type_enum()
+ c_bool Equals(const shared_ptr[CArray]& arr)
c_bool IsNull(int i)
cdef cppclass CBooleanArray" arrow::BooleanArray"(CArray):
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/includes/parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd
index ffdc5d4..0918344 100644
--- a/python/pyarrow/includes/parquet.pxd
+++ b/python/pyarrow/includes/parquet.pxd
@@ -18,6 +18,26 @@
# distutils: language = c++
from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport CSchema, CStatus, CTable, MemoryPool
+
+
+cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
+ cdef cppclass Node:
+ pass
+
+ cdef cppclass GroupNode(Node):
+ pass
+
+ cdef cppclass PrimitiveNode(Node):
+ pass
+
+cdef extern from "parquet/api/schema.h" namespace "parquet" nogil:
+ cdef cppclass SchemaDescriptor:
+ shared_ptr[Node] schema()
+ GroupNode* group()
+
+ cdef cppclass ColumnDescriptor:
+ pass
cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
cdef cppclass ColumnReader:
@@ -48,4 +68,30 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
pass
cdef cppclass ParquetFileReader:
+ # TODO: Some default arguments are missing
+ @staticmethod
+ unique_ptr[ParquetFileReader] OpenFile(const c_string& path)
+
+cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
+ cdef cppclass OutputStream:
pass
+
+ cdef cppclass LocalFileOutputStream(OutputStream):
+ LocalFileOutputStream(const c_string& path)
+ void Close()
+
+
+cdef extern from "arrow/parquet/reader.h" namespace "arrow::parquet" nogil:
+ cdef cppclass FileReader:
+ FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader)
+ CStatus ReadFlatTable(shared_ptr[CTable]* out);
+
+
+cdef extern from "arrow/parquet/schema.h" namespace "arrow::parquet" nogil:
+ CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema, shared_ptr[CSchema]* out)
+ CStatus ToParquetSchema(const CSchema* arrow_schema, shared_ptr[SchemaDescriptor]* out)
+
+
+cdef extern from "arrow/parquet/writer.h" namespace "arrow::parquet" nogil:
+ cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool, shared_ptr[OutputStream] sink, int64_t chunk_size)
+
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index 622e7d0..3d5355e 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -19,5 +19,53 @@
# distutils: language = c++
# cython: embedsignature = True
-from pyarrow.compat import frombytes, tobytes
+from pyarrow.includes.libarrow cimport *
+cimport pyarrow.includes.pyarrow as pyarrow
from pyarrow.includes.parquet cimport *
+
+from pyarrow.compat import tobytes
+from pyarrow.error cimport check_cstatus
+from pyarrow.table cimport Table
+
+def read_table(filename, columns=None):  # NOTE(review): `columns` is accepted but never used by this body
+ """
+ Read a Table from the Parquet file at `filename`
+ Returns
+ -------
+ table: pyarrow.Table
+ """
+ cdef unique_ptr[FileReader] reader
+ cdef Table table = Table()
+ cdef shared_ptr[CTable] ctable
+
+ # Must be in one expression to avoid calling std::move which is not possible
+ # in Cython (due to missing rvalue support)
+ reader = unique_ptr[FileReader](new FileReader(default_memory_pool(),  # FileReader takes the ParquetFileReader
+ ParquetFileReader.OpenFile(tobytes(filename))))
+ check_cstatus(reader.get().ReadFlatTable(&ctable))  # raises ArrowException on a non-OK status
+ table.init(ctable)  # wrap the C++ table in the Python Table object
+ return table
+
+def write_table(table, filename, chunk_size=None):
+ """
+ Write a Table to Parquet format
+
+ Parameters
+ ----------
+ table : pyarrow.Table
+ filename : string
+ chunk_size : int, optional
+ Maximum number of rows per Parquet RowGroup (should be positive); default min(num_rows, 2**16)
+ """
+ cdef Table table_ = table
+ cdef CTable* ctable_ = table_.table
+ cdef shared_ptr[OutputStream] sink
+ cdef int64_t chunk_size_ = 0
+ if chunk_size is None:
+ chunk_size_ = min(ctable_.num_rows(), int(2**16))  # 0 for an empty table, in which case no row group is written
+ else:
+ chunk_size_ = chunk_size  # NOTE(review): not validated; a value <= 0 on a non-empty table keeps WriteFlatTable's row-group loop from terminating
+
+ sink.reset(new LocalFileOutputStream(tobytes(filename)))
+ check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink, chunk_size_))  # raises ArrowException on a non-OK status
+
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/schema.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/schema.pyx b/python/pyarrow/schema.pyx
index 22ddf0c..084c304 100644
--- a/python/pyarrow/schema.pyx
+++ b/python/pyarrow/schema.pyx
@@ -201,7 +201,9 @@ def string():
def list_(DataType value_type):
cdef DataType out = DataType()
- out.init(shared_ptr[CDataType](new CListType(value_type.sp_type)))
+ cdef shared_ptr[CDataType] list_type
+ list_type.reset(new CListType(value_type.sp_type))
+ out.init(list_type)
return out
def struct(fields):
@@ -212,12 +214,13 @@ def struct(fields):
DataType out = DataType()
Field field
vector[shared_ptr[CField]] c_fields
+ cdef shared_ptr[CDataType] struct_type
for field in fields:
c_fields.push_back(field.sp_field)
- out.init(shared_ptr[CDataType](
- new CStructType(c_fields)))
+ struct_type.reset(new CStructType(c_fields))
+ out.init(struct_type)
return out
def schema(fields):
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
new file mode 100644
index 0000000..d92cf4c
--- /dev/null
+++ b/python/pyarrow/tests/test_parquet.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyarrow.compat import unittest
+import pyarrow as arrow
+import pyarrow.parquet
+
+A = arrow
+
+import numpy as np
+import os.path
+import pandas as pd
+
+import pandas.util.testing as pdt
+
+
+def test_single_pylist_column_roundtrip(tmpdir):  # Write and read back a one-column table of int, then float, values and compare.
+ for dtype in [int, float]:
+ filename = tmpdir.join('single_{}_column.parquet'.format(dtype.__name__))
+ data = [A.from_pylist(list(map(dtype, range(5))))]
+ table = A.Table.from_arrays(('a', 'b'), data, 'table_name')  # NOTE(review): two names for one array -- presumably only 'a' is used; confirm
+ A.parquet.write_table(table, filename.strpath)
+ table_read = pyarrow.parquet.read_table(filename.strpath)
+ for col_written, col_read in zip(table.itercolumns(), table_read.itercolumns()):  # zip stops at the shorter side, so a missing column would go unnoticed
+ assert col_written.name == col_read.name
+ assert col_read.data.num_chunks == 1
+ data_written = col_written.data.chunk(0)
+ data_read = col_read.data.chunk(0)
+ assert data_written.equals(data_read)
+
+def test_pandas_rountrip(tmpdir):  # Round-trip int32/int64/float32/float64 DataFrame columns through Parquet (name sic: "rountrip").
+ size = 10000  # below the 2**16 default chunk size, so a single row group is written
+ df = pd.DataFrame({
+ 'int32': np.arange(size, dtype=np.int32),
+ 'int64': np.arange(size, dtype=np.int64),
+ 'float32': np.arange(size, dtype=np.float32),
+ 'float64': np.arange(size, dtype=np.float64)
+ })
+ filename = tmpdir.join('pandas_rountrip.parquet')
+ arrow_table = A.from_pandas_dataframe(df)  # pandas -> Arrow
+ A.parquet.write_table(arrow_table, filename.strpath)
+ table_read = pyarrow.parquet.read_table(filename.strpath)
+ df_read = table_read.to_pandas()  # Arrow -> pandas
+ pdt.assert_frame_equal(df, df_read)  # values and dtypes must survive the round trip
+
http://git-wip-us.apache.org/repos/asf/arrow/blob/ec66ddd1/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index 5f228ed..7edeb91 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -214,7 +214,7 @@ class build_ext(_build_ext):
return name + suffix
def get_cmake_cython_names(self):
- return ['array', 'config', 'error', 'scalar', 'schema', 'table']
+ return ['array', 'config', 'error', 'parquet', 'scalar', 'schema', 'table']
def get_names(self):
return self._found_names
@@ -242,7 +242,7 @@ setup(
'clean': clean,
'build_ext': build_ext
},
- install_requires=['cython >= 0.21', 'numpy >= 1.9'],
+ install_requires=['cython >= 0.23', 'numpy >= 1.9'],
description=DESC,
license='Apache License, Version 2.0',
maintainer="Apache Arrow Developers",