You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/09/18 18:01:50 UTC
[2/2] parquet-cpp git commit: PARQUET-712: Add library to read into
Arrow memory
PARQUET-712: Add library to read into Arrow memory
At the moment, this just moves the existing code into a state where it compiles. Outstanding work includes:
- [x] Understand the issues with ParquetException typeid matching in the tests *on macOS*. @wesm We already had this problem and you fixed it somewhere. Do you remember the solution?
- [x] Understand why BooleanType tests break with a Thrift exception
- [ ] Add functions that read directly into Arrow memory and not intermediate structures.
Author: Uwe L. Korn <uw...@xhochy.com>
Author: Korn, Uwe <Uw...@blue-yonder.com>
Closes #158 from xhochy/PARQUET-712 and squashes the following commits:
e55ab1f [Uwe L. Korn] verbose ctest output
62f0f88 [Uwe L. Korn] Add static linkage
fc2c316 [Uwe L. Korn] Style fixes
3f3e24b [Uwe L. Korn] Fix templating problem
45de044 [Uwe L. Korn] Style fixes for IO
1d39a60 [Uwe L. Korn] Import MemoryPool instead of declaring it
251262a [Uwe L. Korn] Style fixes
e0e1518 [Uwe L. Korn] Build parquet_arrow in Travis
874b33d [Korn, Uwe] Add boost libraries for Arrow
142a364 [Korn, Uwe] PARQUET-712: Add library to read into Arrow memory
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/4a7bf117
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/4a7bf117
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/4a7bf117
Branch: refs/heads/master
Commit: 4a7bf1174419db6b2e4fc1847d866c4763b7be44
Parents: 8ef68b1
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Sun Sep 18 14:01:41 2016 -0400
Committer: Wes McKinney <we...@apache.org>
Committed: Sun Sep 18 14:01:41 2016 -0400
----------------------------------------------------------------------
.travis.yml | 6 +-
CMakeLists.txt | 35 ++
ci/travis_script_cpp.sh | 4 +-
cmake_modules/FindArrow.cmake | 87 ++++
conda.recipe/build.sh | 1 +
src/parquet/arrow/CMakeLists.txt | 90 ++++
src/parquet/arrow/arrow-io-test.cc | 189 ++++++++
src/parquet/arrow/arrow-reader-writer-test.cc | 502 +++++++++++++++++++++
src/parquet/arrow/arrow-schema-test.cc | 265 +++++++++++
src/parquet/arrow/io.cc | 107 +++++
src/parquet/arrow/io.h | 82 ++++
src/parquet/arrow/reader.cc | 406 +++++++++++++++++
src/parquet/arrow/reader.h | 148 ++++++
src/parquet/arrow/schema.cc | 351 ++++++++++++++
src/parquet/arrow/schema.h | 56 +++
src/parquet/arrow/test-util.h | 202 +++++++++
src/parquet/arrow/utils.h | 54 +++
src/parquet/arrow/writer.cc | 373 +++++++++++++++
src/parquet/arrow/writer.h | 78 ++++
thirdparty/build_thirdparty.sh | 10 +
thirdparty/download_thirdparty.sh | 5 +
thirdparty/set_thirdparty_env.sh | 1 +
thirdparty/versions.sh | 4 +
23 files changed, 3052 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 780d9f9..6dc994e 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -9,6 +9,8 @@ addons:
- g++-4.9
- valgrind
- libboost-dev
+ - libboost-filesystem-dev
+ - libboost-system-dev
- libboost-program-options-dev
- libboost-test-dev
- libssl-dev
@@ -26,7 +28,7 @@ matrix:
before_script:
- source $TRAVIS_BUILD_DIR/ci/before_script_travis.sh
- cmake -DCMAKE_CXX_FLAGS="-Werror" -DPARQUET_TEST_MEMCHECK=ON -DPARQUET_BUILD_BENCHMARKS=ON
- -DPARQUET_GENERATE_COVERAGE=1 $TRAVIS_BUILD_DIR
+ -DPARQUET_ARROW=ON -DPARQUET_GENERATE_COVERAGE=1 $TRAVIS_BUILD_DIR
- export PARQUET_TEST_DATA=$TRAVIS_BUILD_DIR/data
- compiler: clang
os: linux
@@ -76,7 +78,7 @@ before_install:
before_script:
- source $TRAVIS_BUILD_DIR/ci/before_script_travis.sh
-- cmake -DCMAKE_CXX_FLAGS="-Werror" $TRAVIS_BUILD_DIR
+- cmake -DCMAKE_CXX_FLAGS="-Werror" -DPARQUET_ARROW=ON $TRAVIS_BUILD_DIR
- export PARQUET_TEST_DATA=$TRAVIS_BUILD_DIR/data
script:
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3878056..42b10ee 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -82,6 +82,9 @@ if ("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
option(PARQUET_BUILD_EXECUTABLES
"Build the libparquet executable CLI tools"
ON)
+ option(PARQUET_ARROW
+ "Build the Arrow support"
+ OFF)
endif()
# If build in-source, create the latest symlink. If build out-of-source, which is
@@ -247,6 +250,16 @@ function(ADD_PARQUET_TEST_DEPENDENCIES REL_TEST_NAME)
add_dependencies(${TEST_NAME} ${ARGN})
endfunction()
+# A wrapper for target_link_libraries() that is compatible with PARQUET_BUILD_TESTS:
+# links the given libraries (ARGN) into the test target named by REL_TEST_NAME
+# stripped of directory and extension, and does nothing when tests are not built.
+function(ADD_PARQUET_LINK_LIBRARIES REL_TEST_NAME)
+ if(NOT PARQUET_BUILD_TESTS)
+ return()
+ endif()
+ get_filename_component(TEST_NAME ${REL_TEST_NAME} NAME_WE)
+
+ target_link_libraries(${TEST_NAME} ${ARGN})
+endfunction()
+
enable_testing()
############################################################
@@ -553,6 +566,12 @@ if (PARQUET_BUILD_SHARED)
target_link_libraries(parquet_shared
LINK_PUBLIC ${LIBPARQUET_LINK_LIBS}
LINK_PRIVATE ${LIBPARQUET_PRIVATE_LINK_LIBS})
+ if (APPLE)
+ set_target_properties(parquet_shared
+ PROPERTIES
+ BUILD_WITH_INSTALL_RPATH ON
+ INSTALL_NAME_DIR "@rpath")
+ endif()
endif()
if (PARQUET_BUILD_STATIC)
@@ -583,6 +602,22 @@ add_dependencies(parquet_objlib parquet_thrift)
add_subdirectory(benchmarks)
add_subdirectory(tools)
+# Arrow
+# Optional Arrow adapter (-DPARQUET_ARROW=ON): locate Arrow with
+# find_package(Arrow), wrap its core and IO libraries (shared and static) as
+# IMPORTED targets, and build src/parquet/arrow against them.
+if (PARQUET_ARROW)
+ find_package(Arrow REQUIRED)
+ include_directories(SYSTEM ${ARROW_INCLUDE_DIR})
+ add_library(arrow SHARED IMPORTED)
+ set_target_properties(arrow PROPERTIES IMPORTED_LOCATION ${ARROW_SHARED_LIB})
+ add_library(arrow_io SHARED IMPORTED)
+ set_target_properties(arrow_io PROPERTIES IMPORTED_LOCATION ${ARROW_IO_SHARED_LIB})
+ add_library(arrow_static STATIC IMPORTED)
+ set_target_properties(arrow_static PROPERTIES IMPORTED_LOCATION ${ARROW_STATIC_LIB})
+ add_library(arrow_io_static STATIC IMPORTED)
+ set_target_properties(arrow_io_static PROPERTIES IMPORTED_LOCATION ${ARROW_IO_STATIC_LIB})
+
+ add_subdirectory(src/parquet/arrow)
+endif()
+
add_custom_target(clean-all
COMMAND ${CMAKE_BUILD_TOOL} clean
COMMAND ${CMAKE_COMMAND} -P cmake_modules/clean-all.cmake
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/ci/travis_script_cpp.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_cpp.sh b/ci/travis_script_cpp.sh
index 2e7dcdd..8794559 100755
--- a/ci/travis_script_cpp.sh
+++ b/ci/travis_script_cpp.sh
@@ -16,13 +16,13 @@ make lint
if [ $TRAVIS_OS_NAME == "linux" ]; then
make -j4 || exit 1
- ctest -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
+ ctest -VV -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
sudo pip install cpp_coveralls
export PARQUET_ROOT=$TRAVIS_BUILD_DIR
$TRAVIS_BUILD_DIR/ci/upload_coverage.sh
else
make -j4 || exit 1
- ctest -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
+ ctest -VV -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; }
fi
popd
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/cmake_modules/FindArrow.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/FindArrow.cmake b/cmake_modules/FindArrow.cmake
new file mode 100644
index 0000000..91d0e71
--- /dev/null
+++ b/cmake_modules/FindArrow.cmake
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# - Find ARROW (arrow/array.h, libarrow, libarrow_io)
+# This module defines
+#  ARROW_INCLUDE_DIR, directory containing headers
+#  ARROW_LIBS, directory containing arrow libraries
+#  ARROW_STATIC_LIB, path to libarrow.a
+#  ARROW_SHARED_LIB, path to libarrow's shared library
+#  ARROW_IO_STATIC_LIB, path to libarrow_io.a
+#  ARROW_IO_SHARED_LIB, path to libarrow_io's shared library
+#  ARROW_FOUND, whether arrow (core and IO library) has been found
+#
+# Only $ARROW_HOME/include and $ARROW_HOME/lib are searched (NO_DEFAULT_PATH).
+
+set(ARROW_SEARCH_HEADER_PATHS
+  $ENV{ARROW_HOME}/include
+)
+
+set(ARROW_SEARCH_LIB_PATH
+  $ENV{ARROW_HOME}/lib
+)
+
+find_path(ARROW_INCLUDE_DIR arrow/array.h PATHS
+  ${ARROW_SEARCH_HEADER_PATHS}
+  # make sure we don't accidentally pick up a different version
+  NO_DEFAULT_PATH
+)
+
+find_library(ARROW_LIB_PATH NAMES arrow
+  PATHS
+  ${ARROW_SEARCH_LIB_PATH}
+  NO_DEFAULT_PATH)
+
+find_library(ARROW_IO_LIB_PATH NAMES arrow_io
+  PATHS
+  ${ARROW_SEARCH_LIB_PATH}
+  NO_DEFAULT_PATH)
+
+# The IO library is required as well: the parquet_arrow targets link against
+# both libarrow and libarrow_io, so reporting success without it only defers
+# the failure to link time.
+if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH AND ARROW_IO_LIB_PATH)
+  set(ARROW_FOUND TRUE)
+  set(ARROW_LIB_NAME libarrow)
+  set(ARROW_IO_LIB_NAME libarrow_io)
+
+  set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH})
+  set(ARROW_STATIC_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}.a)
+  set(ARROW_SHARED_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+
+  set(ARROW_IO_STATIC_LIB ${ARROW_LIBS}/${ARROW_IO_LIB_NAME}.a)
+  set(ARROW_IO_SHARED_LIB ${ARROW_LIBS}/${ARROW_IO_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+  if (NOT Arrow_FIND_QUIETLY)
+    message(STATUS "Found the Arrow core library: ${ARROW_LIB_PATH}")
+    message(STATUS "Found the Arrow IO library: ${ARROW_IO_LIB_PATH}")
+  endif ()
+else ()
+  set(ARROW_ERR_MSG "Could not find the Arrow library. Looked for headers")
+  set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_HEADER_PATHS}, and for libs")
+  set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_LIB_PATH}")
+  # REQUIRED must fail the configure step even when QUIET was also requested;
+  # QUIET only suppresses the informational message.
+  if (Arrow_FIND_REQUIRED)
+    message(FATAL_ERROR "${ARROW_ERR_MSG}")
+  elseif (NOT Arrow_FIND_QUIETLY)
+    message(STATUS "${ARROW_ERR_MSG}")
+  endif ()
+  set(ARROW_FOUND FALSE)
+endif ()
+
+mark_as_advanced(
+  ARROW_FOUND
+  ARROW_INCLUDE_DIR
+  ARROW_LIBS
+  ARROW_STATIC_LIB
+  ARROW_SHARED_LIB
+  ARROW_IO_STATIC_LIB
+  ARROW_IO_SHARED_LIB
+)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/conda.recipe/build.sh
----------------------------------------------------------------------
diff --git a/conda.recipe/build.sh b/conda.recipe/build.sh
index 7fe8f91..f77475c 100644
--- a/conda.recipe/build.sh
+++ b/conda.recipe/build.sh
@@ -53,6 +53,7 @@ cmake \
-DCMAKE_BUILD_TYPE=debug \
-DCMAKE_INSTALL_PREFIX=$PREFIX \
-DPARQUET_BUILD_BENCHMARKS=off \
+ -DPARQUET_ARROW=ON \
..
make
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/CMakeLists.txt b/src/parquet/arrow/CMakeLists.txt
new file mode 100644
index 0000000..5d923a7
--- /dev/null
+++ b/src/parquet/arrow/CMakeLists.txt
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# ----------------------------------------------------------------------
+# parquet_arrow : Arrow <-> Parquet adapter
+
+set(PARQUET_ARROW_SRCS
+  io.cc
+  reader.cc
+  schema.cc
+  writer.cc
+)
+
+# Compile the adapter sources once; the shared and static libraries below are
+# both assembled from these object files.
+add_library(parquet_arrow_objlib OBJECT
+  ${PARQUET_ARROW_SRCS}
+)
+
+if (PARQUET_BUILD_SHARED)
+  add_library(parquet_arrow_shared SHARED $<TARGET_OBJECTS:parquet_arrow_objlib>)
+  set_target_properties(parquet_arrow_shared
+    PROPERTIES
+    LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}"
+    LINK_FLAGS "${SHARED_LINK_FLAGS}"
+    OUTPUT_NAME "parquet_arrow")
+  target_link_libraries(parquet_arrow_shared
+    arrow
+    arrow_io
+    parquet_shared)
+  if (APPLE)
+    set_target_properties(parquet_arrow_shared
+      PROPERTIES
+      BUILD_WITH_INSTALL_RPATH ON
+      INSTALL_NAME_DIR "@rpath")
+  endif()
+  # Install the shared library too; otherwise a shared-only build installs the
+  # headers below without any library to link against.
+  install(TARGETS parquet_arrow_shared
+    ARCHIVE DESTINATION lib
+    LIBRARY DESTINATION lib)
+endif()
+
+if (PARQUET_BUILD_STATIC)
+  add_library(parquet_arrow_static STATIC $<TARGET_OBJECTS:parquet_arrow_objlib>)
+  set_target_properties(parquet_arrow_static
+    PROPERTIES
+    LIBRARY_OUTPUT_DIRECTORY "${BUILD_OUTPUT_ROOT_DIRECTORY}"
+    OUTPUT_NAME "parquet_arrow")
+  # Link both Arrow archives, mirroring the shared target (arrow + arrow_io).
+  # arrow_io_static is listed first because it presumably depends on
+  # arrow_static, and static archives are resolved in command-line order.
+  target_link_libraries(parquet_arrow_static
+    arrow_io_static
+    arrow_static
+    parquet_static)
+  install(TARGETS parquet_arrow_static
+    ARCHIVE DESTINATION lib
+    LIBRARY DESTINATION lib)
+endif()
+
+ADD_PARQUET_TEST(arrow-schema-test)
+ADD_PARQUET_TEST(arrow-io-test)
+ADD_PARQUET_TEST(arrow-reader-writer-test)
+
+# Prefer linking the tests statically when the static library is built.
+if (PARQUET_BUILD_STATIC)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow_static)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow_static)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow_static)
+else()
+  ADD_PARQUET_LINK_LIBRARIES(arrow-schema-test parquet_arrow_shared)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-io-test parquet_arrow_shared)
+  ADD_PARQUET_LINK_LIBRARIES(arrow-reader-writer-test parquet_arrow_shared)
+endif()
+
+# Headers: top level
+install(FILES
+  io.h
+  reader.h
+  schema.h
+  utils.h
+  writer.h
+  DESTINATION include/parquet/arrow)
+
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/arrow-io-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-io-test.cc b/src/parquet/arrow/arrow-io-test.cc
new file mode 100644
index 0000000..377fb97
--- /dev/null
+++ b/src/parquet/arrow/arrow-io-test.cc
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <memory>
+#include <string>
+
+#include "gtest/gtest.h"
+
+#include "arrow/test-util.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+#include "parquet/api/io.h"
+#include "parquet/arrow/io.h"
+
+using arrow::default_memory_pool;
+using arrow::MemoryPool;
+using arrow::Status;
+
+// To assist with readability
+using ArrowROFile = arrow::io::RandomAccessFile;
+
+namespace parquet {
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// Allocator tests
+
+// A default-constructed ParquetAllocator must hand out writable memory and
+// accept it back via Free().
+TEST(TestParquetAllocator, DefaultCtor) {
+  ParquetAllocator allocator;
+
+  const int buffer_size = 10;
+
+  uint8_t* buffer = nullptr;
+  ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size););
+
+  // valgrind will complain if we write into nullptr
+  memset(buffer, 0, buffer_size);
+
+  allocator.Free(buffer, buffer_size);
+}
+
+// Pass through to the default memory pool while tracking the number of bytes
+// currently outstanding, so tests can verify which pool an allocator uses.
+class TrackingPool : public MemoryPool {
+ public:
+  TrackingPool() : pool_(default_memory_pool()), bytes_allocated_(0) {}
+
+  Status Allocate(int64_t size, uint8_t** out) override {
+    RETURN_NOT_OK(pool_->Allocate(size, out));
+    // Only count the bytes once the underlying pool has succeeded.
+    bytes_allocated_ += size;
+    return Status::OK();
+  }
+
+  void Free(uint8_t* buffer, int64_t size) override {
+    pool_->Free(buffer, size);
+    bytes_allocated_ -= size;
+  }
+
+  int64_t bytes_allocated() const override { return bytes_allocated_; }
+
+ private:
+  MemoryPool* pool_;
+  int64_t bytes_allocated_;
+};
+
+// A ParquetAllocator constructed with an explicit pool must report that pool
+// and route Malloc/Free through it.
+TEST(TestParquetAllocator, CustomPool) {
+  TrackingPool pool;
+
+  ParquetAllocator allocator(&pool);
+
+  ASSERT_EQ(&pool, allocator.pool());
+
+  const int buffer_size = 10;
+
+  uint8_t* buffer = nullptr;
+  ASSERT_NO_THROW(buffer = allocator.Malloc(buffer_size););
+
+  ASSERT_EQ(buffer_size, pool.bytes_allocated());
+
+  // valgrind will complain if we write into nullptr
+  memset(buffer, 0, buffer_size);
+
+  allocator.Free(buffer, buffer_size);
+
+  ASSERT_EQ(0, pool.bytes_allocated());
+}
+
+// ----------------------------------------------------------------------
+// Read source tests
+
+// Minimal in-memory arrow::io::RandomAccessFile over a borrowed byte buffer.
+// The bytes are not copied, so the buffer must outlive the reader.
+class BufferReader : public ArrowROFile {
+ public:
+  BufferReader(const uint8_t* buffer, int buffer_size)
+      : buffer_(buffer), buffer_size_(buffer_size), position_(0) {}
+
+  Status Close() override {
+    // no-op
+    return Status::OK();
+  }
+
+  Status Tell(int64_t* position) override {
+    *position = position_;
+    return Status::OK();
+  }
+
+  Status ReadAt(
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override {
+    RETURN_NOT_OK(Seek(position));
+    return Read(nbytes, bytes_read, buffer);
+  }
+
+  // Reads up to nbytes from the current position; fewer bytes are copied (and
+  // reported through bytes_read) when the end of the buffer is reached.
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override {
+    // Clamp before copying: copying nbytes unconditionally would read past the
+    // end of buffer_ whenever fewer than nbytes remain.
+    *bytes_read = std::min(nbytes, buffer_size_ - position_);
+    memcpy(buffer, buffer_ + position_, *bytes_read);
+    position_ += *bytes_read;
+    return Status::OK();
+  }
+
+  Status GetSize(int64_t* size) override {
+    *size = buffer_size_;
+    return Status::OK();
+  }
+
+  // Valid positions are [0, buffer_size_); seeking to the end is rejected.
+  Status Seek(int64_t position) override {
+    if (position < 0 || position >= buffer_size_) {
+      return Status::IOError("position out of bounds");
+    }
+
+    position_ = position;
+    return Status::OK();
+  }
+
+ private:
+  const uint8_t* buffer_;
+  int buffer_size_;
+  int64_t position_;
+};
+
+// ParquetReadSource wrapping an Arrow file: Tell/Seek bookkeeping,
+// out-of-bounds seeks surfacing as ParquetException, and both Read overloads.
+TEST(TestParquetReadSource, Basics) {
+  std::string data = "this is the data";
+  auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str());
+
+  ParquetAllocator allocator(default_memory_pool());
+
+  auto file = std::make_shared<BufferReader>(data_buffer, data.size());
+  auto source = std::make_shared<ParquetReadSource>(&allocator);
+
+  ASSERT_OK(source->Open(file));
+
+  ASSERT_EQ(0, source->Tell());
+  ASSERT_NO_THROW(source->Seek(5));
+  ASSERT_EQ(5, source->Tell());
+  ASSERT_NO_THROW(source->Seek(0));
+
+  // Seek out of bounds
+  ASSERT_THROW(source->Seek(100), ParquetException);
+
+  uint8_t buffer[50];
+
+  // Read into a caller-provided buffer.
+  ASSERT_NO_THROW(source->Read(4, buffer));
+  ASSERT_EQ(0, std::memcmp(buffer, "this", 4));
+  ASSERT_EQ(4, source->Tell());
+
+  // Read into a newly returned Buffer.
+  std::shared_ptr<Buffer> pq_buffer;
+
+  ASSERT_NO_THROW(pq_buffer = source->Read(7));
+
+  auto expected_buffer = std::make_shared<Buffer>(data_buffer + 4, 7);
+
+  ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get()));
+}
+
+}  // namespace arrow
+}  // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
new file mode 100644
index 0000000..e4e9efa
--- /dev/null
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -0,0 +1,502 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include "parquet/api/reader.h"
+#include "parquet/api/writer.h"
+
+#include "parquet/arrow/test-util.h"
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/writer.h"
+
+#include "arrow/test-util.h"
+#include "arrow/types/construct.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+using arrow::Array;
+using arrow::ChunkedArray;
+using arrow::default_memory_pool;
+using arrow::PoolBuffer;
+using arrow::PrimitiveArray;
+using arrow::Status;
+using arrow::Table;
+
+using ParquetBuffer = parquet::Buffer;
+using ParquetType = parquet::Type;
+using parquet::schema::GroupNode;
+using parquet::schema::NodePtr;
+using parquet::schema::PrimitiveNode;
+
+namespace parquet {
+
+namespace arrow {
+
+const int SMALL_SIZE = 100;
+const int LARGE_SIZE = 10000;
+
+// Per-Arrow-type test configuration: the Parquet physical type
+// (parquet_enum) and logical type (logical_enum) a column of that Arrow type is
+// written with, plus a representative `value`.
+// NOTE(review): `value` is not used in this file; presumably the array
+// generators in parquet/arrow/test-util.h consume it — confirm.
+template <typename TestType>
+struct test_traits {};
+
+template <>
+struct test_traits<::arrow::BooleanType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static uint8_t const value;
+};
+
+const uint8_t test_traits<::arrow::BooleanType>::value(1);
+
+// Narrow integer types are stored in INT32 columns, distinguished by their
+// logical type.
+template <>
+struct test_traits<::arrow::UInt8Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::UINT_8;
+ static uint8_t const value;
+};
+
+const uint8_t test_traits<::arrow::UInt8Type>::value(64);
+
+template <>
+struct test_traits<::arrow::Int8Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::INT_8;
+ static int8_t const value;
+};
+
+const int8_t test_traits<::arrow::Int8Type>::value(-64);
+
+template <>
+struct test_traits<::arrow::UInt16Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::UINT_16;
+ static uint16_t const value;
+};
+
+const uint16_t test_traits<::arrow::UInt16Type>::value(1024);
+
+template <>
+struct test_traits<::arrow::Int16Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::INT_16;
+ static int16_t const value;
+};
+
+const int16_t test_traits<::arrow::Int16Type>::value(-1024);
+
+// UInt32Type is not part of TestTypes; it is exercised by the separate
+// TestUInt32ParquetIO fixture.
+template <>
+struct test_traits<::arrow::UInt32Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::UINT_32;
+ static uint32_t const value;
+};
+
+const uint32_t test_traits<::arrow::UInt32Type>::value(1024);
+
+template <>
+struct test_traits<::arrow::Int32Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static int32_t const value;
+};
+
+const int32_t test_traits<::arrow::Int32Type>::value(-1024);
+
+template <>
+struct test_traits<::arrow::UInt64Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+ static constexpr LogicalType::type logical_enum = LogicalType::UINT_64;
+ static uint64_t const value;
+};
+
+const uint64_t test_traits<::arrow::UInt64Type>::value(1024);
+
+template <>
+struct test_traits<::arrow::Int64Type> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static int64_t const value;
+};
+
+const int64_t test_traits<::arrow::Int64Type>::value(-1024);
+
+template <>
+struct test_traits<::arrow::TimestampType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::INT64;
+ static constexpr LogicalType::type logical_enum = LogicalType::TIMESTAMP_MILLIS;
+ static int64_t const value;
+};
+
+// Presumably milliseconds, matching the TIMESTAMP_MILLIS logical type above.
+const int64_t test_traits<::arrow::TimestampType>::value(14695634030000);
+
+template <>
+struct test_traits<::arrow::FloatType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static float const value;
+};
+
+const float test_traits<::arrow::FloatType>::value(2.1f);
+
+template <>
+struct test_traits<::arrow::DoubleType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE;
+ static constexpr LogicalType::type logical_enum = LogicalType::NONE;
+ static double const value;
+};
+
+const double test_traits<::arrow::DoubleType>::value(4.2);
+
+template <>
+struct test_traits<::arrow::StringType> {
+ static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY;
+ static constexpr LogicalType::type logical_enum = LogicalType::UTF8;
+ static std::string const value;
+};
+
+const std::string test_traits<::arrow::StringType>::value("Test");
+
+// Parquet DataType for the physical type that Arrow type T is written as.
+template <typename T>
+using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
+
+// Typed Parquet column writer for the physical type of Arrow type T.
+template <typename T>
+using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
+
+// Typed round-trip fixture: builds a one-column schema for TestType, writes
+// Parquet data into an in-memory sink (sink_), and reads it back into Arrow.
+// NOTE(review): ASSERT_* failures inside these helpers return from the helper
+// only; the calling test body keeps running.
+template <typename TestType>
+class TestParquetIO : public ::testing::Test {
+ public:
+ virtual void SetUp() {}
+
+ // Root group "schema" (REQUIRED) holding one primitive column "column1" whose
+ // physical and logical types come from test_traits<TestType>.
+ std::shared_ptr<GroupNode> MakeSchema(Repetition::type repetition) {
+ auto pnode = PrimitiveNode::Make("column1", repetition,
+ test_traits<TestType>::parquet_enum, test_traits<TestType>::logical_enum);
+ NodePtr node_ =
+ GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
+ return std::static_pointer_cast<GroupNode>(node_);
+ }
+
+ // Replaces sink_ with a fresh in-memory stream and opens a file writer on it.
+ std::unique_ptr<ParquetFileWriter> MakeWriter(
+ const std::shared_ptr<GroupNode>& schema) {
+ sink_ = std::make_shared<InMemoryOutputStream>();
+ return ParquetFileWriter::Open(sink_, schema);
+ }
+
+ // Opens a ParquetFileReader over the buffer currently held by sink_.
+ std::unique_ptr<ParquetFileReader> ReaderFromSink() {
+ std::shared_ptr<ParquetBuffer> buffer = sink_->GetBuffer();
+ std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
+ return ParquetFileReader::Open(std::move(source));
+ }
+
+ // Reads column 0 of the file through a FlatColumnReader, requesting a single
+ // batch of SMALL_SIZE values.
+ // NOTE(review): files with more than SMALL_SIZE rows would presumably only be
+ // read partially here.
+ void ReadSingleColumnFile(
+ std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Array>* out) {
+ FileReader reader(::arrow::default_memory_pool(), std::move(file_reader));
+ std::unique_ptr<FlatColumnReader> column_reader;
+ ASSERT_OK_NO_THROW(reader.GetFlatColumn(0, &column_reader));
+ ASSERT_NE(nullptr, column_reader.get());
+
+ ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out));
+ ASSERT_NE(nullptr, out->get());
+ }
+
+ // Reads column 0 back from sink_ and asserts it equals `values`.
+ void ReadAndCheckSingleColumnFile(::arrow::Array* values) {
+ std::shared_ptr<::arrow::Array> out;
+ ReadSingleColumnFile(ReaderFromSink(), &out);
+ ASSERT_TRUE(values->Equals(out));
+ }
+
+ // Reads the whole file into an Arrow Table via FileReader::ReadFlatTable.
+ void ReadTableFromFile(
+ std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Table>* out) {
+ FileReader reader(::arrow::default_memory_pool(), std::move(file_reader));
+ ASSERT_OK_NO_THROW(reader.ReadFlatTable(out));
+ ASSERT_NE(nullptr, out->get());
+ }
+
+ // Reads sink_ as a Table and asserts it has exactly one column, with one
+ // chunk, whose row count and contents match `values`.
+ void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
+ std::shared_ptr<::arrow::Table> out;
+ ReadTableFromFile(ReaderFromSink(), &out);
+ ASSERT_EQ(1, out->num_columns());
+ ASSERT_EQ(values->length(), out->num_rows());
+
+ std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+ ASSERT_EQ(1, chunked_array->num_chunks());
+ ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+ }
+
+ // Writes `values` as the single column of a single row group, then closes
+ // the file.
+ template <typename ArrayType>
+ void WriteFlatColumn(const std::shared_ptr<GroupNode>& schema,
+ const std::shared_ptr<ArrayType>& values) {
+ FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema));
+ ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
+ ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get()));
+ ASSERT_OK_NO_THROW(writer.Close());
+ }
+
+ // In-memory destination of the most recent write; replaced by MakeWriter()
+ // or assigned directly by tests that use WriteFlatTable.
+ std::shared_ptr<InMemoryOutputStream> sink_;
+};
+
+// We have separate tests for UInt32Type as this is currently the only type
+// where a roundtrip does not yield an identical Array structure: we write a
+// UInt32 Array but receive an Int64 Array as the result for Parquet
+// version 1.0.
+
+// Arrow types exercised by every TYPED_TEST of TestParquetIO.
+typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
+ ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
+ ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType,
+ ::arrow::StringType> TestTypes;
+
+TYPED_TEST_CASE(TestParquetIO, TestTypes);
+
+// Round-trips a non-null array through a REQUIRED column written with
+// FileWriter::WriteFlatColumnChunk and read back with a FlatColumnReader.
+TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
+ auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+
+ std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+ this->WriteFlatColumn(schema, values);
+
+ this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+// Round-trips a non-null array through WriteFlatTable / ReadFlatTable as a
+// single row group.
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
+  auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+  std::shared_ptr<Table> table = MakeSimpleTable(values, false);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
+      this->sink_, values->length(), default_writer_properties()));
+
+  // Checks one column, one chunk, and a row count equal to values->length()
+  // (rather than a hard-coded 100) so the test tracks SMALL_SIZE.
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+// Round-trips a nullable array through an OPTIONAL column using the
+// column-level writer and reader.
+// NOTE(review): the second NullableArray argument is presumably the null count.
+TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
+ // This also tests max_definition_level = 1
+ auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+
+ std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+ this->WriteFlatColumn(schema, values);
+
+ this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+// Same as above, but via the table-level WriteFlatTable / ReadFlatTable path
+// with a single row group (chunk size == values->length()).
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
+ // This also tests max_definition_level = 1
+ std::shared_ptr<Array> values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+ std::shared_ptr<Table> table = MakeSimpleTable(values, true);
+ this->sink_ = std::make_shared<InMemoryOutputStream>();
+ ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
+ this->sink_, values->length(), default_writer_properties()));
+
+ this->ReadAndCheckSingleColumnTable(values);
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
+ auto values = NonNullArray<TypeParam>(SMALL_SIZE);
+ int64_t chunk_size = values->length() / 4;
+
+ std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+ FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
+ for (int i = 0; i < 4; i++) {
+ ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+ ASSERT_OK_NO_THROW(
+ writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
+ }
+ ASSERT_OK_NO_THROW(writer.Close());
+
+ this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
+  // Write a LARGE_SIZE non-null column through WriteFlatTable with a chunk
+  // size of 512, then read it back as a table.
+  auto required_values = NonNullArray<TypeParam>(LARGE_SIZE);
+  std::shared_ptr<Table> input_table = MakeSimpleTable(required_values, false);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+
+  const int64_t chunk_size = 512;
+  ASSERT_OK_NO_THROW(WriteFlatTable(input_table.get(), default_memory_pool(),
+      this->sink_, chunk_size, default_writer_properties()));
+
+  this->ReadAndCheckSingleColumnTable(required_values);
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
+  const int kNumChunks = 4;
+  const int64_t chunk_size = SMALL_SIZE / kNumChunks;
+  auto values = NullableArray<TypeParam>(SMALL_SIZE, 10);
+
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+  FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema));
+  // Each slice of the nullable column is written into its own row group.
+  for (int chunk = 0; chunk < kNumChunks; ++chunk) {
+    const int64_t offset = chunk * chunk_size;
+    ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
+    ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get(), offset, chunk_size));
+  }
+  ASSERT_OK_NO_THROW(writer.Close());
+
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
+  // Nullable LARGE_SIZE column written through WriteFlatTable with a chunk
+  // size of 512; this also exercises max_definition_level = 1.
+  auto nullable_values = NullableArray<TypeParam>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> input_table = MakeSimpleTable(nullable_values, true);
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+
+  const int64_t chunk_size = 512;
+  ASSERT_OK_NO_THROW(WriteFlatTable(input_table.get(), ::arrow::default_memory_pool(),
+      this->sink_, chunk_size, default_writer_properties()));
+
+  this->ReadAndCheckSingleColumnTable(nullable_values);
+}
+
+// Fixture specialization for uint32 columns, used by the version-compatibility
+// tests.
+typedef TestParquetIO<::arrow::UInt32Type> TestUInt32ParquetIO;
+
+TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) {
+  // Nullable uint32 column; this also exercises max_definition_level = 1.
+  std::shared_ptr<PrimitiveArray> values =
+      NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> uint32_table = MakeSimpleTable(values, true);
+
+  // With format version 2.0 the column is expected to round-trip as uint32.
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  std::shared_ptr<::parquet::WriterProperties> v2_properties =
+      ::parquet::WriterProperties::Builder()
+          .version(ParquetVersion::PARQUET_2_0)
+          ->build();
+  ASSERT_OK_NO_THROW(WriteFlatTable(
+      uint32_table.get(), default_memory_pool(), this->sink_, 512, v2_properties));
+
+  this->ReadAndCheckSingleColumnTable(values);
+}
+
+TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) {
+  // Nullable uint32 column; this also exercises max_definition_level = 1.
+  std::shared_ptr<PrimitiveArray> values =
+      NullableArray<::arrow::UInt32Type>(LARGE_SIZE, 100);
+  std::shared_ptr<Table> uint32_table = MakeSimpleTable(values, true);
+
+  // A Parquet 1.0 reader cannot be told that a column is unsigned, so the
+  // uint32 column is expected to come back as an int64 column.
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  std::shared_ptr<::parquet::WriterProperties> v1_properties =
+      ::parquet::WriterProperties::Builder()
+          .version(ParquetVersion::PARQUET_1_0)
+          ->build();
+  ASSERT_OK_NO_THROW(WriteFlatTable(
+      uint32_table.get(), ::arrow::default_memory_pool(), this->sink_, 512, v1_properties));
+
+  // Build the expected int64 column: widen every uint32 slot and reuse the
+  // input's null count and validity bitmap.
+  std::shared_ptr<PoolBuffer> widened_data =
+      std::make_shared<PoolBuffer>(::arrow::default_memory_pool());
+  ASSERT_OK(widened_data->Resize(sizeof(int64_t) * values->length()));
+  const uint32_t* src = reinterpret_cast<const uint32_t*>(values->data()->data());
+  int64_t* dst = reinterpret_cast<int64_t*>(widened_data->mutable_data());
+  // An explicit loop (rather than std::copy) keeps the widening cast visible.
+  for (int64_t row = 0; row < values->length(); ++row) {
+    dst[row] = static_cast<int64_t>(src[row]);
+  }
+
+  std::shared_ptr<Array> expected_values;
+  ASSERT_OK(MakePrimitiveArray(std::make_shared<::arrow::Int64Type>(), values->length(),
+      widened_data, values->null_count(), values->null_bitmap(), &expected_values));
+  this->ReadAndCheckSingleColumnTable(expected_values);
+}
+
+// The c_type of ParquetDataType<ArrowType> (presumably the physical C type of
+// the Parquet type paired with ArrowType; see ParquetDataType's definition).
+template <typename ArrowType>
+using ParquetCDataType = typename ParquetDataType<ArrowType>::c_type;
+
+// Typed fixture that writes primitive columns through the low-level Parquet
+// column writer API and reads them back through the Arrow adapter.
+template <typename TestType>
+class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
+ public:
+  typedef typename TestType::c_type T;
+
+  // Writes `values` as a single REQUIRED column split across `num_chunks` row
+  // groups and stores a reader over the written file in `*file_reader`.
+  // Every row group holds values.size() / num_chunks rows except the last,
+  // which also takes any remainder so that no value is dropped.
+  void MakeTestFile(const std::vector<T>& values, int num_chunks,
+      std::unique_ptr<ParquetFileReader>* file_reader) {
+    std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+    std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
+    const size_t base_chunk_size = values.size() / num_chunks;
+    // Convert to Parquet's expected physical type. A raw byte buffer is used
+    // rather than a std::vector of the Parquet C type because that type is
+    // presumably `bool` for BOOLEAN columns, and std::vector<bool> has no
+    // contiguous storage to hand to WriteBatch.
+    std::vector<uint8_t> values_buffer(
+        sizeof(ParquetCDataType<TestType>) * values.size());
+    auto values_parquet =
+        reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data());
+    std::copy(values.cbegin(), values.cend(), values_parquet);
+    for (int i = 0; i < num_chunks; i++) {
+      const size_t offset = i * base_chunk_size;
+      const size_t chunk_size =
+          (i == num_chunks - 1) ? values.size() - offset : base_chunk_size;
+      auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
+      auto column_writer =
+          static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
+      ParquetCDataType<TestType>* data = values_parquet + offset;
+      column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
+      column_writer->Close();
+      row_group_writer->Close();
+    }
+    file_writer->Close();
+    *file_reader = this->ReaderFromSink();
+  }
+
+  // Writes SMALL_SIZE copies of the trait test value in `num_chunks` row
+  // groups and checks that reading the file as a Table yields one column with
+  // one chunk containing the same values.
+  void CheckSingleColumnRequiredTableRead(int num_chunks) {
+    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+    std::unique_ptr<ParquetFileReader> file_reader;
+    ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+
+    std::shared_ptr<Table> out;
+    this->ReadTableFromFile(std::move(file_reader), &out);
+    ASSERT_EQ(1, out->num_columns());
+    ASSERT_EQ(SMALL_SIZE, out->num_rows());
+
+    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+    ASSERT_EQ(1, chunked_array->num_chunks());
+    ExpectArray<TestType>(values.data(), chunked_array->chunk(0).get());
+  }
+
+  // Same as above, but reads the file back as a single Array.
+  void CheckSingleColumnRequiredRead(int num_chunks) {
+    std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value);
+    std::unique_ptr<ParquetFileReader> file_reader;
+    ASSERT_NO_THROW(MakeTestFile(values, num_chunks, &file_reader));
+
+    std::shared_ptr<Array> out;
+    this->ReadSingleColumnFile(std::move(file_reader), &out);
+
+    ExpectArray<TestType>(values.data(), out.get());
+  }
+};
+
+// Every Arrow primitive type exercised by TestPrimitiveParquetIO.
+using PrimitiveTestTypes = ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type,
+    ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type,
+    ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType,
+    ::arrow::DoubleType>;
+
+TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);
+
+// Required primitive columns written as one row group and as four row groups,
+// read back both as a single Array and as a Table.
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) {
+  const int num_row_groups = 1;
+  this->CheckSingleColumnRequiredRead(num_row_groups);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) {
+  const int num_row_groups = 1;
+  this->CheckSingleColumnRequiredTableRead(num_row_groups);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) {
+  const int num_row_groups = 4;
+  this->CheckSingleColumnRequiredRead(num_row_groups);
+}
+
+TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
+  const int num_row_groups = 4;
+  this->CheckSingleColumnRequiredTableRead(num_row_groups);
+}
+
+} // namespace arrow
+
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/arrow-schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
new file mode 100644
index 0000000..3dfaf14
--- /dev/null
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -0,0 +1,265 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "parquet/arrow/schema.h"
+
+#include "arrow/test-util.h"
+#include "arrow/type.h"
+#include "arrow/types/datetime.h"
+#include "arrow/types/decimal.h"
+#include "arrow/util/status.h"
+
+using arrow::Field;
+
+using ParquetType = parquet::Type;
+using parquet::LogicalType;
+using parquet::Repetition;
+using parquet::schema::NodePtr;
+using parquet::schema::GroupNode;
+using parquet::schema::PrimitiveNode;
+
+namespace parquet {
+
+namespace arrow {
+
+// Arrow type instances used as the expected results of schema conversion.
+const auto BOOL = std::make_shared<::arrow::BooleanType>();
+const auto UINT8 = std::make_shared<::arrow::UInt8Type>();
+const auto INT32 = std::make_shared<::arrow::Int32Type>();
+const auto INT64 = std::make_shared<::arrow::Int64Type>();
+const auto FLOAT = std::make_shared<::arrow::FloatType>();
+const auto DOUBLE = std::make_shared<::arrow::DoubleType>();
+const auto UTF8 = std::make_shared<::arrow::StringType>();
+const auto TIMESTAMP_MS =
+ std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI);
+// TODO: This requires parquet-cpp implementing the MICROS enum value
+// const auto TIMESTAMP_US =
+//     std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MICRO);
+// Binary data is expected as a list of uint8 with an unnamed child field.
+const auto BINARY =
+ std::make_shared<::arrow::ListType>(std::make_shared<Field>("", UINT8));
+// Decimal with precision 8 and scale 4.
+const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4);
+
+// Fixture for Parquet -> Arrow schema conversion: tests build Parquet nodes,
+// convert them with ConvertSchema(), then compare with CheckFlatSchema().
+class TestConvertParquetSchema : public ::testing::Test {
+ public:
+  virtual void SetUp() {}
+
+  // Compares the converted schema field-by-field against `expected_schema`.
+  void CheckFlatSchema(const std::shared_ptr<::arrow::Schema>& expected_schema) {
+    const int num_fields = expected_schema->num_fields();
+    ASSERT_EQ(num_fields, result_schema_->num_fields());
+    for (int field_idx = 0; field_idx < num_fields; ++field_idx) {
+      auto actual = result_schema_->field(field_idx);
+      auto expected = expected_schema->field(field_idx);
+      EXPECT_TRUE(actual->Equals(expected))
+          << field_idx << " " << actual->ToString() << " != " << expected->ToString();
+    }
+  }
+
+  // Wraps `nodes` in a root group named "schema" and converts it to Arrow.
+  ::arrow::Status ConvertSchema(const std::vector<NodePtr>& nodes) {
+    NodePtr root = GroupNode::Make("schema", Repetition::REPEATED, nodes);
+    descr_.Init(root);
+    return FromParquetSchema(&descr_, &result_schema_);
+  }
+
+ protected:
+  SchemaDescriptor descr_;
+  std::shared_ptr<::arrow::Schema> result_schema_;
+};
+
+TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+  // Registers a Parquet node together with the Arrow field it should become.
+  auto expect_mapping = [&parquet_fields, &arrow_fields](
+      const NodePtr& parquet_node, const std::shared_ptr<Field>& arrow_field) {
+    parquet_fields.push_back(parquet_node);
+    arrow_fields.push_back(arrow_field);
+  };
+
+  expect_mapping(PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN),
+      std::make_shared<Field>("boolean", BOOL, false));
+  expect_mapping(PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32),
+      std::make_shared<Field>("int32", INT32, false));
+  expect_mapping(PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64),
+      std::make_shared<Field>("int64", INT64, false));
+  expect_mapping(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, ParquetType::INT64,
+                     LogicalType::TIMESTAMP_MILLIS),
+      std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
+
+  // expect_mapping(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
+  //                    ParquetType::INT64, LogicalType::TIMESTAMP_MICROS),
+  //     std::make_shared<Field>("timestamp", TIMESTAMP_US, false));
+
+  // OPTIONAL Parquet nodes map to nullable Arrow fields (the Field default).
+  expect_mapping(PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT),
+      std::make_shared<Field>("float", FLOAT));
+  expect_mapping(PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE),
+      std::make_shared<Field>("double", DOUBLE));
+  expect_mapping(
+      PrimitiveNode::Make("binary", Repetition::OPTIONAL, ParquetType::BYTE_ARRAY),
+      std::make_shared<Field>("binary", BINARY));
+  expect_mapping(PrimitiveNode::Make("string", Repetition::OPTIONAL,
+                     ParquetType::BYTE_ARRAY, LogicalType::UTF8),
+      std::make_shared<Field>("string", UTF8));
+  expect_mapping(PrimitiveNode::Make("flba-binary", Repetition::OPTIONAL,
+                     ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, 12),
+      std::make_shared<Field>("flba-binary", BINARY));
+
+  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  ASSERT_OK(ConvertSchema(parquet_fields));
+
+  CheckFlatSchema(arrow_schema);
+}
+
+TEST_F(TestConvertParquetSchema, ParquetFlatDecimals) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+  // Registers a Parquet node together with the Arrow field it should become.
+  auto expect_mapping = [&parquet_fields, &arrow_fields](
+      const NodePtr& parquet_node, const std::shared_ptr<Field>& arrow_field) {
+    parquet_fields.push_back(parquet_node);
+    arrow_fields.push_back(arrow_field);
+  };
+
+  // DECIMAL(8, 4) stored in each of the four physical types should map to the
+  // same Arrow decimal type.
+  expect_mapping(PrimitiveNode::Make("flba-decimal", Repetition::OPTIONAL,
+                     ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, 4, 8, 4),
+      std::make_shared<Field>("flba-decimal", DECIMAL_8_4));
+  expect_mapping(PrimitiveNode::Make("binary-decimal", Repetition::OPTIONAL,
+                     ParquetType::BYTE_ARRAY, LogicalType::DECIMAL, -1, 8, 4),
+      std::make_shared<Field>("binary-decimal", DECIMAL_8_4));
+  expect_mapping(PrimitiveNode::Make("int32-decimal", Repetition::OPTIONAL,
+                     ParquetType::INT32, LogicalType::DECIMAL, -1, 8, 4),
+      std::make_shared<Field>("int32-decimal", DECIMAL_8_4));
+  expect_mapping(PrimitiveNode::Make("int64-decimal", Repetition::OPTIONAL,
+                     ParquetType::INT64, LogicalType::DECIMAL, -1, 8, 4),
+      std::make_shared<Field>("int64-decimal", DECIMAL_8_4));
+
+  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  ASSERT_OK(ConvertSchema(parquet_fields));
+
+  CheckFlatSchema(arrow_schema);
+}
+
+// Each of these nodes is expected to be rejected with NotImplemented when
+// converted on its own.
+TEST_F(TestConvertParquetSchema, UnsupportedThings) {
+  std::vector<NodePtr> unsupported_nodes;
+
+  unsupported_nodes.push_back(
+      PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96));
+
+  unsupported_nodes.push_back(
+      GroupNode::Make("repeated-group", Repetition::REPEATED, {}));
+
+  unsupported_nodes.push_back(PrimitiveNode::Make(
+      "int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE));
+
+  for (const NodePtr& node : unsupported_nodes) {
+    // Without this trace a failure does not say which node was accepted.
+    SCOPED_TRACE(node->name());
+    ASSERT_RAISES(NotImplemented, ConvertSchema({node}));
+  }
+}
+
+// Fixture for Arrow -> Parquet schema conversion using the default writer
+// properties.
+class TestConvertArrowSchema : public ::testing::Test {
+ public:
+  virtual void SetUp() {}
+
+  // Compares the converted Parquet schema node-by-node against `nodes`
+  // wrapped in a root group named "schema".
+  void CheckFlatSchema(const std::vector<NodePtr>& nodes) {
+    NodePtr schema_node = GroupNode::Make("schema", Repetition::REPEATED, nodes);
+    const GroupNode* expected_schema_node =
+        static_cast<const GroupNode*>(schema_node.get());
+    const GroupNode* result_schema_node = result_schema_->group_node();
+
+    ASSERT_EQ(expected_schema_node->field_count(), result_schema_node->field_count());
+
+    for (int i = 0; i < expected_schema_node->field_count(); i++) {
+      auto lhs = result_schema_node->field(i);
+      auto rhs = expected_schema_node->field(i);
+      // Report which field differs instead of a bare "false".
+      EXPECT_TRUE(lhs->Equals(rhs.get()))
+          << "field " << i << ": " << lhs->name() << " != " << rhs->name();
+    }
+  }
+
+  // Builds an Arrow schema from `fields` and converts it to Parquet.
+  ::arrow::Status ConvertSchema(const std::vector<std::shared_ptr<Field>>& fields) {
+    arrow_schema_ = std::make_shared<::arrow::Schema>(fields);
+    std::shared_ptr<::parquet::WriterProperties> properties =
+        ::parquet::default_writer_properties();
+    return ToParquetSchema(arrow_schema_.get(), *properties, &result_schema_);
+  }
+
+ protected:
+  std::shared_ptr<::arrow::Schema> arrow_schema_;
+  std::shared_ptr<SchemaDescriptor> result_schema_;
+};
+
+TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+  // Registers an Arrow field together with the Parquet node it should become.
+  auto expect_mapping = [&parquet_fields, &arrow_fields](
+      const NodePtr& parquet_node, const std::shared_ptr<Field>& arrow_field) {
+    parquet_fields.push_back(parquet_node);
+    arrow_fields.push_back(arrow_field);
+  };
+
+  // Non-nullable Arrow fields become REQUIRED Parquet nodes.
+  expect_mapping(PrimitiveNode::Make("boolean", Repetition::REQUIRED, ParquetType::BOOLEAN),
+      std::make_shared<Field>("boolean", BOOL, false));
+  expect_mapping(PrimitiveNode::Make("int32", Repetition::REQUIRED, ParquetType::INT32),
+      std::make_shared<Field>("int32", INT32, false));
+  expect_mapping(PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64),
+      std::make_shared<Field>("int64", INT64, false));
+  expect_mapping(PrimitiveNode::Make("timestamp", Repetition::REQUIRED, ParquetType::INT64,
+                     LogicalType::TIMESTAMP_MILLIS),
+      std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
+
+  // expect_mapping(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
+  //                    ParquetType::INT64, LogicalType::TIMESTAMP_MICROS),
+  //     std::make_shared<Field>("timestamp", TIMESTAMP_US, false));
+
+  // Nullable Arrow fields become OPTIONAL Parquet nodes.
+  expect_mapping(PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT),
+      std::make_shared<Field>("float", FLOAT));
+  expect_mapping(PrimitiveNode::Make("double", Repetition::OPTIONAL, ParquetType::DOUBLE),
+      std::make_shared<Field>("double", DOUBLE));
+
+  // TODO: String types need to be clarified a bit more in the Arrow spec
+  expect_mapping(PrimitiveNode::Make("string", Repetition::OPTIONAL,
+                     ParquetType::BYTE_ARRAY, LogicalType::UTF8),
+      std::make_shared<Field>("string", UTF8));
+
+  ASSERT_OK(ConvertSchema(arrow_fields));
+
+  CheckFlatSchema(parquet_fields);
+}
+
+TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) {
+  // TODO: Test Decimal Arrow -> Parquet conversion
+  // Until then this only checks that an empty Arrow schema converts to an
+  // empty Parquet schema.
+  const std::vector<std::shared_ptr<Field>> no_arrow_fields;
+  const std::vector<NodePtr> no_parquet_fields;
+
+  ASSERT_OK(ConvertSchema(no_arrow_fields));
+  CheckFlatSchema(no_parquet_fields);
+}
+
+// TODO: placeholder with no assertions yet; date/time node conversion is not
+// covered by this test.
+TEST(TestNodeConversion, DateAndTime) {}
+
+} // namespace arrow
+
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/io.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/io.cc b/src/parquet/arrow/io.cc
new file mode 100644
index 0000000..8e2645a
--- /dev/null
+++ b/src/parquet/arrow/io.cc
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/io.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/api/io.h"
+#include "parquet/arrow/utils.h"
+
+#include "arrow/util/status.h"
+
+using arrow::Status;
+using arrow::MemoryPool;
+
+// To assist with readability
+using ArrowROFile = arrow::io::RandomAccessFile;
+
+namespace parquet {
+namespace arrow {
+
+// ----------------------------------------------------------------------
+// ParquetAllocator
+
+// The default constructor routes allocations to Arrow's default memory pool.
+ParquetAllocator::ParquetAllocator() : ParquetAllocator(::arrow::default_memory_pool()) {}
+
+// `pool` is not owned and must outlive the allocator.
+ParquetAllocator::ParquetAllocator(MemoryPool* pool) : pool_(pool) {}
+
+ParquetAllocator::~ParquetAllocator() = default;
+
+// Allocates `size` bytes from the wrapped Arrow pool. A non-OK Status from the
+// pool is raised through PARQUET_THROW_NOT_OK instead of being returned.
+uint8_t* ParquetAllocator::Malloc(int64_t size) {
+  uint8_t* allocation = nullptr;
+  PARQUET_THROW_NOT_OK(pool_->Allocate(size, &allocation));
+  return allocation;
+}
+
+// Returns `buffer` (of `size` bytes) to the wrapped Arrow pool.
+void ParquetAllocator::Free(uint8_t* buffer, int64_t size) {
+  // MemoryPool::Free reports no Status, so there is nothing to check here.
+  pool_->Free(buffer, size);
+}
+
+// ----------------------------------------------------------------------
+// ParquetReadSource
+
+// Creates an unopened source; Open() must be called before any read. Buffers
+// returned by Read(int64_t) are allocated via `allocator` (not owned).
+ParquetReadSource::ParquetReadSource(ParquetAllocator* allocator)
+    : file_(nullptr), allocator_(allocator) {
+  // size_ belongs to the RandomAccessSource base and is otherwise only set in
+  // Open(); give it a defined value so it is never read uninitialized.
+  // NOTE(review): assumes the base class does not already initialize it.
+  size_ = 0;
+}
+
+// Attaches `file` and records its size. If querying the size fails, the
+// source is left unchanged and the error Status is returned.
+Status ParquetReadSource::Open(const std::shared_ptr<ArrowROFile>& file) {
+  int64_t size_in_bytes = 0;
+  RETURN_NOT_OK(file->GetSize(&size_in_bytes));
+
+  size_ = size_in_bytes;
+  file_ = file;
+  return Status::OK();
+}
+
+// Intentionally a no-op: the wrapped Arrow file is not closed here.
+void ParquetReadSource::Close() {
+ // TODO(wesm): Make this a no-op for now. This leaves Python wrappers for
+ // these classes in a borked state. Probably better to explicitly close.
+
+ // PARQUET_THROW_NOT_OK(file_->Close());
+}
+
+// The following methods forward to the Arrow file. Any non-OK Status is
+// raised through PARQUET_THROW_NOT_OK, because the RandomAccessSource
+// overrides do not return a Status.
+
+// Current position in the underlying Arrow file.
+int64_t ParquetReadSource::Tell() const {
+  int64_t current_position = 0;
+  PARQUET_THROW_NOT_OK(file_->Tell(&current_position));
+  return current_position;
+}
+
+// Moves the underlying Arrow file to `position`.
+void ParquetReadSource::Seek(int64_t position) {
+  Status seek_status = file_->Seek(position);
+  PARQUET_THROW_NOT_OK(seek_status);
+}
+
+// Reads up to `nbytes` into `out` and returns the number of bytes read.
+int64_t ParquetReadSource::Read(int64_t nbytes, uint8_t* out) {
+  int64_t num_read = 0;
+  PARQUET_THROW_NOT_OK(file_->Read(nbytes, &num_read, out));
+  return num_read;
+}
+
+// Reads up to `nbytes` into a freshly allocated buffer owned by the result.
+// The buffer is shrunk to the number of bytes actually read.
+std::shared_ptr<Buffer> ParquetReadSource::Read(int64_t nbytes) {
+  // TODO(wesm): This code is duplicated from parquet/util/input.cc; suggests
+  // that there should be more code sharing amongst file-like sources
+  auto buffer = std::make_shared<OwnedMutableBuffer>(0, allocator_);
+  buffer->Resize(nbytes);
+
+  const int64_t actual_bytes = Read(nbytes, buffer->mutable_data());
+  if (actual_bytes < nbytes) {
+    buffer->Resize(actual_bytes);
+  }
+  return buffer;
+}
+
+} // namespace arrow
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/io.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/io.h b/src/parquet/arrow/io.h
new file mode 100644
index 0000000..dc60635
--- /dev/null
+++ b/src/parquet/arrow/io.h
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Bridges Arrow's IO interfaces and Parquet-cpp's IO interfaces
+
+#ifndef PARQUET_ARROW_IO_H
+#define PARQUET_ARROW_IO_H
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/api/io.h"
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/memory-pool.h"
+
+namespace parquet {
+
+namespace arrow {
+
+// An implementation of the Parquet MemoryAllocator API that plugs into an
+// existing Arrow memory pool. This way all allocations are directed to a
+// single place rather than being tracked in different locations (for
+// example: without utilizing parquet-cpp's default allocator).
+//
+// The pool is held by raw pointer and never freed here, so it must outlive
+// the allocator.
+class PARQUET_EXPORT ParquetAllocator : public MemoryAllocator {
+ public:
+ // Uses the default memory pool
+ ParquetAllocator();
+
+ // Uses `pool` (not owned) for every allocation.
+ explicit ParquetAllocator(::arrow::MemoryPool* pool);
+ virtual ~ParquetAllocator();
+
+ // Allocates `size` bytes from the pool; the implementation raises a failed
+ // allocation via PARQUET_THROW_NOT_OK rather than returning null.
+ uint8_t* Malloc(int64_t size) override;
+ // Returns `buffer` of `size` bytes to the pool.
+ void Free(uint8_t* buffer, int64_t size) override;
+
+ // Replaces the pool used by later Malloc/Free calls.
+ // NOTE(review): buffers presumably must be freed to the pool they came
+ // from, so switching pools with allocations outstanding looks unsafe.
+ void set_pool(::arrow::MemoryPool* pool) { pool_ = pool; }
+
+ ::arrow::MemoryPool* pool() const { return pool_; }
+
+ private:
+ ::arrow::MemoryPool* pool_;
+};
+
+// Adapts an Arrow RandomAccessFile to parquet-cpp's RandomAccessSource.
+// Open() must be called before any other method (file_ starts out null).
+// Buffers from Read(int64_t) are allocated through `allocator`, which is held
+// by raw pointer and must outlive this source.
+class PARQUET_EXPORT ParquetReadSource : public RandomAccessSource {
+ public:
+ explicit ParquetReadSource(ParquetAllocator* allocator);
+
+ // We need to ask for the file size on opening the file, and this can fail
+ ::arrow::Status Open(const std::shared_ptr<::arrow::io::RandomAccessFile>& file);
+
+ // The following overrides report Arrow IO errors by throwing (see io.cc).
+ void Close() override;
+ int64_t Tell() const override;
+ void Seek(int64_t pos) override;
+ int64_t Read(int64_t nbytes, uint8_t* out) override;
+ std::shared_ptr<Buffer> Read(int64_t nbytes) override;
+
+ private:
+ // An Arrow readable file of some kind
+ std::shared_ptr<::arrow::io::RandomAccessFile> file_;
+
+ // The allocator is required for creating managed buffers
+ ParquetAllocator* allocator_;
+};
+
+} // namespace arrow
+} // namespace parquet
+
+#endif // PARQUET_ARROW_IO_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
new file mode 100644
index 0000000..056b5ab
--- /dev/null
+++ b/src/parquet/arrow/reader.cc
@@ -0,0 +1,406 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/reader.h"
+
+#include <algorithm>
+#include <limits>
+#include <queue>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include "parquet/arrow/io.h"
+#include "parquet/arrow/schema.h"
+#include "parquet/arrow/utils.h"
+
+#include "arrow/column.h"
+#include "arrow/schema.h"
+#include "arrow/table.h"
+#include "arrow/types/primitive.h"
+#include "arrow/types/string.h"
+#include "arrow/util/status.h"
+
+using arrow::Array;
+using arrow::Column;
+using arrow::Field;
+using arrow::MemoryPool;
+using arrow::PoolBuffer;
+using arrow::Status;
+using arrow::Table;
+
+// Help reduce verbosity
+using ParquetRAS = parquet::RandomAccessSource;
+using ParquetReader = parquet::ParquetFileReader;
+
+namespace parquet {
+namespace arrow {
+
+// Maps an Arrow type to the Arrow builder used to accumulate its values;
+// the primary template uses NumericBuilder.
+template <typename ArrowType>
+struct ArrowTypeTraits {
+ typedef ::arrow::NumericBuilder<ArrowType> builder_type;
+};
+
+// Specialization selecting BooleanBuilder for BooleanType.
+// NOTE(review): `BooleanType` is unqualified inside namespace parquet::arrow
+// and may resolve to parquet's BooleanType rather than ::arrow::BooleanType.
+// In that case BuilderType<::arrow::BooleanType> would still pick
+// NumericBuilder. Verify which type is intended and qualify it explicitly.
+template <>
+struct ArrowTypeTraits<BooleanType> {
+ typedef ::arrow::BooleanBuilder builder_type;
+};
+
+// Shorthand for the builder type associated with ArrowType.
+template <typename ArrowType>
+using BuilderType = typename ArrowTypeTraits<ArrowType>::builder_type;
+
+// Private implementation of FileReader. Owns the underlying ParquetFileReader
+// and allocates Arrow memory from `pool` (not owned).
+class FileReader::Impl {
+ public:
+ Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader);
+ virtual ~Impl() {}
+
+ // True when the column has no repetition and at most one definition level,
+ // the latter only if the leaf node itself is OPTIONAL.
+ bool CheckForFlatColumn(const ColumnDescriptor* descr);
+ // Creates a reader for flat column `i`; returns Invalid for a nested column.
+ Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
+ // Reads all rows of flat column `i` into a single Array.
+ Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);
+ // Reads every column into a Table named after the Parquet schema root.
+ Status ReadFlatTable(std::shared_ptr<Table>* out);
+
+ private:
+ MemoryPool* pool_;
+ std::unique_ptr<ParquetFileReader> reader_;
+};
+
+// Private implementation of FlatColumnReader: reads one flat (non-nested) leaf
+// column row group by row group and materializes it with Arrow builders.
+class FlatColumnReader::Impl {
+ public:
+  Impl(MemoryPool* pool, const ColumnDescriptor* descr,
+      ParquetFileReader* reader, int column_index);
+  virtual ~Impl() {}
+
+  Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
+  template <typename ArrowType, typename ParquetType>
+  Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
+
+  template <typename ArrowType, typename ParquetType>
+  Status ReadNullableFlatBatch(const int16_t* def_levels,
+      typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
+      BuilderType<ArrowType>* builder);
+  template <typename ArrowType, typename ParquetType>
+  Status ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t values_read,
+      BuilderType<ArrowType>* builder);
+
+ private:
+  void NextRowGroup();
+
+  // True when a buffer of InType can be reinterpreted as OutType without any
+  // conversion: identical types, or integral types of the same width.
+  template <typename InType, typename OutType>
+  struct can_copy_ptr {
+    static constexpr bool value =
+        std::is_same<InType, OutType>::value ||
+        (std::is_integral<InType>::value && std::is_integral<OutType>::value &&
+            (sizeof(InType) == sizeof(OutType)));
+  };
+
+  // Zero-copy path: reinterpret the Parquet values as the Arrow C type.
+  template <typename InType, typename OutType,
+      typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+  Status ConvertPhysicalType(
+      const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+    *out_ptr = reinterpret_cast<const OutType*>(in_ptr);
+    return Status::OK();
+  }
+
+  // Converting path: element-wise copy into values_builder_buffer_. The result
+  // points into that buffer, so it is presumably invalidated by the buffer's
+  // next Resize. `!` is used instead of the `not` alternative token, which
+  // MSVC does not accept without <ciso646>.
+  template <typename InType, typename OutType,
+      typename std::enable_if<!can_copy_ptr<InType, OutType>::value>::type* = nullptr>
+  Status ConvertPhysicalType(
+      const InType* in_ptr, int64_t length, const OutType** out_ptr) {
+    RETURN_NOT_OK(values_builder_buffer_.Resize(length * sizeof(OutType)));
+    OutType* mutable_out_ptr =
+        reinterpret_cast<OutType*>(values_builder_buffer_.mutable_data());
+    std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
+    *out_ptr = mutable_out_ptr;
+    return Status::OK();
+  }
+
+  MemoryPool* pool_;
+  const ColumnDescriptor* descr_;
+  ParquetFileReader* reader_;
+  int column_index_;
+  int next_row_group_;
+  std::shared_ptr<ColumnReader> column_reader_;
+  std::shared_ptr<Field> field_;
+
+  PoolBuffer values_buffer_;
+  PoolBuffer def_levels_buffer_;
+  PoolBuffer values_builder_buffer_;
+  PoolBuffer valid_bytes_buffer_;
+};
+
+// Takes ownership of `reader`; `pool` is borrowed for Arrow allocations.
+FileReader::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
+    : pool_(pool),
+      reader_(std::move(reader)) {}
+
+// A column is flat when it is never repeated and has at most one definition
+// level, where a single definition level is only allowed for an OPTIONAL leaf.
+bool FileReader::Impl::CheckForFlatColumn(const ColumnDescriptor* descr) {
+  const auto max_def_level = descr->max_definition_level();
+  const bool is_repeated = descr->max_repetition_level() > 0;
+
+  if (is_repeated || max_def_level > 1) {
+    return false;
+  }
+  if (max_def_level == 1) {
+    return descr->schema_node()->repetition() == Repetition::OPTIONAL;
+  }
+  return true;
+}
+
+// Creates a reader for column `i`. Returns Invalid if `i` is not a valid
+// column index or if the column is not flat.
+Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
+  // Validate the index before handing it to SchemaDescriptor::Column().
+  if (i < 0 || i >= reader_->metadata()->num_columns()) {
+    return Status::Invalid("Column index " + std::to_string(i) + " is out of range");
+  }
+  const SchemaDescriptor* schema = reader_->metadata()->schema();
+  const ColumnDescriptor* column_descr = schema->Column(i);
+
+  if (!CheckForFlatColumn(column_descr)) {
+    return Status::Invalid("The requested column is not flat");
+  }
+  std::unique_ptr<FlatColumnReader::Impl> impl(
+      new FlatColumnReader::Impl(pool_, column_descr, reader_.get(), i));
+  *out = std::unique_ptr<FlatColumnReader>(new FlatColumnReader(std::move(impl)));
+  return Status::OK();
+}
+
+// Reads every row of flat column `i` into a single Array.
+Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
+  std::unique_ptr<FlatColumnReader> flat_column_reader;
+  RETURN_NOT_OK(GetFlatColumn(i, &flat_column_reader));
+
+  // NextBatch() takes an int batch size. Refuse row counts that would be
+  // silently truncated by that conversion.
+  const int64_t num_rows = reader_->metadata()->num_rows();
+  if (num_rows > static_cast<int64_t>(std::numeric_limits<int>::max())) {
+    return Status::Invalid("Column has too many rows to read in a single batch");
+  }
+  return flat_column_reader->NextBatch(static_cast<int>(num_rows), out);
+}
+
+// Reads every column of the file into a Table named after the schema root.
+// Arrow fields are paired with Parquet leaf columns by position, so the two
+// counts must match.
+Status FileReader::Impl::ReadFlatTable(std::shared_ptr<Table>* table) {
+  auto descr = reader_->metadata()->schema();
+
+  const std::string& name = descr->name();
+  std::shared_ptr<::arrow::Schema> schema;
+  RETURN_NOT_OK(FromParquetSchema(descr, &schema));
+
+  int num_columns = reader_->metadata()->num_columns();
+  // Guard the positional schema->field(i) lookup below: a mismatch means the
+  // schema is not a flat list of leaf columns.
+  if (static_cast<int>(schema->num_fields()) != num_columns) {
+    return Status::Invalid(
+        "Number of schema fields does not match number of leaf columns");
+  }
+
+  std::vector<std::shared_ptr<Column>> columns(num_columns);
+  for (int i = 0; i < num_columns; i++) {
+    std::shared_ptr<Array> array;
+    RETURN_NOT_OK(ReadFlatColumn(i, &array));
+    columns[i] = std::make_shared<Column>(schema->field(i), array);
+  }
+
+  *table = std::make_shared<Table>(name, schema, columns);
+  return Status::OK();
+}
+
+// Takes ownership of `reader`; Arrow memory is allocated from `pool`.
+FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
+    : impl_(new FileReader::Impl(pool, std::move(reader))) {}
+
+FileReader::~FileReader() = default;
+
+// Static ctor
+// Opens `file` for reading into Arrow memory. The Arrow file is wrapped in a
+// ParquetReadSource whose buffers come from `allocator`, and the resulting
+// FileReader uses the allocator's pool.
+Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
+    ParquetAllocator* allocator, std::unique_ptr<FileReader>* reader) {
+  std::unique_ptr<ParquetReadSource> read_source(new ParquetReadSource(allocator));
+  RETURN_NOT_OK(read_source->Open(file));
+
+  // TODO(wesm): reader properties
+  std::unique_ptr<ParquetReader> parquet_reader;
+  PARQUET_CATCH_NOT_OK(parquet_reader = ParquetReader::Open(std::move(read_source)));
+
+  std::unique_ptr<FileReader> arrow_reader(
+      new FileReader(allocator->pool(), std::move(parquet_reader)));
+  *reader = std::move(arrow_reader);
+  return Status::OK();
+}
+
+// Public FileReader methods delegate directly to the private implementation.
+Status FileReader::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
+  FileReader::Impl& impl = *impl_;
+  return impl.GetFlatColumn(i, out);
+}
+
+Status FileReader::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
+  FileReader::Impl& impl = *impl_;
+  return impl.ReadFlatColumn(i, out);
+}
+
+Status FileReader::ReadFlatTable(std::shared_ptr<Table>* out) {
+  FileReader::Impl& impl = *impl_;
+  return impl.ReadFlatTable(out);
+}
+
+ // Sets up reading of column `column_index` of `reader`.
+ // The scratch buffers for decoded values and definition levels are created
+ // on `pool`. The Arrow field for the column is derived from its Parquet
+ // schema node. The reader is then positioned on the first row group (or on
+ // no row group at all if the file has none).
+ FlatColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr,
+ ParquetFileReader* reader, int column_index)
+ : pool_(pool),
+ descr_(descr),
+ reader_(reader),
+ column_index_(column_index),
+ next_row_group_(0),
+ values_buffer_(pool),
+ def_levels_buffer_(pool) {
+ // NOTE(review): whatever NodeToField returns (presumably a Status) is
+ // discarded here because a constructor cannot propagate it; if it fails,
+ // field_ may be left unset. Confirm that callers validate the column first.
+ NodeToField(descr_->schema_node(), &field_);
+ // next_row_group_ is 0, so this loads row group 0's column reader, or sets
+ // column_reader_ to nullptr when the file has no row groups.
+ NextRowGroup();
+ }
+
+ // Appends `values_read` decoded values to `builder` for a column whose max
+ // definition level is 0 (no nulls). The values are first converted from the
+ // Parquet physical C type to the Arrow C type via ConvertPhysicalType.
+ template <typename ArrowType, typename ParquetType>
+ Status FlatColumnReader::Impl::ReadNonNullableBatch(typename ParquetType::c_type* values,
+     int64_t values_read, BuilderType<ArrowType>* builder) {
+   DCHECK(builder);
+
+   using SourceCType = typename ParquetType::c_type;
+   using TargetCType = typename ArrowType::c_type;
+
+   const TargetCType* converted = nullptr;
+   // Extra parentheses keep the template argument comma out of the macro.
+   RETURN_NOT_OK((ConvertPhysicalType<SourceCType, TargetCType>(
+       values, values_read, &converted)));
+   return builder->Append(converted, values_read);
+ }
+
+ // Appends `levels_read` slots of an optional flat column to `builder`.
+ //
+ // `def_levels` holds one definition level per slot; a slot whose level is
+ // below the column's max definition level is null. `values` holds only the
+ // `values_read` non-null values, densely packed, so it is consumed with its
+ // own cursor. Null slots are zero-filled so that no uninitialized scratch
+ // memory is copied into the Arrow value buffer.
+ template <typename ArrowType, typename ParquetType>
+ Status FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
+     typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
+     BuilderType<ArrowType>* builder) {
+   using ArrowCType = typename ArrowType::c_type;
+
+   DCHECK(builder);
+   RETURN_NOT_OK(values_builder_buffer_.Resize(levels_read * sizeof(ArrowCType)));
+   RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t)));
+   auto values_ptr = reinterpret_cast<ArrowCType*>(values_builder_buffer_.mutable_data());
+   uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data();
+   // Constant for the column; hoisted out of the per-slot loop.
+   const auto max_def_level = descr_->max_definition_level();
+   // 64-bit cursor to match the 64-bit counts passed in.
+   int64_t values_idx = 0;
+   for (int64_t i = 0; i < levels_read; i++) {
+     if (def_levels[i] < max_def_level) {
+       valid_bytes[i] = 0;
+       values_ptr[i] = ArrowCType();
+     } else {
+       valid_bytes[i] = 1;
+       values_ptr[i] = values[values_idx++];
+     }
+   }
+   // Every decoded value must be consumed by exactly one non-null slot.
+   DCHECK(values_idx == values_read);
+   RETURN_NOT_OK(builder->Append(values_ptr, levels_read, valid_bytes));
+   return Status::OK();
+ }
+
+ // Reads up to `batch_size` rows of this column into a new Arrow array of
+ // ArrowType, decoding ParquetType physical values. It moves to the next row
+ // group whenever the current one has no more data. The result is shorter
+ // than `batch_size` if the file runs out of data first.
+ template <typename ArrowType, typename ParquetType>
+ Status FlatColumnReader::Impl::TypedReadBatch(
+     int batch_size, std::shared_ptr<Array>* out) {
+   using ParquetCType = typename ParquetType::c_type;
+
+   int values_to_read = batch_size;
+   BuilderType<ArrowType> builder(pool_, field_->type);
+   while ((values_to_read > 0) && column_reader_) {
+     // Growing the scratch buffers can fail; propagate that instead of letting
+     // ReadBatch write into an undersized buffer.
+     RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType)));
+     if (descr_->max_definition_level() > 0) {
+       RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)));
+     }
+     auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
+     if (reader == nullptr) {
+       // The column's reader does not match the physical type NextBatch chose.
+       return Status::Invalid(
+           "Column reader does not match the expected Parquet physical type");
+     }
+     int64_t values_read;
+     int64_t levels_read;
+     int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+     auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
+     // Flat column: no repetition levels are requested (nullptr).
+     PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+         values_to_read, def_levels, nullptr, values, &values_read));
+     values_to_read -= levels_read;
+     if (descr_->max_definition_level() == 0) {
+       RETURN_NOT_OK(
+           (ReadNonNullableBatch<ArrowType, ParquetType>(values, values_read, &builder)));
+     } else {
+       // As per the definition and checks for flat columns:
+       // descr_->max_definition_level() == 1
+       RETURN_NOT_OK((ReadNullableFlatBatch<ArrowType, ParquetType>(
+           def_levels, values, values_read, levels_read, &builder)));
+     }
+     if (!column_reader_->HasNext()) { NextRowGroup(); }
+   }
+   *out = builder.Finish();
+   return Status::OK();
+ }
+
+ // Specialization of TypedReadBatch for Arrow strings decoded from Parquet
+ // ByteArray values. Each value is appended to a StringBuilder individually,
+ // and null slots are appended with AppendNull.
+ template <>
+ Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
+     int batch_size, std::shared_ptr<Array>* out) {
+   int values_to_read = batch_size;
+   ::arrow::StringBuilder builder(pool_, field_->type);
+   while ((values_to_read > 0) && column_reader_) {
+     // Growing the scratch buffers can fail; propagate that instead of letting
+     // ReadBatch write into an undersized buffer.
+     RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ByteArray)));
+     if (descr_->max_definition_level() > 0) {
+       RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)));
+     }
+     auto reader =
+         dynamic_cast<TypedColumnReader<ByteArrayType>*>(column_reader_.get());
+     if (reader == nullptr) {
+       // The column's reader does not match the physical type NextBatch chose.
+       return Status::Invalid(
+           "Column reader does not match the expected Parquet physical type");
+     }
+     int64_t values_read;
+     int64_t levels_read;
+     int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+     auto values = reinterpret_cast<ByteArray*>(values_buffer_.mutable_data());
+     // Flat column: no repetition levels are requested (nullptr).
+     PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+         values_to_read, def_levels, nullptr, values, &values_read));
+     values_to_read -= levels_read;
+     if (descr_->max_definition_level() == 0) {
+       // No nulls: `values` holds exactly `values_read` decoded entries.
+       for (int64_t i = 0; i < values_read; i++) {
+         RETURN_NOT_OK(
+             builder.Append(reinterpret_cast<const char*>(values[i].ptr), values[i].len));
+       }
+     } else {
+       // descr_->max_definition_level() == 1: `values` holds only the non-null
+       // entries, densely packed, so it is walked with its own cursor.
+       const auto max_def_level = descr_->max_definition_level();
+       int64_t values_idx = 0;
+       for (int64_t i = 0; i < levels_read; i++) {
+         if (def_levels[i] < max_def_level) {
+           RETURN_NOT_OK(builder.AppendNull());
+         } else {
+           RETURN_NOT_OK(
+               builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr),
+                   values[values_idx].len));
+           values_idx++;
+         }
+       }
+     }
+     if (!column_reader_->HasNext()) { NextRowGroup(); }
+   }
+   *out = builder.Finish();
+   return Status::OK();
+ }
+
+ // Switch case that dispatches to TypedReadBatch for one Arrow type /
+ // Parquet physical type pair. Each case returns, so no `break` follows.
+ #define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \
+   case ::arrow::Type::ENUM:                            \
+     return TypedReadBatch<ArrowType, ParquetType>(batch_size, out);
+
+ // Reads the next batch of up to `batch_size` rows, dispatching on the Arrow
+ // type of the column's field.
+ //
+ // Per the table below, 8/16/32-bit integer types (signed and unsigned) are
+ // decoded from the Int32 physical type, and 64-bit integers and timestamps
+ // are decoded from Int64. Sets `*out` to nullptr and returns OK once every
+ // row group has been consumed. Arrow types not listed yield
+ // Status::NotImplemented.
+ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+   if (!column_reader_) {
+     // Exhausted all row groups.
+     *out = nullptr;
+     return Status::OK();
+   }
+
+   switch (field_->type->type) {
+     TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
+     TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
+     TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
+     TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
+     TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
+     TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
+     TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
+     TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
+     TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
+     TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
+     TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
+     TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
+     TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type)
+     default:
+       return Status::NotImplemented(field_->type->ToString());
+   }
+ }
+
+ // Only needed by the dispatch above; keep it from leaking into the rest of
+ // the translation unit.
+ #undef TYPED_BATCH_CASE
+
+ // Advances to the next row group.
+ // If one remains, column_reader_ becomes the reader for column
+ // `column_index_` of that row group. Otherwise column_reader_ becomes
+ // nullptr, which NextBatch and TypedReadBatch treat as end of data.
+ void FlatColumnReader::Impl::NextRowGroup() {
+   const bool exhausted = next_row_group_ >= reader_->metadata()->num_row_groups();
+   if (exhausted) {
+     column_reader_ = nullptr;
+     return;
+   }
+   column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_);
+   ++next_row_group_;
+ }
+
+ // Takes ownership of the implementation object `impl`.
+ FlatColumnReader::FlatColumnReader(std::unique_ptr<Impl> impl)
+     : impl_(std::move(impl)) {}
+
+ // Defined out of line so that Impl is a complete type where impl_ is destroyed.
+ FlatColumnReader::~FlatColumnReader() = default;
+
+ // Delegates to the implementation's NextBatch.
+ Status FlatColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+   Impl& impl = *impl_;
+   return impl.NextBatch(batch_size, out);
+ }
+
+} // namespace arrow
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4a7bf117/src/parquet/arrow/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
new file mode 100644
index 0000000..e725728
--- /dev/null
+++ b/src/parquet/arrow/reader.h
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ARROW_READER_H
+#define PARQUET_ARROW_READER_H
+
+#include <memory>
+
+#include "parquet/api/reader.h"
+#include "parquet/api/schema.h"
+#include "parquet/arrow/io.h"
+
+#include "arrow/io/interfaces.h"
+
+ namespace arrow {
+
+ // Forward declarations of Arrow classes, so this header does not need their
+ // full definitions.
+ class Array;
+ class MemoryPool;
+ class RowBatch;
+ class Status;
+ class Table;
+
+ }  // namespace arrow
+
+namespace parquet {
+
+namespace arrow {
+
+class FlatColumnReader;
+
+ // Arrow read adapter class for deserializing Parquet files as Arrow row
+ // batches.
+ //
+ // TODO(wesm): nested data does not always make sense with this user
+ // interface unless you are only reading a single leaf node from a branch of
+ // a table. For example:
+ //
+ //   repeated group data {
+ //     optional group record {
+ //       optional int32 val1;
+ //       optional byte_array val2;
+ //       optional bool val3;
+ //     }
+ //     optional int32 val4;
+ //   }
+ //
+ // In the Parquet file, there are 4 leaf nodes:
+ //
+ // * data.record.val1
+ // * data.record.val2
+ // * data.record.val3
+ // * data.val4
+ //
+ // When materializing this data in an Arrow array, we would have:
+ //
+ //   data: list<struct<
+ //     record: struct<
+ //       val1: int32,
+ //       val2: string (= list<uint8>),
+ //       val3: bool,
+ //     >,
+ //     val4: int32
+ //   >>
+ //
+ // However, in the Parquet format, each leaf node has its own repetition and
+ // definition levels describing the structure of the intermediate nodes in
+ // this array structure. Thus, we will need to scan the leaf data for a group
+ // of leaf nodes that are part of the same type tree to create a single
+ // result Arrow nested array structure.
+ //
+ // This is additionally complicated by "chunky" repeated fields or very large
+ // byte arrays.
+ class PARQUET_EXPORT FileReader {
+ public:
+ // Takes ownership of `reader`. `pool` is handed to the implementation
+ // (presumably for the Arrow memory of the arrays it produces).
+ FileReader(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader);
+
+ // Since the distribution of columns amongst a Parquet file's row groups may
+ // be uneven (the number of values in each column chunk can be different), we
+ // provide a column-oriented read interface. The FlatColumnReader hides the
+ // details of paging through the file's row groups and yielding
+ // fully-materialized arrow::Array instances.
+ //
+ // Returns error status if the column of interest is not flat.
+ ::arrow::Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
+ // Read column `i` as a whole (all rows of the file) into an Array.
+ ::arrow::Status ReadFlatColumn(int i, std::shared_ptr<::arrow::Array>* out);
+ // Read all columns into a Table whose schema is converted from the Parquet
+ // file schema. Every column must be flat.
+ ::arrow::Status ReadFlatTable(std::shared_ptr<::arrow::Table>* out);
+
+ virtual ~FileReader();
+
+ private:
+ // Private implementation, defined in the .cc file.
+ class PARQUET_NO_EXPORT Impl;
+ std::unique_ptr<Impl> impl_;
+ };
+
+ // At this point, the column reader is a stream iterator. It only knows how to
+ // read the next batch of values for a particular column from the file until it
+ // runs out.
+ //
+ // We also do not expose any internal Parquet details, such as row groups. This
+ // might change in the future.
+ //
+ // Instances cannot be constructed directly: the constructor is private and
+ // FileReader (a friend) hands them out via FileReader::GetFlatColumn.
+ class PARQUET_EXPORT FlatColumnReader {
+ public:
+ virtual ~FlatColumnReader();
+
+ // Scan the next array of the indicated size. The actual size of the
+ // returned array may be less than the passed size depending on how much
+ // data is available in the file.
+ //
+ // When all the data in the file has been exhausted, the result is set to
+ // nullptr.
+ //
+ // Returns Status::OK on a successful read, including if you have exhausted
+ // the data available in the file.
+ ::arrow::Status NextBatch(int batch_size, std::shared_ptr<::arrow::Array>* out);
+
+ private:
+ // Private implementation, defined in the .cc file.
+ class PARQUET_NO_EXPORT Impl;
+ std::unique_ptr<Impl> impl_;
+ // Takes ownership of `impl`; only FileReader may construct instances.
+ explicit FlatColumnReader(std::unique_ptr<Impl> impl);
+
+ friend class FileReader;
+ };
+
+ // Helper function to create a file reader from an implementation of an Arrow
+ // readable file.
+ //
+ // `allocator` must not be null: it backs the Parquet read source, and its
+ // memory pool is also passed to the returned FileReader. On success,
+ // `*reader` owns the new FileReader.
+ PARQUET_EXPORT
+ ::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
+ ParquetAllocator* allocator, std::unique_ptr<FileReader>* reader);
+
+} // namespace arrow
+} // namespace parquet
+
+#endif // PARQUET_ARROW_READER_H