Posted to commits@arrow.apache.org by we...@apache.org on 2016/06/24 23:55:37 UTC
[2/2] arrow git commit: ARROW-222: Prototyping an IO interface for Arrow, with initial HDFS target
ARROW-222: Prototyping an IO interface for Arrow, with initial HDFS target
- Switch Travis CI back to Ubuntu trusty (old Boost in precise has issues with C++11)
- Adapt the SFrame libhdfs shim for Arrow
- Create a C++ public API within arrow::io to libhdfs
- Implement and test many functions of the libhdfs API
- Start a Cython wrapper interface to arrow_io; begin a Python file-like interface and unit tests
- Add a thirdparty hdfs.h so builds are possible without a local Hadoop distro (e.g. in Travis CI)
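
For orientation, here is a minimal sketch (not part of the patch) of how the new C++ API is meant to be used; the class and method names come from cpp/src/arrow/io/hdfs.h below, while the host, port, user, and path values are placeholders:

```cpp
#include <cstdint>
#include <memory>

#include "arrow/io/hdfs.h"
#include "arrow/util/status.h"

// Connect, read the first 100 bytes of a file, and disconnect
arrow::Status ReadHead() {
  arrow::io::HdfsConnectionConfig config;
  config.host = "localhost";  // placeholder values
  config.port = 20500;
  config.user = "hdfs";

  std::shared_ptr<arrow::io::HdfsClient> client;
  RETURN_NOT_OK(arrow::io::HdfsClient::Connect(&config, &client));

  std::shared_ptr<arrow::io::HdfsReadableFile> file;
  RETURN_NOT_OK(client->OpenReadable("/tmp/example-file", &file));

  uint8_t buffer[100];
  int32_t bytes_read = 0;
  RETURN_NOT_OK(file->Read(100, &bytes_read, buffer));

  RETURN_NOT_OK(file->Close());
  return client->Disconnect();
}
```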
Change-Id: I4a46e50f6c1c22787baa3749d8a542216341e630
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ef908302
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ef908302
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ef908302
Branch: refs/heads/master
Commit: ef90830290491294d2fccfc5dcb16d3c0f96a70a
Parents: f7ade7b
Author: Wes McKinney <we...@apache.org>
Authored: Fri Jun 24 16:41:08 2016 -0700
Committer: Wes McKinney <we...@apache.org>
Committed: Fri Jun 24 16:41:08 2016 -0700
----------------------------------------------------------------------
.travis.yml | 5 +-
NOTICE.txt | 9 +
ci/travis_before_script_cpp.sh | 15 +-
cpp/CMakeLists.txt | 60 +-
cpp/doc/HDFS.md | 39 +
cpp/src/arrow/io/CMakeLists.txt | 97 +++
cpp/src/arrow/io/hdfs-io-test.cc | 315 ++++++++
cpp/src/arrow/io/hdfs.cc | 458 +++++++++++
cpp/src/arrow/io/hdfs.h | 213 +++++
cpp/src/arrow/io/interfaces.h | 71 ++
cpp/src/arrow/io/libhdfs_shim.cc | 544 +++++++++++++
cpp/src/arrow/parquet/parquet-io-test.cc | 4 +-
cpp/thirdparty/hadoop/include/hdfs.h | 1024 +++++++++++++++++++++++++
dev/merge_arrow_pr.py | 5 +-
python/CMakeLists.txt | 6 +-
python/cmake_modules/FindArrow.cmake | 17 +-
python/conda.recipe/meta.yaml | 1 +
python/pyarrow/error.pxd | 4 +-
python/pyarrow/error.pyx | 14 +-
python/pyarrow/includes/common.pxd | 18 +
python/pyarrow/includes/libarrow.pxd | 19 -
python/pyarrow/includes/libarrow_io.pxd | 93 +++
python/pyarrow/io.pyx | 504 ++++++++++++
python/pyarrow/tests/test_array.py | 47 +-
python/pyarrow/tests/test_io.py | 126 +++
python/setup.py | 9 +-
26 files changed, 3656 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index ac2b0d4..97229b1 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,5 +1,5 @@
sudo: required
-dist: precise
+dist: trusty
addons:
apt:
sources:
@@ -12,6 +12,9 @@ addons:
- ccache
- cmake
- valgrind
+ - libboost-dev
+ - libboost-filesystem-dev
+ - libboost-system-dev
matrix:
fast_finish: true
http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/NOTICE.txt
----------------------------------------------------------------------
diff --git a/NOTICE.txt b/NOTICE.txt
new file mode 100644
index 0000000..0310c89
--- /dev/null
+++ b/NOTICE.txt
@@ -0,0 +1,9 @@
+Apache Arrow
+Copyright 2016 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This product includes software from the SFrame project (BSD, 3-clause).
+* Copyright (C) 2015 Dato, Inc.
+* Copyright (c) 2009 Carnegie Mellon University.
http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/ci/travis_before_script_cpp.sh
----------------------------------------------------------------------
diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh
index 9060cc9..08551f3 100755
--- a/ci/travis_before_script_cpp.sh
+++ b/ci/travis_before_script_cpp.sh
@@ -23,12 +23,21 @@ echo $GTEST_HOME
: ${ARROW_CPP_INSTALL=$TRAVIS_BUILD_DIR/cpp-install}
-CMAKE_COMMON_FLAGS="-DARROW_BUILD_BENCHMARKS=ON -DARROW_PARQUET=ON -DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL"
+CMAKE_COMMON_FLAGS="\
+-DARROW_BUILD_BENCHMARKS=ON \
+-DARROW_PARQUET=ON \
+-DARROW_HDFS=on \
+-DCMAKE_INSTALL_PREFIX=$ARROW_CPP_INSTALL"
if [ $TRAVIS_OS_NAME == "linux" ]; then
- cmake -DARROW_TEST_MEMCHECK=on $CMAKE_COMMON_FLAGS -DCMAKE_CXX_FLAGS="-Werror" $CPP_DIR
+ cmake -DARROW_TEST_MEMCHECK=on \
+ $CMAKE_COMMON_FLAGS \
+ -DCMAKE_CXX_FLAGS="-Werror" \
+ $CPP_DIR
else
- cmake $CMAKE_COMMON_FLAGS -DCMAKE_CXX_FLAGS="-Werror" $CPP_DIR
+ cmake $CMAKE_COMMON_FLAGS \
+ -DCMAKE_CXX_FLAGS="-Werror" \
+ $CPP_DIR
fi
make -j4
http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index bdf7572..18b4759 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -62,6 +62,10 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
"Build the Arrow IPC extensions"
ON)
+ option(ARROW_HDFS
+ "Build the Arrow IO extensions for the Hadoop file system"
+ OFF)
+
option(ARROW_SSE3
"Build Arrow with SSE3"
ON)
@@ -454,6 +458,47 @@ if ("$ENV{GBENCHMARK_HOME}" STREQUAL "")
set(GBENCHMARK_HOME ${THIRDPARTY_DIR}/installed)
endif()
+# ----------------------------------------------------------------------
+# Add Boost dependencies (code adapted from Apache Kudu (incubating))
+
+# find boost headers and libs
+set(Boost_DEBUG TRUE)
+set(Boost_USE_MULTITHREADED ON)
+set(Boost_USE_STATIC_LIBS ON)
+find_package(Boost COMPONENTS system filesystem REQUIRED)
+include_directories(SYSTEM ${Boost_INCLUDE_DIRS})
+set(BOOST_STATIC_LIBS ${Boost_LIBRARIES})
+list(LENGTH BOOST_STATIC_LIBS BOOST_STATIC_LIBS_LEN)
+
+# Find Boost shared libraries.
+set(Boost_USE_STATIC_LIBS OFF)
+find_package(Boost COMPONENTS system filesystem REQUIRED)
+set(BOOST_SHARED_LIBS ${Boost_LIBRARIES})
+list(LENGTH BOOST_SHARED_LIBS BOOST_SHARED_LIBS_LEN)
+list(SORT BOOST_SHARED_LIBS)
+
+message(STATUS "Boost include dir: " ${Boost_INCLUDE_DIRS})
+message(STATUS "Boost libraries: " ${Boost_LIBRARIES})
+
+math(EXPR LAST_IDX "${BOOST_STATIC_LIBS_LEN} - 1")
+foreach(IDX RANGE ${LAST_IDX})
+ list(GET BOOST_STATIC_LIBS ${IDX} BOOST_STATIC_LIB)
+ list(GET BOOST_SHARED_LIBS ${IDX} BOOST_SHARED_LIB)
+
+ # Remove the prefix/suffix from the library name.
+ #
+ # e.g. libboost_system-mt --> boost_system
+ get_filename_component(LIB_NAME ${BOOST_STATIC_LIB} NAME_WE)
+ string(REGEX REPLACE "lib([^-]*)(-mt)?" "\\1" LIB_NAME_NO_PREFIX_SUFFIX ${LIB_NAME})
+ ADD_THIRDPARTY_LIB(${LIB_NAME_NO_PREFIX_SUFFIX}
+ STATIC_LIB "${BOOST_STATIC_LIB}"
+ SHARED_LIB "${BOOST_SHARED_LIB}")
+ list(APPEND ARROW_BOOST_LIBS ${LIB_NAME_NO_PREFIX_SUFFIX})
+endforeach()
+include_directories(SYSTEM ${Boost_INCLUDE_DIR})
+
+# ----------------------------------------------------------------------
+# Enable / disable tests and benchmarks
if(ARROW_BUILD_TESTS)
add_custom_target(unittest ctest -L unittest)
@@ -529,12 +574,24 @@ endif (UNIX)
# "make lint" target
############################################################
if (UNIX)
+
+ file(GLOB_RECURSE LINT_FILES
+ "${CMAKE_CURRENT_SOURCE_DIR}/src/*.h"
+ "${CMAKE_CURRENT_SOURCE_DIR}/src/*.cc"
+ )
+
+ FOREACH(item ${LINT_FILES})
+ IF(NOT (item MATCHES "_generated.h"))
+ LIST(APPEND FILTERED_LINT_FILES ${item})
+ ENDIF()
+ ENDFOREACH(item ${LINT_FILES})
+
# Full lint
add_custom_target(lint ${BUILD_SUPPORT_DIR}/cpplint.py
--verbose=2
--linelength=90
--filter=-whitespace/comments,-readability/todo,-build/header_guard,-build/c++11,-runtime/references
- `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc -or -name \\*.h | sed -e '/_generated/g'`)
+ ${FILTERED_LINT_FILES})
endif (UNIX)
@@ -624,6 +681,7 @@ set_target_properties(arrow
target_link_libraries(arrow ${LIBARROW_LINK_LIBS})
add_subdirectory(src/arrow)
+add_subdirectory(src/arrow/io)
add_subdirectory(src/arrow/util)
add_subdirectory(src/arrow/types)
http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/doc/HDFS.md
----------------------------------------------------------------------
diff --git a/cpp/doc/HDFS.md b/cpp/doc/HDFS.md
new file mode 100644
index 0000000..e0d5dfd
--- /dev/null
+++ b/cpp/doc/HDFS.md
@@ -0,0 +1,39 @@
+## Using Arrow's HDFS (Apache Hadoop Distributed File System) interface
+
+### Build requirements
+
+To build the integration, pass the following option to CMake
+
+```shell
+-DARROW_HDFS=on
+```
+
+For convenience, we have bundled `hdfs.h` for libhdfs from Apache Hadoop in
+Arrow's thirdparty. If you wish to build against the `hdfs.h` in your installed
+Hadoop distribution, set the `$HADOOP_HOME` environment variable.
+
+### Runtime requirements
+
+By default, the HDFS client C++ class in `libarrow_io` uses the libhdfs JNI
+interface to the Java Hadoop client. This library is loaded **at runtime**
+(rather than at link / library load time, since the library may not be in your
+LD_LIBRARY_PATH), and relies on some environment variables.
+
+* `HADOOP_HOME`: the root of your installed Hadoop distribution. If you are
+ unsure which directory to use, look for the one whose `lib/native`
+ subdirectory contains `libhdfs.so`.
+* `JAVA_HOME`: the location of your Java SDK installation
+* `CLASSPATH`: must contain the Hadoop jars. You can set it using:
+
+```shell
+export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
+```
+
+#### Setting $JAVA_HOME automatically on OS X
+
+The installed location of Java on OS X can vary; however, the following snippet
+will set it automatically for you:
+
+```shell
+export JAVA_HOME=$(/usr/libexec/java_home)
+```
\ No newline at end of file
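
Taken together, the runtime requirements above amount to something like the following environment setup (a sketch; the HADOOP_HOME path is illustrative):

```shell
export HADOOP_HOME=/opt/hadoop              # illustrative path
export JAVA_HOME=$(/usr/libexec/java_home)  # OS X; on Linux, point at your JDK
export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
```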
http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/src/arrow/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
new file mode 100644
index 0000000..33b654f
--- /dev/null
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# ----------------------------------------------------------------------
+# arrow_io : Arrow IO interfaces
+
+set(ARROW_IO_LINK_LIBS
+ arrow
+)
+
+set(ARROW_IO_PRIVATE_LINK_LIBS
+ boost_system
+ boost_filesystem
+)
+
+set(ARROW_IO_TEST_LINK_LIBS
+ arrow_io
+ ${ARROW_IO_PRIVATE_LINK_LIBS})
+
+set(ARROW_IO_SRCS
+)
+
+if(ARROW_HDFS)
+ if(NOT THIRDPARTY_DIR)
+ message(FATAL_ERROR "THIRDPARTY_DIR not set")
+ endif()
+
+ if (DEFINED ENV{HADOOP_HOME})
+ set(HADOOP_HOME $ENV{HADOOP_HOME})
+ else()
+ set(HADOOP_HOME "${THIRDPARTY_DIR}/hadoop")
+ endif()
+
+ set(HDFS_H_PATH "${HADOOP_HOME}/include/hdfs.h")
+ if (NOT EXISTS ${HDFS_H_PATH})
+ message(FATAL_ERROR "Did not find hdfs.h at ${HDFS_H_PATH}")
+ endif()
+ message(STATUS "Found hdfs.h at: " ${HDFS_H_PATH})
+ message(STATUS "Building libhdfs shim component")
+
+ include_directories(SYSTEM "${HADOOP_HOME}/include")
+
+ set(ARROW_HDFS_SRCS
+ hdfs.cc
+ libhdfs_shim.cc)
+
+ set_property(SOURCE ${ARROW_HDFS_SRCS}
+ APPEND_STRING PROPERTY
+ COMPILE_FLAGS "-DHAS_HADOOP")
+
+ set(ARROW_IO_SRCS
+ ${ARROW_HDFS_SRCS}
+ ${ARROW_IO_SRCS})
+
+ ADD_ARROW_TEST(hdfs-io-test)
+ ARROW_TEST_LINK_LIBRARIES(hdfs-io-test
+ ${ARROW_IO_TEST_LINK_LIBS})
+endif()
+
+add_library(arrow_io SHARED
+ ${ARROW_IO_SRCS}
+)
+target_link_libraries(arrow_io LINK_PUBLIC ${ARROW_IO_LINK_LIBS})
+target_link_libraries(arrow_io LINK_PRIVATE ${ARROW_IO_PRIVATE_LINK_LIBS})
+
+SET_TARGET_PROPERTIES(arrow_io PROPERTIES LINKER_LANGUAGE CXX)
+
+if (APPLE)
+ set_target_properties(arrow_io
+ PROPERTIES
+ BUILD_WITH_INSTALL_RPATH ON
+ INSTALL_NAME_DIR "@rpath")
+endif()
+
+# Headers: top level
+install(FILES
+ hdfs.h
+ interfaces.h
+ DESTINATION include/arrow/io)
+
+install(TARGETS arrow_io
+ LIBRARY DESTINATION lib
+ ARCHIVE DESTINATION lib)
http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/src/arrow/io/hdfs-io-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs-io-test.cc b/cpp/src/arrow/io/hdfs-io-test.cc
new file mode 100644
index 0000000..11d67ae
--- /dev/null
+++ b/cpp/src/arrow/io/hdfs-io-test.cc
@@ -0,0 +1,315 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdlib>
+#include <cstring>
+#include <iostream>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include <boost/filesystem.hpp> // NOLINT
+
+#include "arrow/io/hdfs.h"
+#include "arrow/test-util.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace io {
+
+std::vector<uint8_t> RandomData(int64_t size) {
+ std::vector<uint8_t> buffer(size);
+ test::random_bytes(size, 0, buffer.data());
+ return buffer;
+}
+
+class TestHdfsClient : public ::testing::Test {
+ public:
+ Status MakeScratchDir() {
+ if (client_->Exists(scratch_dir_)) {
+ RETURN_NOT_OK((client_->Delete(scratch_dir_, true)));
+ }
+ return client_->CreateDirectory(scratch_dir_);
+ }
+
+ Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size,
+ bool append = false, int buffer_size = 0, int replication = 0,
+ int default_block_size = 0) {
+ std::shared_ptr<HdfsWriteableFile> file;
+ RETURN_NOT_OK(client_->OpenWriteable(
+ path, append, buffer_size, replication, default_block_size, &file));
+
+ RETURN_NOT_OK(file->Write(buffer, size));
+ RETURN_NOT_OK(file->Close());
+
+ return Status::OK();
+ }
+
+ std::string ScratchPath(const std::string& name) {
+ std::stringstream ss;
+ ss << scratch_dir_ << "/" << name;
+ return ss.str();
+ }
+
+ std::string HdfsAbsPath(const std::string& relpath) {
+ std::stringstream ss;
+ ss << "hdfs://" << conf_.host << ":" << conf_.port << relpath;
+ return ss.str();
+ }
+
+ protected:
+ // Set up shared state between unit tests
+ static void SetUpTestCase() {
+ if (!ConnectLibHdfs().ok()) {
+ std::cout << "Loading libhdfs failed, skipping tests gracefully" << std::endl;
+ return;
+ }
+
+ loaded_libhdfs_ = true;
+
+ const char* host = std::getenv("ARROW_HDFS_TEST_HOST");
+ const char* port = std::getenv("ARROW_HDFS_TEST_PORT");
+ const char* user = std::getenv("ARROW_HDFS_TEST_USER");
+
+ ASSERT_TRUE(user) << "Set ARROW_HDFS_TEST_USER";
+
+ conf_.host = host == nullptr ? "localhost" : host;
+ conf_.user = user;
+ conf_.port = port == nullptr ? 20500 : atoi(port);
+
+ ASSERT_OK(HdfsClient::Connect(&conf_, &client_));
+ }
+
+ static void TearDownTestCase() {
+ if (client_) {
+ EXPECT_OK(client_->Delete(scratch_dir_, true));
+ EXPECT_OK(client_->Disconnect());
+ }
+ }
+
+ static bool loaded_libhdfs_;
+
+ // Resources shared amongst unit tests
+ static HdfsConnectionConfig conf_;
+ static std::string scratch_dir_;
+ static std::shared_ptr<HdfsClient> client_;
+};
+
+bool TestHdfsClient::loaded_libhdfs_ = false;
+HdfsConnectionConfig TestHdfsClient::conf_ = HdfsConnectionConfig();
+
+std::string TestHdfsClient::scratch_dir_ =
+ boost::filesystem::unique_path("/tmp/arrow-hdfs/scratch-%%%%").native();
+
+std::shared_ptr<HdfsClient> TestHdfsClient::client_ = nullptr;
+
+#define SKIP_IF_NO_LIBHDFS() \
+ if (!loaded_libhdfs_) { \
+ std::cout << "No libhdfs, skipping" << std::endl; \
+ return; \
+ }
+
+TEST_F(TestHdfsClient, ConnectsAgain) {
+ SKIP_IF_NO_LIBHDFS();
+
+ std::shared_ptr<HdfsClient> client;
+ ASSERT_OK(HdfsClient::Connect(&conf_, &client));
+ ASSERT_OK(client->Disconnect());
+}
+
+TEST_F(TestHdfsClient, CreateDirectory) {
+ SKIP_IF_NO_LIBHDFS();
+
+ std::string path = ScratchPath("create-directory");
+
+ if (client_->Exists(path)) { ASSERT_OK(client_->Delete(path, true)); }
+
+ ASSERT_OK(client_->CreateDirectory(path));
+ ASSERT_TRUE(client_->Exists(path));
+ EXPECT_OK(client_->Delete(path, true));
+ ASSERT_FALSE(client_->Exists(path));
+}
+
+TEST_F(TestHdfsClient, GetCapacityUsed) {
+ SKIP_IF_NO_LIBHDFS();
+
+ // Who knows what is actually in your DFS cluster, but expect it to have
+ // positive used bytes and capacity
+ int64_t nbytes = 0;
+ ASSERT_OK(client_->GetCapacity(&nbytes));
+ ASSERT_LT(0, nbytes);
+
+ ASSERT_OK(client_->GetUsed(&nbytes));
+ ASSERT_LT(0, nbytes);
+}
+
+TEST_F(TestHdfsClient, GetPathInfo) {
+ SKIP_IF_NO_LIBHDFS();
+
+ HdfsPathInfo info;
+
+ ASSERT_OK(MakeScratchDir());
+
+ // Directory info
+ ASSERT_OK(client_->GetPathInfo(scratch_dir_, &info));
+ ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
+ ASSERT_EQ(HdfsAbsPath(scratch_dir_), info.name);
+ ASSERT_EQ(conf_.user, info.owner);
+
+ // TODO(wesm): test group, other attrs
+
+ auto path = ScratchPath("test-file");
+
+ const int size = 100;
+
+ std::vector<uint8_t> buffer = RandomData(size);
+
+ ASSERT_OK(WriteDummyFile(path, buffer.data(), size));
+ ASSERT_OK(client_->GetPathInfo(path, &info));
+
+ ASSERT_EQ(ObjectType::FILE, info.kind);
+ ASSERT_EQ(HdfsAbsPath(path), info.name);
+ ASSERT_EQ(conf_.user, info.owner);
+ ASSERT_EQ(size, info.size);
+}
+
+TEST_F(TestHdfsClient, AppendToFile) {
+ SKIP_IF_NO_LIBHDFS();
+
+ ASSERT_OK(MakeScratchDir());
+
+ auto path = ScratchPath("test-file");
+ const int size = 100;
+
+ std::vector<uint8_t> buffer = RandomData(size);
+ ASSERT_OK(WriteDummyFile(path, buffer.data(), size));
+
+ // now append
+ ASSERT_OK(WriteDummyFile(path, buffer.data(), size, true));
+
+ HdfsPathInfo info;
+ ASSERT_OK(client_->GetPathInfo(path, &info));
+ ASSERT_EQ(size * 2, info.size);
+}
+
+TEST_F(TestHdfsClient, ListDirectory) {
+ SKIP_IF_NO_LIBHDFS();
+
+ const int size = 100;
+ std::vector<uint8_t> data = RandomData(size);
+
+ auto p1 = ScratchPath("test-file-1");
+ auto p2 = ScratchPath("test-file-2");
+ auto d1 = ScratchPath("test-dir-1");
+
+ ASSERT_OK(MakeScratchDir());
+ ASSERT_OK(WriteDummyFile(p1, data.data(), size));
+ ASSERT_OK(WriteDummyFile(p2, data.data(), size / 2));
+ ASSERT_OK(client_->CreateDirectory(d1));
+
+ std::vector<HdfsPathInfo> listing;
+ ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing));
+
+ // Do it again; ListDirectory appends to the existing vector
+ ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing));
+
+ ASSERT_EQ(6, listing.size());
+
+ // The listing is not guaranteed to be in any particular order
+ for (size_t i = 0; i < listing.size(); ++i) {
+ const HdfsPathInfo& info = listing[i];
+ if (info.name == HdfsAbsPath(p1)) {
+ ASSERT_EQ(ObjectType::FILE, info.kind);
+ ASSERT_EQ(size, info.size);
+ } else if (info.name == HdfsAbsPath(p2)) {
+ ASSERT_EQ(ObjectType::FILE, info.kind);
+ ASSERT_EQ(size / 2, info.size);
+ } else if (info.name == HdfsAbsPath(d1)) {
+ ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
+ } else {
+ FAIL() << "Unexpected path: " << info.name;
+ }
+ }
+}
+
+TEST_F(TestHdfsClient, ReadableMethods) {
+ SKIP_IF_NO_LIBHDFS();
+
+ ASSERT_OK(MakeScratchDir());
+
+ auto path = ScratchPath("test-file");
+ const int size = 100;
+
+ std::vector<uint8_t> data = RandomData(size);
+ ASSERT_OK(WriteDummyFile(path, data.data(), size));
+
+ std::shared_ptr<HdfsReadableFile> file;
+ ASSERT_OK(client_->OpenReadable(path, &file));
+
+ // Test GetSize -- move this into its own unit test if ever needed
+ int64_t file_size;
+ ASSERT_OK(file->GetSize(&file_size));
+ ASSERT_EQ(size, file_size);
+
+ uint8_t buffer[50];
+ int32_t bytes_read = 0;
+
+ ASSERT_OK(file->Read(50, &bytes_read, buffer));
+ ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50));
+ ASSERT_EQ(50, bytes_read);
+
+ ASSERT_OK(file->Read(50, &bytes_read, buffer));
+ ASSERT_EQ(0, std::memcmp(buffer, data.data() + 50, 50));
+ ASSERT_EQ(50, bytes_read);
+
+ // EOF
+ ASSERT_OK(file->Read(1, &bytes_read, buffer));
+ ASSERT_EQ(0, bytes_read);
+
+ // ReadAt to EOF
+ ASSERT_OK(file->ReadAt(60, 100, &bytes_read, buffer));
+ ASSERT_EQ(40, bytes_read);
+ ASSERT_EQ(0, std::memcmp(buffer, data.data() + 60, bytes_read));
+
+ // Seek, Tell
+ ASSERT_OK(file->Seek(60));
+
+ int64_t position;
+ ASSERT_OK(file->Tell(&position));
+ ASSERT_EQ(60, position);
+}
+
+TEST_F(TestHdfsClient, RenameFile) {
+ SKIP_IF_NO_LIBHDFS();
+
+ ASSERT_OK(MakeScratchDir());
+
+ auto src_path = ScratchPath("src-file");
+ auto dst_path = ScratchPath("dst-file");
+ const int size = 100;
+
+ std::vector<uint8_t> data = RandomData(size);
+ ASSERT_OK(WriteDummyFile(src_path, data.data(), size));
+
+ ASSERT_OK(client_->Rename(src_path, dst_path));
+
+ ASSERT_FALSE(client_->Exists(src_path));
+ ASSERT_TRUE(client_->Exists(dst_path));
+}
+
+} // namespace io
+} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/src/arrow/io/hdfs.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
new file mode 100644
index 0000000..6da6ea4
--- /dev/null
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -0,0 +1,458 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <hdfs.h>
+
+#include <cerrno>
+#include <cstdint>
+#include <sstream>
+#include <string>
+
+#include "arrow/io/hdfs.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace io {
+
+#define CHECK_FAILURE(RETURN_VALUE, WHAT) \
+ do { \
+ if (RETURN_VALUE == -1) { \
+ std::stringstream ss; \
+ ss << "HDFS: " << WHAT << " failed"; \
+ return Status::IOError(ss.str()); \
+ } \
+ } while (0)
+
+static Status CheckReadResult(int ret) {
+ // ret == -1 indicates an error (errno may be set), while ret == 0 at
+ // end of file is OK
+ if (ret == -1) {
+ std::stringstream ss;
+ ss << "HDFS read failed, errno: " << errno;
+ return Status::IOError(ss.str());
+ }
+ return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// File reading
+
+class HdfsAnyFileImpl {
+ public:
+ void set_members(const std::string& path, hdfsFS fs, hdfsFile handle) {
+ path_ = path;
+ fs_ = fs;
+ file_ = handle;
+ is_open_ = true;
+ }
+
+ Status Seek(int64_t position) {
+ int ret = hdfsSeek(fs_, file_, position);
+ CHECK_FAILURE(ret, "seek");
+ return Status::OK();
+ }
+
+ Status Tell(int64_t* offset) {
+ int64_t ret = hdfsTell(fs_, file_);
+ CHECK_FAILURE(ret, "tell");
+ *offset = ret;
+ return Status::OK();
+ }
+
+ bool is_open() const { return is_open_; }
+
+ protected:
+ std::string path_;
+
+ // These are pointers in libhdfs, so OK to copy
+ hdfsFS fs_;
+ hdfsFile file_;
+
+ bool is_open_;
+};
+
+// Private implementation for read-only files
+class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
+ public:
+ HdfsReadableFileImpl() {}
+
+ Status Close() {
+ if (is_open_) {
+ int ret = hdfsCloseFile(fs_, file_);
+ CHECK_FAILURE(ret, "CloseFile");
+ is_open_ = false;
+ }
+ return Status::OK();
+ }
+
+ Status ReadAt(int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
+ tSize ret = hdfsPread(fs_, file_, static_cast<tOffset>(position),
+ reinterpret_cast<void*>(buffer), nbytes);
+ RETURN_NOT_OK(CheckReadResult(ret));
+ *bytes_read = ret;
+ return Status::OK();
+ }
+
+ Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
+ tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer), nbytes);
+ RETURN_NOT_OK(CheckReadResult(ret));
+ *bytes_read = ret;
+ return Status::OK();
+ }
+
+ Status GetSize(int64_t* size) {
+ hdfsFileInfo* entry = hdfsGetPathInfo(fs_, path_.c_str());
+ if (entry == nullptr) { return Status::IOError("HDFS: GetPathInfo failed"); }
+
+ *size = entry->mSize;
+ hdfsFreeFileInfo(entry, 1);
+ return Status::OK();
+ }
+};
+
+HdfsReadableFile::HdfsReadableFile() {
+ impl_.reset(new HdfsReadableFileImpl());
+}
+
+HdfsReadableFile::~HdfsReadableFile() {
+ impl_->Close();
+}
+
+Status HdfsReadableFile::Close() {
+ return impl_->Close();
+}
+
+Status HdfsReadableFile::ReadAt(
+ int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
+ return impl_->ReadAt(position, nbytes, bytes_read, buffer);
+}
+
+Status HdfsReadableFile::Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) {
+ return impl_->Read(nbytes, bytes_read, buffer);
+}
+
+Status HdfsReadableFile::GetSize(int64_t* size) {
+ return impl_->GetSize(size);
+}
+
+Status HdfsReadableFile::Seek(int64_t position) {
+ return impl_->Seek(position);
+}
+
+Status HdfsReadableFile::Tell(int64_t* position) {
+ return impl_->Tell(position);
+}
+
+// ----------------------------------------------------------------------
+// File writing
+
+// Private implementation for writeable-only files
+class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl {
+ public:
+ HdfsWriteableFileImpl() {}
+
+ Status Close() {
+ if (is_open_) {
+ int ret = hdfsFlush(fs_, file_);
+ CHECK_FAILURE(ret, "Flush");
+ ret = hdfsCloseFile(fs_, file_);
+ CHECK_FAILURE(ret, "CloseFile");
+ is_open_ = false;
+ }
+ return Status::OK();
+ }
+
+ Status Write(const uint8_t* buffer, int32_t nbytes, int32_t* bytes_written) {
+ tSize ret = hdfsWrite(fs_, file_, reinterpret_cast<const void*>(buffer), nbytes);
+ CHECK_FAILURE(ret, "Write");
+ *bytes_written = ret;
+ return Status::OK();
+ }
+};
+
+HdfsWriteableFile::HdfsWriteableFile() {
+ impl_.reset(new HdfsWriteableFileImpl());
+}
+
+HdfsWriteableFile::~HdfsWriteableFile() {
+ impl_->Close();
+}
+
+Status HdfsWriteableFile::Close() {
+ return impl_->Close();
+}
+
+Status HdfsWriteableFile::Write(
+ const uint8_t* buffer, int32_t nbytes, int32_t* bytes_written) {
+ return impl_->Write(buffer, nbytes, bytes_written);
+}
+
+Status HdfsWriteableFile::Write(const uint8_t* buffer, int32_t nbytes) {
+ int32_t bytes_written_dummy = 0;
+ return Write(buffer, nbytes, &bytes_written_dummy);
+}
+
+Status HdfsWriteableFile::Tell(int64_t* position) {
+ return impl_->Tell(position);
+}
+
+// ----------------------------------------------------------------------
+// HDFS client
+
+// TODO(wesm): this could throw std::bad_alloc in the course of copying strings
+// into the path info object
+static void SetPathInfo(const hdfsFileInfo* input, HdfsPathInfo* out) {
+ out->kind = input->mKind == kObjectKindFile ? ObjectType::FILE : ObjectType::DIRECTORY;
+ out->name = std::string(input->mName);
+ out->owner = std::string(input->mOwner);
+ out->group = std::string(input->mGroup);
+
+ out->last_access_time = static_cast<int32_t>(input->mLastAccess);
+ out->last_modified_time = static_cast<int32_t>(input->mLastMod);
+ out->size = static_cast<int64_t>(input->mSize);
+
+ out->replication = input->mReplication;
+ out->block_size = input->mBlockSize;
+
+ out->permissions = input->mPermissions;
+}
+
+// Private implementation
+class HdfsClient::HdfsClientImpl {
+ public:
+ HdfsClientImpl() {}
+
+ Status Connect(const HdfsConnectionConfig* config) {
+ RETURN_NOT_OK(ConnectLibHdfs());
+
+ fs_ = hdfsConnectAsUser(config->host.c_str(), config->port, config->user.c_str());
+
+ if (fs_ == nullptr) { return Status::IOError("HDFS connection failed"); }
+ namenode_host_ = config->host;
+ port_ = config->port;
+ user_ = config->user;
+
+ return Status::OK();
+ }
+
+ Status CreateDirectory(const std::string& path) {
+ int ret = hdfsCreateDirectory(fs_, path.c_str());
+ CHECK_FAILURE(ret, "create directory");
+ return Status::OK();
+ }
+
+ Status Delete(const std::string& path, bool recursive) {
+ int ret = hdfsDelete(fs_, path.c_str(), static_cast<int>(recursive));
+ CHECK_FAILURE(ret, "delete");
+ return Status::OK();
+ }
+
+ Status Disconnect() {
+ int ret = hdfsDisconnect(fs_);
+ CHECK_FAILURE(ret, "hdfsFS::Disconnect");
+ return Status::OK();
+ }
+
+ bool Exists(const std::string& path) {
+ // hdfsExists does not distinguish between RPC failure and the file not
+ // existing
+ int ret = hdfsExists(fs_, path.c_str());
+ return ret == 0;
+ }
+
+ Status GetCapacity(int64_t* nbytes) {
+ tOffset ret = hdfsGetCapacity(fs_);
+ CHECK_FAILURE(ret, "GetCapacity");
+ *nbytes = ret;
+ return Status::OK();
+ }
+
+ Status GetUsed(int64_t* nbytes) {
+ tOffset ret = hdfsGetUsed(fs_);
+ CHECK_FAILURE(ret, "GetUsed");
+ *nbytes = ret;
+ return Status::OK();
+ }
+
+ Status GetPathInfo(const std::string& path, HdfsPathInfo* info) {
+ hdfsFileInfo* entry = hdfsGetPathInfo(fs_, path.c_str());
+
+ if (entry == nullptr) { return Status::IOError("HDFS: GetPathInfo failed"); }
+
+ SetPathInfo(entry, info);
+ hdfsFreeFileInfo(entry, 1);
+
+ return Status::OK();
+ }
+
+ Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing) {
+ int num_entries = 0;
+ hdfsFileInfo* entries = hdfsListDirectory(fs_, path.c_str(), &num_entries);
+
+ if (entries == nullptr) {
+ // If the directory is empty, entries is NULL but errno is 0. Non-zero
+ // errno indicates error
+ //
+ // Note: errno is thread-local
+ if (errno == 0) {
+ num_entries = 0;
+ } else {
+ return Status::IOError("HDFS: list directory failed");
+ }
+ }
+
+ // Allocate additional space for elements
+
+ int vec_offset = listing->size();
+ listing->resize(vec_offset + num_entries);
+
+ for (int i = 0; i < num_entries; ++i) {
+ SetPathInfo(entries + i, &(*listing)[vec_offset + i]);
+ }
+
+ // Free libhdfs file info
+ hdfsFreeFileInfo(entries, num_entries);
+
+ return Status::OK();
+ }
+
+ Status OpenReadable(const std::string& path, std::shared_ptr<HdfsReadableFile>* file) {
+ hdfsFile handle = hdfsOpenFile(fs_, path.c_str(), O_RDONLY, 0, 0, 0);
+
+ if (handle == nullptr) {
+ // TODO(wesm): determine cause of failure
+ std::stringstream ss;
+ ss << "Unable to open file " << path;
+ return Status::IOError(ss.str());
+ }
+
+ // std::make_shared does not work with private ctors
+ *file = std::shared_ptr<HdfsReadableFile>(new HdfsReadableFile());
+ (*file)->impl_->set_members(path, fs_, handle);
+
+ return Status::OK();
+ }
+
+ Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size,
+ int16_t replication, int64_t default_block_size,
+ std::shared_ptr<HdfsWriteableFile>* file) {
+ int flags = O_WRONLY;
+ if (append) flags |= O_APPEND;
+
+ hdfsFile handle = hdfsOpenFile(
+ fs_, path.c_str(), flags, buffer_size, replication, default_block_size);
+
+ if (handle == nullptr) {
+ // TODO(wesm): determine cause of failure
+ std::stringstream ss;
+ ss << "Unable to open file " << path;
+ return Status::IOError(ss.str());
+ }
+
+ // std::make_shared does not work with private ctors
+ *file = std::shared_ptr<HdfsWriteableFile>(new HdfsWriteableFile());
+ (*file)->impl_->set_members(path, fs_, handle);
+
+ return Status::OK();
+ }
+
+ Status Rename(const std::string& src, const std::string& dst) {
+ int ret = hdfsRename(fs_, src.c_str(), dst.c_str());
+ CHECK_FAILURE(ret, "Rename");
+ return Status::OK();
+ }
+
+ private:
+ std::string namenode_host_;
+ std::string user_;
+ int port_;
+
+ hdfsFS fs_;
+};
+
+// ----------------------------------------------------------------------
+// Public API for HDFSClient
+
+HdfsClient::HdfsClient() {
+ impl_.reset(new HdfsClientImpl());
+}
+
+HdfsClient::~HdfsClient() {}
+
+Status HdfsClient::Connect(
+ const HdfsConnectionConfig* config, std::shared_ptr<HdfsClient>* fs) {
+ // ctor is private, make_shared will not work
+ *fs = std::shared_ptr<HdfsClient>(new HdfsClient());
+
+ RETURN_NOT_OK((*fs)->impl_->Connect(config));
+ return Status::OK();
+}
+
+Status HdfsClient::CreateDirectory(const std::string& path) {
+ return impl_->CreateDirectory(path);
+}
+
+Status HdfsClient::Delete(const std::string& path, bool recursive) {
+ return impl_->Delete(path, recursive);
+}
+
+Status HdfsClient::Disconnect() {
+ return impl_->Disconnect();
+}
+
+bool HdfsClient::Exists(const std::string& path) {
+ return impl_->Exists(path);
+}
+
+Status HdfsClient::GetPathInfo(const std::string& path, HdfsPathInfo* info) {
+ return impl_->GetPathInfo(path, info);
+}
+
+Status HdfsClient::GetCapacity(int64_t* nbytes) {
+ return impl_->GetCapacity(nbytes);
+}
+
+Status HdfsClient::GetUsed(int64_t* nbytes) {
+ return impl_->GetUsed(nbytes);
+}
+
+Status HdfsClient::ListDirectory(
+ const std::string& path, std::vector<HdfsPathInfo>* listing) {
+ return impl_->ListDirectory(path, listing);
+}
+
+Status HdfsClient::OpenReadable(
+ const std::string& path, std::shared_ptr<HdfsReadableFile>* file) {
+ return impl_->OpenReadable(path, file);
+}
+
+Status HdfsClient::OpenWriteable(const std::string& path, bool append,
+ int32_t buffer_size, int16_t replication, int64_t default_block_size,
+ std::shared_ptr<HdfsWriteableFile>* file) {
+ return impl_->OpenWriteable(
+ path, append, buffer_size, replication, default_block_size, file);
+}
+
+Status HdfsClient::OpenWriteable(
+ const std::string& path, bool append, std::shared_ptr<HdfsWriteableFile>* file) {
+ return OpenWriteable(path, append, 0, 0, 0, file);
+}
+
+Status HdfsClient::Rename(const std::string& src, const std::string& dst) {
+ return impl_->Rename(src, dst);
+}
+
+} // namespace io
+} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/src/arrow/io/hdfs.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h
new file mode 100644
index 0000000..a1972db
--- /dev/null
+++ b/cpp/src/arrow/io/hdfs.h
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_IO_HDFS
+#define ARROW_IO_HDFS
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/macros.h"
+
+namespace arrow {
+
+class Status;
+
+namespace io {
+
+Status ConnectLibHdfs();
+
+class HdfsClient;
+class HdfsReadableFile;
+class HdfsWriteableFile;
+
+struct HdfsPathInfo {
+ ObjectType::type kind;
+
+ std::string name;
+ std::string owner;
+ std::string group;
+
+ int64_t size;
+ int64_t block_size;
+
+ // Access/modification times as UNIX timestamps (seconds)
+ int32_t last_modified_time;
+ int32_t last_access_time;
+
+ int16_t replication;
+ int16_t permissions;
+};
+
+struct HdfsConnectionConfig {
+ std::string host;
+ int port;
+ std::string user;
+
+ // TODO: Kerberos, etc.
+};
+
+class HdfsClient : public FileSystemClient {
+ public:
+ ~HdfsClient();
+
+ // Connect to an HDFS cluster at the given host and port, as the given user
+ //
+ // @param config (in): connection parameters (host, port, user)
+ // @param fs (out): the created client
+ // @returns Status
+ static Status Connect(
+ const HdfsConnectionConfig* config, std::shared_ptr<HdfsClient>* fs);
+
+ // Create directory and all parents
+ //
+ // @param path (in): absolute HDFS path
+ // @returns Status
+ Status CreateDirectory(const std::string& path);
+
+ // Delete file or directory
+ // @param path: absolute path to data
+ // @param recursive: if path is a directory, delete contents as well
+ // @returns error status on failure
+ Status Delete(const std::string& path, bool recursive = false);
+
+ // Disconnect from cluster
+ //
+ // @returns Status
+ Status Disconnect();
+
+ // @param path (in): absolute HDFS path
+ // @returns bool, true if the path exists, false if not (or on error)
+ bool Exists(const std::string& path);
+
+ // @param path (in): absolute HDFS path
+ // @param info (out)
+ // @returns Status
+ Status GetPathInfo(const std::string& path, HdfsPathInfo* info);
+
+ // @param nbytes (out): total capacity of the filesystem
+ // @returns Status
+ Status GetCapacity(int64_t* nbytes);
+
+ // @param nbytes (out): total bytes used of the filesystem
+ // @returns Status
+ Status GetUsed(int64_t* nbytes);
+
+ Status ListDirectory(const std::string& path, std::vector<HdfsPathInfo>* listing);
+
+ // @param path file path to change
+ // @param owner pass nullptr for no change
+ // @param group pass nullptr for no change
+ Status Chown(const std::string& path, const char* owner, const char* group);
+
+ Status Chmod(const std::string& path, int mode);
+
+ // Move file or directory from source path to destination path within the
+ // current filesystem
+ Status Rename(const std::string& src, const std::string& dst);
+
+ // TODO(wesm): GetWorkingDirectory, SetWorkingDirectory
+
+ // Open an HDFS file in READ mode. Returns error
+ // status if the file is not found.
+ //
+ // @param path complete file path
+ Status OpenReadable(const std::string& path, std::shared_ptr<HdfsReadableFile>* file);
+
+ // Open an HDFS file in WRITE mode (FileMode::WRITE)
+ //
+ // @param path complete file path
+ // @param append if true, append to an existing file rather than overwriting
+ // @param buffer_size 0 for default
+ // @param replication 0 for default
+ // @param default_block_size 0 for default
+ Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size,
+ int16_t replication, int64_t default_block_size,
+ std::shared_ptr<HdfsWriteableFile>* file);
+
+ Status OpenWriteable(
+ const std::string& path, bool append, std::shared_ptr<HdfsWriteableFile>* file);
+
+ private:
+ friend class HdfsReadableFile;
+ friend class HdfsWriteableFile;
+
+ class HdfsClientImpl;
+ std::unique_ptr<HdfsClientImpl> impl_;
+
+ HdfsClient();
+ DISALLOW_COPY_AND_ASSIGN(HdfsClient);
+};
+
+class HdfsReadableFile : public RandomAccessFile {
+ public:
+ ~HdfsReadableFile();
+
+ Status Close() override;
+
+ Status GetSize(int64_t* size) override;
+
+ Status ReadAt(
+ int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) override;
+
+ Status Seek(int64_t position) override;
+ Status Tell(int64_t* position) override;
+
+ // NOTE: If you wish to read a particular range of a file in a multithreaded
+ // context, you may prefer to use ReadAt to avoid locking issues
+ Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) override;
+
+ private:
+ class HdfsReadableFileImpl;
+ std::unique_ptr<HdfsReadableFileImpl> impl_;
+
+ friend class HdfsClient::HdfsClientImpl;
+
+ HdfsReadableFile();
+ DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile);
+};
+
+class HdfsWriteableFile : public WriteableFile {
+ public:
+ ~HdfsWriteableFile();
+
+ Status Close() override;
+
+ Status Write(const uint8_t* buffer, int32_t nbytes) override;
+
+ Status Write(const uint8_t* buffer, int32_t nbytes, int32_t* bytes_written);
+
+ Status Tell(int64_t* position) override;
+
+ private:
+ class HdfsWriteableFileImpl;
+ std::unique_ptr<HdfsWriteableFileImpl> impl_;
+
+ friend class HdfsClient::HdfsClientImpl;
+
+ HdfsWriteableFile();
+
+ DISALLOW_COPY_AND_ASSIGN(HdfsWriteableFile);
+};
+
+} // namespace io
+} // namespace arrow
+
+#endif // ARROW_IO_HDFS
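
To complement the header above, here is a hypothetical sketch of the write path, mirroring the AppendToFile unit test; WriteTwice is an illustrative helper, not part of the patch:

```cpp
#include <cstdint>
#include <memory>
#include <string>

#include "arrow/io/hdfs.h"
#include "arrow/util/status.h"

// Write a buffer to a new file, then append the same buffer; the file should
// end up with 2 * nbytes bytes
arrow::Status WriteTwice(arrow::io::HdfsClient* client, const std::string& path,
    const uint8_t* data, int32_t nbytes) {
  std::shared_ptr<arrow::io::HdfsWriteableFile> file;

  // First pass: create (or overwrite) the file
  RETURN_NOT_OK(client->OpenWriteable(path, false /* append */, &file));
  RETURN_NOT_OK(file->Write(data, nbytes));
  RETURN_NOT_OK(file->Close());

  // Second pass: reopen in append mode
  RETURN_NOT_OK(client->OpenWriteable(path, true /* append */, &file));
  RETURN_NOT_OK(file->Write(data, nbytes));
  return file->Close();
}
```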
http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/src/arrow/io/interfaces.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
new file mode 100644
index 0000000..4bd8a8f
--- /dev/null
+++ b/cpp/src/arrow/io/interfaces.h
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_IO_INTERFACES
+#define ARROW_IO_INTERFACES
+
+#include <cstdint>
+
+namespace arrow {
+
+class Status;
+
+namespace io {
+
+struct FileMode {
+ enum type { READ, WRITE, READWRITE };
+};
+
+struct ObjectType {
+ enum type { FILE, DIRECTORY };
+};
+
+class FileSystemClient {
+ public:
+ virtual ~FileSystemClient() {}
+};
+
+class FileBase {
+ public:
+ virtual ~FileBase() {}
+
+ virtual Status Close() = 0;
+ virtual Status Tell(int64_t* position) = 0;
+};
+
+class ReadableFile : public FileBase {
+ public:
+ virtual Status ReadAt(
+ int64_t position, int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) = 0;
+
+ virtual Status Read(int32_t nbytes, int32_t* bytes_read, uint8_t* buffer) = 0;
+
+ virtual Status GetSize(int64_t* size) = 0;
+};
+
+class RandomAccessFile : public ReadableFile {
+ public:
+ virtual Status Seek(int64_t position) = 0;
+};
+
+class WriteableFile : public FileBase {
+ public:
+ virtual Status Write(const uint8_t* buffer, int32_t nbytes) = 0;
+};
+
+} // namespace io
+} // namespace arrow
+
+#endif // ARROW_IO_INTERFACES
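
To illustrate how these abstractions compose, here is a hypothetical generic helper (not part of the patch) that reads an entire file through the ReadableFile interface, independent of the concrete backend:

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

#include "arrow/io/interfaces.h"
#include "arrow/util/status.h"

namespace arrow {
namespace io {

// Read a whole file into a byte vector, in chunks of at most 1 MB
Status ReadEntireFile(ReadableFile* file, std::vector<uint8_t>* out) {
  int64_t size = 0;
  RETURN_NOT_OK(file->GetSize(&size));
  out->resize(size);

  int64_t total = 0;
  while (total < size) {
    int32_t chunk = static_cast<int32_t>(std::min<int64_t>(size - total, 1 << 20));
    int32_t bytes_read = 0;
    RETURN_NOT_OK(file->Read(chunk, &bytes_read, out->data() + total));
    if (bytes_read == 0) break;  // unexpected EOF
    total += bytes_read;
  }
  out->resize(total);
  return Status::OK();
}

}  // namespace io
}  // namespace arrow
```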
http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/src/arrow/io/libhdfs_shim.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/libhdfs_shim.cc b/cpp/src/arrow/io/libhdfs_shim.cc
new file mode 100644
index 0000000..f752665
--- /dev/null
+++ b/cpp/src/arrow/io/libhdfs_shim.cc
@@ -0,0 +1,544 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This shim interface to libhdfs (for runtime shared library loading) has been
+// adapted from the SFrame project, released under the ASF-compatible 3-clause
+// BSD license
+//
+// Using this requires having the $JAVA_HOME and $HADOOP_HOME environment
+// variables set, so that libjvm and libhdfs can be located easily
+
+// Copyright (C) 2015 Dato, Inc.
+// All rights reserved.
+//
+// This software may be modified and distributed under the terms
+// of the BSD license. See the LICENSE file for details.
+
+#ifdef HAS_HADOOP
+
+#ifndef _WIN32
+#include <dlfcn.h>
+#else
+#include <winsock2.h>
+#include <windows.h>
+
+// TODO(wesm): address when/if we add windows support
+// #include <util/syserr_reporting.hpp>
+#endif
+
+extern "C" {
+#include <hdfs.h>
+}
+
+#include <cstdlib>
+#include <iostream>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include <boost/filesystem.hpp> // NOLINT
+#include <boost/algorithm/string.hpp> // NOLINT
+
+#include "arrow/util/status.h"
+
+namespace fs = boost::filesystem;
+
+extern "C" {
+
+#ifndef _WIN32
+static void* libhdfs_handle = NULL;
+static void* libjvm_handle = NULL;
+#else
+static HINSTANCE libhdfs_handle = NULL;
+static HINSTANCE libjvm_handle = NULL;
+#endif
+/*
+ * All the shim pointers
+ */
+
+// NOTE(wesm): cpplint does not like use of short and other imprecise C types
+
+static hdfsFS (*ptr_hdfsConnectAsUser)(
+ const char* host, tPort port, const char* user) = NULL;
+static hdfsFS (*ptr_hdfsConnect)(const char* host, tPort port) = NULL;
+static int (*ptr_hdfsDisconnect)(hdfsFS fs) = NULL;
+
+static hdfsFile (*ptr_hdfsOpenFile)(hdfsFS fs, const char* path, int flags,
+ int bufferSize, short replication, tSize blocksize) = NULL; // NOLINT
+
+static int (*ptr_hdfsCloseFile)(hdfsFS fs, hdfsFile file) = NULL;
+static int (*ptr_hdfsExists)(hdfsFS fs, const char* path) = NULL;
+static int (*ptr_hdfsSeek)(hdfsFS fs, hdfsFile file, tOffset desiredPos) = NULL;
+static tOffset (*ptr_hdfsTell)(hdfsFS fs, hdfsFile file) = NULL;
+static tSize (*ptr_hdfsRead)(hdfsFS fs, hdfsFile file, void* buffer, tSize length) = NULL;
+static tSize (*ptr_hdfsPread)(
+ hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length) = NULL;
+static tSize (*ptr_hdfsWrite)(
+ hdfsFS fs, hdfsFile file, const void* buffer, tSize length) = NULL;
+static int (*ptr_hdfsFlush)(hdfsFS fs, hdfsFile file) = NULL;
+static int (*ptr_hdfsAvailable)(hdfsFS fs, hdfsFile file) = NULL;
+static int (*ptr_hdfsCopy)(
+ hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) = NULL;
+static int (*ptr_hdfsMove)(
+ hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) = NULL;
+static int (*ptr_hdfsDelete)(hdfsFS fs, const char* path, int recursive) = NULL;
+static int (*ptr_hdfsRename)(hdfsFS fs, const char* oldPath, const char* newPath) = NULL;
+static char* (*ptr_hdfsGetWorkingDirectory)(
+ hdfsFS fs, char* buffer, size_t bufferSize) = NULL;
+static int (*ptr_hdfsSetWorkingDirectory)(hdfsFS fs, const char* path) = NULL;
+static int (*ptr_hdfsCreateDirectory)(hdfsFS fs, const char* path) = NULL;
+static int (*ptr_hdfsSetReplication)(
+ hdfsFS fs, const char* path, int16_t replication) = NULL;
+static hdfsFileInfo* (*ptr_hdfsListDirectory)(
+ hdfsFS fs, const char* path, int* numEntries) = NULL;
+static hdfsFileInfo* (*ptr_hdfsGetPathInfo)(hdfsFS fs, const char* path) = NULL;
+static void (*ptr_hdfsFreeFileInfo)(hdfsFileInfo* hdfsFileInfo, int numEntries) = NULL;
+static char*** (*ptr_hdfsGetHosts)(
+ hdfsFS fs, const char* path, tOffset start, tOffset length) = NULL;
+static void (*ptr_hdfsFreeHosts)(char*** blockHosts) = NULL;
+static tOffset (*ptr_hdfsGetDefaultBlockSize)(hdfsFS fs) = NULL;
+static tOffset (*ptr_hdfsGetCapacity)(hdfsFS fs) = NULL;
+static tOffset (*ptr_hdfsGetUsed)(hdfsFS fs) = NULL;
+static int (*ptr_hdfsChown)(
+ hdfsFS fs, const char* path, const char* owner, const char* group) = NULL;
+static int (*ptr_hdfsChmod)(hdfsFS fs, const char* path, short mode) = NULL; // NOLINT
+static int (*ptr_hdfsUtime)(hdfsFS fs, const char* path, tTime mtime, tTime atime) = NULL;
+
+// Helper functions for dlopens
+static std::vector<fs::path> get_potential_libjvm_paths();
+static std::vector<fs::path> get_potential_libhdfs_paths();
+static arrow::Status try_dlopen(std::vector<fs::path> potential_paths, const char* name,
+#ifndef _WIN32
+ void*& out_handle);
+#else
+ HINSTANCE& out_handle);
+#endif
+
+#define GET_SYMBOL(SYMBOL_NAME) \
+ if (!ptr_##SYMBOL_NAME) { \
+ *reinterpret_cast<void**>(&ptr_##SYMBOL_NAME) = get_symbol("" #SYMBOL_NAME); \
+ }
+
+static void* get_symbol(const char* symbol) {
+ if (libhdfs_handle == NULL) return NULL;
+#ifndef _WIN32
+ return dlsym(libhdfs_handle, symbol);
+#else
+
+ void* ret = reinterpret_cast<void*>(GetProcAddress(libhdfs_handle, symbol));
+ if (ret == NULL) {
+ // logstream(LOG_INFO) << "GetProcAddress error: "
+ // << get_last_err_str(GetLastError()) << std::endl;
+ }
+ return ret;
+#endif
+}
+
+hdfsFS hdfsConnectAsUser(const char* host, tPort port, const char* user) {
+ return ptr_hdfsConnectAsUser(host, port, user);
+}
+
+// Returns NULL on failure
+hdfsFS hdfsConnect(const char* host, tPort port) {
+ if (ptr_hdfsConnect) {
+ return ptr_hdfsConnect(host, port);
+ } else {
+ // TODO: error reporting when shim setup fails
+ return NULL;
+ }
+}
+
+int hdfsDisconnect(hdfsFS fs) {
+ return ptr_hdfsDisconnect(fs);
+}
+
+hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags, int bufferSize,
+ short replication, tSize blocksize) { // NOLINT
+ return ptr_hdfsOpenFile(fs, path, flags, bufferSize, replication, blocksize);
+}
+
+int hdfsCloseFile(hdfsFS fs, hdfsFile file) {
+ return ptr_hdfsCloseFile(fs, file);
+}
+
+int hdfsExists(hdfsFS fs, const char* path) {
+ return ptr_hdfsExists(fs, path);
+}
+
+int hdfsSeek(hdfsFS fs, hdfsFile file, tOffset desiredPos) {
+ return ptr_hdfsSeek(fs, file, desiredPos);
+}
+
+tOffset hdfsTell(hdfsFS fs, hdfsFile file) {
+ return ptr_hdfsTell(fs, file);
+}
+
+tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length) {
+ return ptr_hdfsRead(fs, file, buffer, length);
+}
+
+tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length) {
+ return ptr_hdfsPread(fs, file, position, buffer, length);
+}
+
+tSize hdfsWrite(hdfsFS fs, hdfsFile file, const void* buffer, tSize length) {
+ return ptr_hdfsWrite(fs, file, buffer, length);
+}
+
+int hdfsFlush(hdfsFS fs, hdfsFile file) {
+ return ptr_hdfsFlush(fs, file);
+}
+
+int hdfsAvailable(hdfsFS fs, hdfsFile file) {
+ GET_SYMBOL(hdfsAvailable);
+ if (ptr_hdfsAvailable)
+ return ptr_hdfsAvailable(fs, file);
+ else
+ return 0;
+}
+
+int hdfsCopy(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) {
+ GET_SYMBOL(hdfsCopy);
+ if (ptr_hdfsCopy)
+ return ptr_hdfsCopy(srcFS, src, dstFS, dst);
+ else
+ return 0;
+}
+
+int hdfsMove(hdfsFS srcFS, const char* src, hdfsFS dstFS, const char* dst) {
+ GET_SYMBOL(hdfsMove);
+ if (ptr_hdfsMove)
+ return ptr_hdfsMove(srcFS, src, dstFS, dst);
+ else
+ return 0;
+}
+
+int hdfsDelete(hdfsFS fs, const char* path, int recursive) {
+ return ptr_hdfsDelete(fs, path, recursive);
+}
+
+int hdfsRename(hdfsFS fs, const char* oldPath, const char* newPath) {
+ GET_SYMBOL(hdfsRename);
+ if (ptr_hdfsRename)
+ return ptr_hdfsRename(fs, oldPath, newPath);
+ else
+ return 0;
+}
+
+char* hdfsGetWorkingDirectory(hdfsFS fs, char* buffer, size_t bufferSize) {
+ GET_SYMBOL(hdfsGetWorkingDirectory);
+ if (ptr_hdfsGetWorkingDirectory) {
+ return ptr_hdfsGetWorkingDirectory(fs, buffer, bufferSize);
+ } else {
+ return NULL;
+ }
+}
+
+int hdfsSetWorkingDirectory(hdfsFS fs, const char* path) {
+ GET_SYMBOL(hdfsSetWorkingDirectory);
+ if (ptr_hdfsSetWorkingDirectory) {
+ return ptr_hdfsSetWorkingDirectory(fs, path);
+ } else {
+ return 0;
+ }
+}
+
+int hdfsCreateDirectory(hdfsFS fs, const char* path) {
+ return ptr_hdfsCreateDirectory(fs, path);
+}
+
+int hdfsSetReplication(hdfsFS fs, const char* path, int16_t replication) {
+ GET_SYMBOL(hdfsSetReplication);
+ if (ptr_hdfsSetReplication) {
+ return ptr_hdfsSetReplication(fs, path, replication);
+ } else {
+ return 0;
+ }
+}
+
+hdfsFileInfo* hdfsListDirectory(hdfsFS fs, const char* path, int* numEntries) {
+ return ptr_hdfsListDirectory(fs, path, numEntries);
+}
+
+hdfsFileInfo* hdfsGetPathInfo(hdfsFS fs, const char* path) {
+ return ptr_hdfsGetPathInfo(fs, path);
+}
+
+void hdfsFreeFileInfo(hdfsFileInfo* hdfsFileInfo, int numEntries) {
+ ptr_hdfsFreeFileInfo(hdfsFileInfo, numEntries);
+}
+
+char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) {
+ GET_SYMBOL(hdfsGetHosts);
+ if (ptr_hdfsGetHosts) {
+ return ptr_hdfsGetHosts(fs, path, start, length);
+ } else {
+ return NULL;
+ }
+}
+
+void hdfsFreeHosts(char*** blockHosts) {
+ GET_SYMBOL(hdfsFreeHosts);
+ if (ptr_hdfsFreeHosts) { ptr_hdfsFreeHosts(blockHosts); }
+}
+
+tOffset hdfsGetDefaultBlockSize(hdfsFS fs) {
+ GET_SYMBOL(hdfsGetDefaultBlockSize);
+ if (ptr_hdfsGetDefaultBlockSize) {
+ return ptr_hdfsGetDefaultBlockSize(fs);
+ } else {
+ return 0;
+ }
+}
+
+tOffset hdfsGetCapacity(hdfsFS fs) {
+ return ptr_hdfsGetCapacity(fs);
+}
+
+tOffset hdfsGetUsed(hdfsFS fs) {
+ return ptr_hdfsGetUsed(fs);
+}
+
+int hdfsChown(hdfsFS fs, const char* path, const char* owner, const char* group) {
+ GET_SYMBOL(hdfsChown);
+ if (ptr_hdfsChown) {
+ return ptr_hdfsChown(fs, path, owner, group);
+ } else {
+ return 0;
+ }
+}
+
+int hdfsChmod(hdfsFS fs, const char* path, short mode) { // NOLINT
+ GET_SYMBOL(hdfsChmod);
+ if (ptr_hdfsChmod) {
+ return ptr_hdfsChmod(fs, path, mode);
+ } else {
+ return 0;
+ }
+}
+
+int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) {
+ GET_SYMBOL(hdfsUtime);
+ if (ptr_hdfsUtime) {
+ return ptr_hdfsUtime(fs, path, mtime, atime);
+ } else {
+ return 0;
+ }
+}
+
+static std::vector<fs::path> get_potential_libhdfs_paths() {
+ std::vector<fs::path> libhdfs_potential_paths = {
+ // find one in the local directory
+ fs::path("./libhdfs.so"), fs::path("./hdfs.dll"),
+ // find a global libhdfs.so
+ fs::path("libhdfs.so"), fs::path("hdfs.dll"),
+ };
+
+ const char* hadoop_home = std::getenv("HADOOP_HOME");
+ if (hadoop_home != nullptr) {
+ auto path = fs::path(hadoop_home) / "lib/native/libhdfs.so";
+ libhdfs_potential_paths.push_back(path);
+ }
+ return libhdfs_potential_paths;
+}
+
+static std::vector<fs::path> get_potential_libjvm_paths() {
+ std::vector<fs::path> libjvm_potential_paths;
+
+ std::vector<fs::path> search_prefixes;
+ std::vector<fs::path> search_suffixes;
+ std::string file_name;
+
+// From heuristics
+#ifdef _WIN32
+ search_prefixes = {""};
+ search_suffixes = {"/jre/bin/server", "/bin/server"};
+ file_name = "jvm.dll";
+#elif __APPLE__
+ search_prefixes = {""};
+ search_suffixes = {""};
+ file_name = "libjvm.dylib";
+
+// SFrame uses /usr/libexec/java_home to find JAVA_HOME; for now we are
+// expecting users to set an environment variable
+#else
+ search_prefixes = {
+ "/usr/lib/jvm/default-java", // ubuntu / debian distros
+ "/usr/lib/jvm/java", // rhel6
+ "/usr/lib/jvm", // centos6
+ "/usr/lib64/jvm", // opensuse 13
+ "/usr/local/lib/jvm/default-java", // alt ubuntu / debian distros
+ "/usr/local/lib/jvm/java", // alt rhel6
+ "/usr/local/lib/jvm", // alt centos6
+ "/usr/local/lib64/jvm", // alt opensuse 13
+ "/usr/local/lib/jvm/java-7-openjdk-amd64", // alt ubuntu / debian distros
+ "/usr/lib/jvm/java-7-openjdk-amd64", // alt ubuntu / debian distros
+ "/usr/local/lib/jvm/java-6-openjdk-amd64", // alt ubuntu / debian distros
+ "/usr/lib/jvm/java-6-openjdk-amd64", // alt ubuntu / debian distros
+ "/usr/lib/jvm/java-7-oracle", // alt ubuntu
+ "/usr/lib/jvm/java-8-oracle", // alt ubuntu
+ "/usr/lib/jvm/java-6-oracle", // alt ubuntu
+ "/usr/local/lib/jvm/java-7-oracle", // alt ubuntu
+ "/usr/local/lib/jvm/java-8-oracle", // alt ubuntu
+ "/usr/local/lib/jvm/java-6-oracle", // alt ubuntu
+ "/usr/lib/jvm/default", // alt centos
+ "/usr/java/latest", // alt centos
+ };
+ search_suffixes = {"/jre/lib/amd64/server"};
+ file_name = "libjvm.so";
+#endif
+ // From direct environment variable
+ char* env_value = NULL;
+ if ((env_value = getenv("JAVA_HOME")) != NULL) {
+ // logstream(LOG_INFO) << "Found environment variable " << env_name << ": " <<
+ // env_value << std::endl;
+ search_prefixes.insert(search_prefixes.begin(), env_value);
+ }
+
+ // Generate cross product between search_prefixes, search_suffixes, and file_name
+ for (auto& prefix : search_prefixes) {
+ for (auto& suffix : search_suffixes) {
+ auto path = (fs::path(prefix) / fs::path(suffix) / fs::path(file_name));
+ libjvm_potential_paths.push_back(path);
+ }
+ }
+
+ return libjvm_potential_paths;
+}
+
+#ifndef _WIN32
+static arrow::Status try_dlopen(
+ std::vector<fs::path> potential_paths, const char* name, void*& out_handle) {
+ std::vector<std::string> error_messages;
+
+ for (auto& i : potential_paths) {
+ i.make_preferred();
+ // logstream(LOG_INFO) << "Trying " << i.string().c_str() << std::endl;
+ out_handle = dlopen(i.native().c_str(), RTLD_NOW | RTLD_LOCAL);
+
+ if (out_handle != NULL) {
+ // logstream(LOG_INFO) << "Success!" << std::endl;
+ break;
+ } else {
+ const char* err_msg = dlerror();
+ if (err_msg != NULL) {
+ error_messages.push_back(std::string(err_msg));
+ } else {
+ error_messages.push_back(std::string(" returned NULL"));
+ }
+ }
+ }
+
+ if (out_handle == NULL) {
+ std::stringstream ss;
+ ss << "Unable to load " << name;
+ return arrow::Status::IOError(ss.str());
+ }
+
+ return arrow::Status::OK();
+}
+
+#else
+static arrow::Status try_dlopen(
+ std::vector<fs::path> potential_paths, const char* name, HINSTANCE& out_handle) {
+ std::vector<std::string> error_messages;
+
+ for (auto& i : potential_paths) {
+ i.make_preferred();
+ // logstream(LOG_INFO) << "Trying " << i.string().c_str() << std::endl;
+
+ out_handle = LoadLibrary(i.string().c_str());
+
+ if (out_handle != NULL) {
+ // logstream(LOG_INFO) << "Success!" << std::endl;
+ break;
+ } else {
+ // error_messages.push_back(get_last_err_str(GetLastError()));
+ }
+ }
+
+ if (out_handle == NULL) {
+ std::stringstream ss;
+ ss << "Unable to load " << name;
+ return arrow::Status::IOError(ss.str());
+ }
+
+ return arrow::Status::OK();
+}
+#endif // _WIN32
+
+} // extern "C"
+
+#define GET_SYMBOL_REQUIRED(SYMBOL_NAME) \
+ do { \
+ if (!ptr_##SYMBOL_NAME) { \
+ *reinterpret_cast<void**>(&ptr_##SYMBOL_NAME) = get_symbol("" #SYMBOL_NAME); \
+ } \
+ if (!ptr_##SYMBOL_NAME) \
+ return Status::IOError("Getting symbol " #SYMBOL_NAME "failed"); \
+ } while (0)
+
+namespace arrow {
+namespace io {
+
+Status ConnectLibHdfs() {
+ static std::mutex lock;
+ std::lock_guard<std::mutex> guard(lock);
+
+ static bool shim_attempted = false;
+ if (!shim_attempted) {
+ shim_attempted = true;
+
+ std::vector<fs::path> libjvm_potential_paths = get_potential_libjvm_paths();
+ RETURN_NOT_OK(try_dlopen(libjvm_potential_paths, "libjvm", libjvm_handle));
+
+ std::vector<fs::path> libhdfs_potential_paths = get_potential_libhdfs_paths();
+ RETURN_NOT_OK(try_dlopen(libhdfs_potential_paths, "libhdfs", libhdfs_handle));
+ } else if (libhdfs_handle == nullptr) {
+ return Status::IOError("Prior attempt to load libhdfs failed");
+ }
+
+ GET_SYMBOL_REQUIRED(hdfsConnect);
+ GET_SYMBOL_REQUIRED(hdfsConnectAsUser);
+ GET_SYMBOL_REQUIRED(hdfsCreateDirectory);
+ GET_SYMBOL_REQUIRED(hdfsDelete);
+ GET_SYMBOL_REQUIRED(hdfsDisconnect);
+ GET_SYMBOL_REQUIRED(hdfsExists);
+ GET_SYMBOL_REQUIRED(hdfsFreeFileInfo);
+ GET_SYMBOL_REQUIRED(hdfsGetCapacity);
+ GET_SYMBOL_REQUIRED(hdfsGetUsed);
+ GET_SYMBOL_REQUIRED(hdfsGetPathInfo);
+ GET_SYMBOL_REQUIRED(hdfsListDirectory);
+
+ // File methods
+ GET_SYMBOL_REQUIRED(hdfsCloseFile);
+ GET_SYMBOL_REQUIRED(hdfsFlush);
+ GET_SYMBOL_REQUIRED(hdfsOpenFile);
+ GET_SYMBOL_REQUIRED(hdfsRead);
+ GET_SYMBOL_REQUIRED(hdfsPread);
+ GET_SYMBOL_REQUIRED(hdfsSeek);
+ GET_SYMBOL_REQUIRED(hdfsTell);
+ GET_SYMBOL_REQUIRED(hdfsWrite);
+
+ return Status::OK();
+}
+
+} // namespace io
+} // namespace arrow
+
+#endif // HAS_HADOOP
http://git-wip-us.apache.org/repos/asf/arrow/blob/ef908302/cpp/src/arrow/parquet/parquet-io-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc
index db779d8..edcac88 100644
--- a/cpp/src/arrow/parquet/parquet-io-test.cc
+++ b/cpp/src/arrow/parquet/parquet-io-test.cc
@@ -126,8 +126,8 @@ class TestParquetIO : public ::testing::Test {
size_t chunk_size = values.size() / num_chunks;
for (int i = 0; i < num_chunks; i++) {
auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
- auto column_writer = static_cast<ParquetWriter<TestType>*>(
- row_group_writer->NextColumn());
+ auto column_writer =
+ static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
T* data = values.data() + i * chunk_size;
column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
column_writer->Close();