You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/09/18 20:02:08 UTC
[2/2] arrow git commit: ARROW-280: [C++] Refactor IPC / memory map IO
to use common arrow_io interfaces. Create arrow_ipc leaf library
ARROW-280: [C++] Refactor IPC / memory map IO to use common arrow_io interfaces. Create arrow_ipc leaf library
This change does several things:
* Clean up the IO interface class structure so that it can indicate the precise characteristics of an implementation
* Make the IPC reader/writer use more generic interfaces: writing only needs an output stream, and reading only needs a random access reader. This will unblock ARROW-267
* Create a separate arrow_ipc shared library
Author: Wes McKinney <we...@twosigma.com>
Closes #138 from wesm/ARROW-280 and squashes the following commits:
6a59eb6 [Wes McKinney] * Restructure IO interfaces to accommodate more configurations. * Refactor memory mapped IO interfaces to be in line with other arrow::io classes. * Split arrow_ipc into a leaf library. * Refactor pyarrow and arrow_parquet to suit. Move BufferReader to arrow_io. Pyarrow parquet tests currently segfault.
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/559b8652
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/559b8652
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/559b8652
Branch: refs/heads/master
Commit: 559b865226ec0f5d78e87957c2ff0f7711bec9a8
Parents: 17e90e1
Author: Wes McKinney <we...@twosigma.com>
Authored: Sun Sep 18 16:01:58 2016 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sun Sep 18 16:01:58 2016 -0400
----------------------------------------------------------------------
cpp/CMakeLists.txt | 6 -
cpp/src/arrow/io/CMakeLists.txt | 11 +-
cpp/src/arrow/io/hdfs-io-test.cc | 315 ----------------------
cpp/src/arrow/io/hdfs.cc | 35 ++-
cpp/src/arrow/io/hdfs.h | 29 +-
cpp/src/arrow/io/interfaces.h | 71 ++++-
cpp/src/arrow/io/io-hdfs-test.cc | 315 ++++++++++++++++++++++
cpp/src/arrow/io/io-memory-test.cc | 125 +++++++++
cpp/src/arrow/io/libhdfs_shim.cc | 3 +-
cpp/src/arrow/io/memory.cc | 262 ++++++++++++++++++
cpp/src/arrow/io/memory.h | 130 +++++++++
cpp/src/arrow/io/test-common.h | 63 +++++
cpp/src/arrow/ipc/CMakeLists.txt | 58 +++-
cpp/src/arrow/ipc/adapter.cc | 61 +++--
cpp/src/arrow/ipc/adapter.h | 39 +--
cpp/src/arrow/ipc/ipc-adapter-test.cc | 33 +--
cpp/src/arrow/ipc/ipc-memory-test.cc | 127 ---------
cpp/src/arrow/ipc/memory.cc | 182 -------------
cpp/src/arrow/ipc/memory.h | 150 -----------
cpp/src/arrow/ipc/metadata-internal.cc | 9 +-
cpp/src/arrow/ipc/metadata-internal.h | 2 +-
cpp/src/arrow/ipc/metadata.h | 11 +-
cpp/src/arrow/ipc/symbols.map | 18 ++
cpp/src/arrow/ipc/test-common.h | 25 --
cpp/src/arrow/ipc/util.h | 56 ++++
cpp/src/arrow/parquet/CMakeLists.txt | 1 +
cpp/src/arrow/parquet/io.cc | 4 +-
cpp/src/arrow/parquet/io.h | 4 +-
cpp/src/arrow/parquet/parquet-io-test.cc | 51 +---
cpp/src/arrow/parquet/parquet-schema-test.cc | 3 +-
cpp/src/arrow/parquet/reader.cc | 8 +-
cpp/src/arrow/parquet/reader.h | 2 +-
cpp/src/arrow/parquet/schema.cc | 2 +-
cpp/src/arrow/parquet/writer.cc | 2 +-
cpp/src/arrow/type.h | 4 +-
cpp/src/arrow/util/memory-pool-test.cc | 2 +-
python/pyarrow/includes/libarrow_io.pxd | 42 ++-
python/pyarrow/includes/parquet.pxd | 18 +-
python/pyarrow/io.pxd | 7 +-
python/pyarrow/io.pyx | 14 +-
python/pyarrow/parquet.pyx | 6 +-
41 files changed, 1288 insertions(+), 1018 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index a39a752..be95dab 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -626,12 +626,6 @@ set(ARROW_SRCS
src/arrow/table.cc
src/arrow/type.cc
- # IPC / Shared memory library; to be turned into an optional component
- src/arrow/ipc/adapter.cc
- src/arrow/ipc/memory.cc
- src/arrow/ipc/metadata.cc
- src/arrow/ipc/metadata-internal.cc
-
src/arrow/types/construct.cc
src/arrow/types/decimal.cc
src/arrow/types/json.cc
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index b8c0e13..87e227e 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -20,6 +20,7 @@
set(ARROW_IO_LINK_LIBS
arrow_shared
+ dl
)
if (ARROW_BOOST_USE_SHARED)
@@ -37,6 +38,7 @@ set(ARROW_IO_TEST_LINK_LIBS
${ARROW_IO_PRIVATE_LINK_LIBS})
set(ARROW_IO_SRCS
+ memory.cc
)
if(ARROW_HDFS)
@@ -71,8 +73,8 @@ if(ARROW_HDFS)
${ARROW_HDFS_SRCS}
${ARROW_IO_SRCS})
- ADD_ARROW_TEST(hdfs-io-test)
- ARROW_TEST_LINK_LIBRARIES(hdfs-io-test
+ ADD_ARROW_TEST(io-hdfs-test)
+ ARROW_TEST_LINK_LIBRARIES(io-hdfs-test
${ARROW_IO_TEST_LINK_LIBS})
endif()
@@ -101,10 +103,15 @@ if (APPLE)
INSTALL_NAME_DIR "@rpath")
endif()
+ADD_ARROW_TEST(io-memory-test)
+ARROW_TEST_LINK_LIBRARIES(io-memory-test
+ ${ARROW_IO_TEST_LINK_LIBS})
+
# Headers: top level
install(FILES
hdfs.h
interfaces.h
+ memory.h
DESTINATION include/arrow/io)
install(TARGETS arrow_io
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/hdfs-io-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs-io-test.cc b/cpp/src/arrow/io/hdfs-io-test.cc
deleted file mode 100644
index e48a281..0000000
--- a/cpp/src/arrow/io/hdfs-io-test.cc
+++ /dev/null
@@ -1,315 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstdlib>
-#include <iostream>
-#include <sstream>
-#include <string>
-
-#include "gtest/gtest.h"
-
-#include <boost/filesystem.hpp> // NOLINT
-
-#include "arrow/io/hdfs.h"
-#include "arrow/test-util.h"
-#include "arrow/util/status.h"
-
-namespace arrow {
-namespace io {
-
-std::vector<uint8_t> RandomData(int64_t size) {
- std::vector<uint8_t> buffer(size);
- test::random_bytes(size, 0, buffer.data());
- return buffer;
-}
-
-class TestHdfsClient : public ::testing::Test {
- public:
- Status MakeScratchDir() {
- if (client_->Exists(scratch_dir_)) {
- RETURN_NOT_OK((client_->Delete(scratch_dir_, true)));
- }
- return client_->CreateDirectory(scratch_dir_);
- }
-
- Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size,
- bool append = false, int buffer_size = 0, int replication = 0,
- int default_block_size = 0) {
- std::shared_ptr<HdfsWriteableFile> file;
- RETURN_NOT_OK(client_->OpenWriteable(
- path, append, buffer_size, replication, default_block_size, &file));
-
- RETURN_NOT_OK(file->Write(buffer, size));
- RETURN_NOT_OK(file->Close());
-
- return Status::OK();
- }
-
- std::string ScratchPath(const std::string& name) {
- std::stringstream ss;
- ss << scratch_dir_ << "/" << name;
- return ss.str();
- }
-
- std::string HdfsAbsPath(const std::string& relpath) {
- std::stringstream ss;
- ss << "hdfs://" << conf_.host << ":" << conf_.port << relpath;
- return ss.str();
- }
-
- protected:
- // Set up shared state between unit tests
- static void SetUpTestCase() {
- if (!ConnectLibHdfs().ok()) {
- std::cout << "Loading libhdfs failed, skipping tests gracefully" << std::endl;
- return;
- }
-
- loaded_libhdfs_ = true;
-
- const char* host = std::getenv("ARROW_HDFS_TEST_HOST");
- const char* port = std::getenv("ARROW_HDFS_TEST_PORT");
- const char* user = std::getenv("ARROW_HDFS_TEST_USER");
-
- ASSERT_TRUE(user) << "Set ARROW_HDFS_TEST_USER";
-
- conf_.host = host == nullptr ? "localhost" : host;
- conf_.user = user;
- conf_.port = port == nullptr ? 20500 : atoi(port);
-
- ASSERT_OK(HdfsClient::Connect(&conf_, &client_));
- }
-
- static void TearDownTestCase() {
- if (client_) {
- EXPECT_OK(client_->Delete(scratch_dir_, true));
- EXPECT_OK(client_->Disconnect());
- }
- }
-
- static bool loaded_libhdfs_;
-
- // Resources shared amongst unit tests
- static HdfsConnectionConfig conf_;
- static std::string scratch_dir_;
- static std::shared_ptr<HdfsClient> client_;
-};
-
-bool TestHdfsClient::loaded_libhdfs_ = false;
-HdfsConnectionConfig TestHdfsClient::conf_ = HdfsConnectionConfig();
-
-std::string TestHdfsClient::scratch_dir_ =
- boost::filesystem::unique_path("/tmp/arrow-hdfs/scratch-%%%%").native();
-
-std::shared_ptr<HdfsClient> TestHdfsClient::client_ = nullptr;
-
-#define SKIP_IF_NO_LIBHDFS() \
- if (!loaded_libhdfs_) { \
- std::cout << "No libhdfs, skipping" << std::endl; \
- return; \
- }
-
-TEST_F(TestHdfsClient, ConnectsAgain) {
- SKIP_IF_NO_LIBHDFS();
-
- std::shared_ptr<HdfsClient> client;
- ASSERT_OK(HdfsClient::Connect(&conf_, &client));
- ASSERT_OK(client->Disconnect());
-}
-
-TEST_F(TestHdfsClient, CreateDirectory) {
- SKIP_IF_NO_LIBHDFS();
-
- std::string path = ScratchPath("create-directory");
-
- if (client_->Exists(path)) { ASSERT_OK(client_->Delete(path, true)); }
-
- ASSERT_OK(client_->CreateDirectory(path));
- ASSERT_TRUE(client_->Exists(path));
- EXPECT_OK(client_->Delete(path, true));
- ASSERT_FALSE(client_->Exists(path));
-}
-
-TEST_F(TestHdfsClient, GetCapacityUsed) {
- SKIP_IF_NO_LIBHDFS();
-
- // Who knows what is actually in your DFS cluster, but expect it to have
- // positive used bytes and capacity
- int64_t nbytes = 0;
- ASSERT_OK(client_->GetCapacity(&nbytes));
- ASSERT_LT(0, nbytes);
-
- ASSERT_OK(client_->GetUsed(&nbytes));
- ASSERT_LT(0, nbytes);
-}
-
-TEST_F(TestHdfsClient, GetPathInfo) {
- SKIP_IF_NO_LIBHDFS();
-
- HdfsPathInfo info;
-
- ASSERT_OK(MakeScratchDir());
-
- // Directory info
- ASSERT_OK(client_->GetPathInfo(scratch_dir_, &info));
- ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
- ASSERT_EQ(HdfsAbsPath(scratch_dir_), info.name);
- ASSERT_EQ(conf_.user, info.owner);
-
- // TODO(wesm): test group, other attrs
-
- auto path = ScratchPath("test-file");
-
- const int size = 100;
-
- std::vector<uint8_t> buffer = RandomData(size);
-
- ASSERT_OK(WriteDummyFile(path, buffer.data(), size));
- ASSERT_OK(client_->GetPathInfo(path, &info));
-
- ASSERT_EQ(ObjectType::FILE, info.kind);
- ASSERT_EQ(HdfsAbsPath(path), info.name);
- ASSERT_EQ(conf_.user, info.owner);
- ASSERT_EQ(size, info.size);
-}
-
-TEST_F(TestHdfsClient, AppendToFile) {
- SKIP_IF_NO_LIBHDFS();
-
- ASSERT_OK(MakeScratchDir());
-
- auto path = ScratchPath("test-file");
- const int size = 100;
-
- std::vector<uint8_t> buffer = RandomData(size);
- ASSERT_OK(WriteDummyFile(path, buffer.data(), size));
-
- // now append
- ASSERT_OK(WriteDummyFile(path, buffer.data(), size, true));
-
- HdfsPathInfo info;
- ASSERT_OK(client_->GetPathInfo(path, &info));
- ASSERT_EQ(size * 2, info.size);
-}
-
-TEST_F(TestHdfsClient, ListDirectory) {
- SKIP_IF_NO_LIBHDFS();
-
- const int size = 100;
- std::vector<uint8_t> data = RandomData(size);
-
- auto p1 = ScratchPath("test-file-1");
- auto p2 = ScratchPath("test-file-2");
- auto d1 = ScratchPath("test-dir-1");
-
- ASSERT_OK(MakeScratchDir());
- ASSERT_OK(WriteDummyFile(p1, data.data(), size));
- ASSERT_OK(WriteDummyFile(p2, data.data(), size / 2));
- ASSERT_OK(client_->CreateDirectory(d1));
-
- std::vector<HdfsPathInfo> listing;
- ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing));
-
- // Do it again, appends!
- ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing));
-
- ASSERT_EQ(6, static_cast<int>(listing.size()));
-
- // Argh, well, shouldn't expect the listing to be in any particular order
- for (size_t i = 0; i < listing.size(); ++i) {
- const HdfsPathInfo& info = listing[i];
- if (info.name == HdfsAbsPath(p1)) {
- ASSERT_EQ(ObjectType::FILE, info.kind);
- ASSERT_EQ(size, info.size);
- } else if (info.name == HdfsAbsPath(p2)) {
- ASSERT_EQ(ObjectType::FILE, info.kind);
- ASSERT_EQ(size / 2, info.size);
- } else if (info.name == HdfsAbsPath(d1)) {
- ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
- } else {
- FAIL() << "Unexpected path: " << info.name;
- }
- }
-}
-
-TEST_F(TestHdfsClient, ReadableMethods) {
- SKIP_IF_NO_LIBHDFS();
-
- ASSERT_OK(MakeScratchDir());
-
- auto path = ScratchPath("test-file");
- const int size = 100;
-
- std::vector<uint8_t> data = RandomData(size);
- ASSERT_OK(WriteDummyFile(path, data.data(), size));
-
- std::shared_ptr<HdfsReadableFile> file;
- ASSERT_OK(client_->OpenReadable(path, &file));
-
- // Test GetSize -- move this into its own unit test if ever needed
- int64_t file_size;
- ASSERT_OK(file->GetSize(&file_size));
- ASSERT_EQ(size, file_size);
-
- uint8_t buffer[50];
- int64_t bytes_read = 0;
-
- ASSERT_OK(file->Read(50, &bytes_read, buffer));
- ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50));
- ASSERT_EQ(50, bytes_read);
-
- ASSERT_OK(file->Read(50, &bytes_read, buffer));
- ASSERT_EQ(0, std::memcmp(buffer, data.data() + 50, 50));
- ASSERT_EQ(50, bytes_read);
-
- // EOF
- ASSERT_OK(file->Read(1, &bytes_read, buffer));
- ASSERT_EQ(0, bytes_read);
-
- // ReadAt to EOF
- ASSERT_OK(file->ReadAt(60, 100, &bytes_read, buffer));
- ASSERT_EQ(40, bytes_read);
- ASSERT_EQ(0, std::memcmp(buffer, data.data() + 60, bytes_read));
-
- // Seek, Tell
- ASSERT_OK(file->Seek(60));
-
- int64_t position;
- ASSERT_OK(file->Tell(&position));
- ASSERT_EQ(60, position);
-}
-
-TEST_F(TestHdfsClient, RenameFile) {
- SKIP_IF_NO_LIBHDFS();
-
- ASSERT_OK(MakeScratchDir());
-
- auto src_path = ScratchPath("src-file");
- auto dst_path = ScratchPath("dst-file");
- const int size = 100;
-
- std::vector<uint8_t> data = RandomData(size);
- ASSERT_OK(WriteDummyFile(src_path, data.data(), size));
-
- ASSERT_OK(client_->Rename(src_path, dst_path));
-
- ASSERT_FALSE(client_->Exists(src_path));
- ASSERT_TRUE(client_->Exists(dst_path));
-}
-
-} // namespace io
-} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/hdfs.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
index 800c3ed..a6b4b2f 100644
--- a/cpp/src/arrow/io/hdfs.cc
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -142,6 +142,15 @@ Status HdfsReadableFile::ReadAt(
return impl_->ReadAt(position, nbytes, bytes_read, buffer);
}
+Status HdfsReadableFile::ReadAt(
+ int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
+ return Status::NotImplemented("Not yet implemented");
+}
+
+bool HdfsReadableFile::supports_zero_copy() const {
+ return false;
+}
+
Status HdfsReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
return impl_->Read(nbytes, bytes_read, buffer);
}
@@ -162,9 +171,9 @@ Status HdfsReadableFile::Tell(int64_t* position) {
// File writing
// Private implementation for writeable-only files
-class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl {
+class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl {
public:
- HdfsWriteableFileImpl() {}
+ HdfsOutputStreamImpl() {}
Status Close() {
if (is_open_) {
@@ -185,29 +194,29 @@ class HdfsWriteableFile::HdfsWriteableFileImpl : public HdfsAnyFileImpl {
}
};
-HdfsWriteableFile::HdfsWriteableFile() {
- impl_.reset(new HdfsWriteableFileImpl());
+HdfsOutputStream::HdfsOutputStream() {
+ impl_.reset(new HdfsOutputStreamImpl());
}
-HdfsWriteableFile::~HdfsWriteableFile() {
+HdfsOutputStream::~HdfsOutputStream() {
impl_->Close();
}
-Status HdfsWriteableFile::Close() {
+Status HdfsOutputStream::Close() {
return impl_->Close();
}
-Status HdfsWriteableFile::Write(
+Status HdfsOutputStream::Write(
const uint8_t* buffer, int64_t nbytes, int64_t* bytes_read) {
return impl_->Write(buffer, nbytes, bytes_read);
}
-Status HdfsWriteableFile::Write(const uint8_t* buffer, int64_t nbytes) {
+Status HdfsOutputStream::Write(const uint8_t* buffer, int64_t nbytes) {
int64_t bytes_written_dummy = 0;
return Write(buffer, nbytes, &bytes_written_dummy);
}
-Status HdfsWriteableFile::Tell(int64_t* position) {
+Status HdfsOutputStream::Tell(int64_t* position) {
return impl_->Tell(position);
}
@@ -347,7 +356,7 @@ class HdfsClient::HdfsClientImpl {
Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size,
int16_t replication, int64_t default_block_size,
- std::shared_ptr<HdfsWriteableFile>* file) {
+ std::shared_ptr<HdfsOutputStream>* file) {
int flags = O_WRONLY;
if (append) flags |= O_APPEND;
@@ -362,7 +371,7 @@ class HdfsClient::HdfsClientImpl {
}
// std::make_shared does not work with private ctors
- *file = std::shared_ptr<HdfsWriteableFile>(new HdfsWriteableFile());
+ *file = std::shared_ptr<HdfsOutputStream>(new HdfsOutputStream());
(*file)->impl_->set_members(path, fs_, handle);
return Status::OK();
@@ -440,13 +449,13 @@ Status HdfsClient::OpenReadable(
Status HdfsClient::OpenWriteable(const std::string& path, bool append,
int32_t buffer_size, int16_t replication, int64_t default_block_size,
- std::shared_ptr<HdfsWriteableFile>* file) {
+ std::shared_ptr<HdfsOutputStream>* file) {
return impl_->OpenWriteable(
path, append, buffer_size, replication, default_block_size, file);
}
Status HdfsClient::OpenWriteable(
- const std::string& path, bool append, std::shared_ptr<HdfsWriteableFile>* file) {
+ const std::string& path, bool append, std::shared_ptr<HdfsOutputStream>* file) {
return OpenWriteable(path, append, 0, 0, 0, file);
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/hdfs.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h
index b6449fc..39720cc 100644
--- a/cpp/src/arrow/io/hdfs.h
+++ b/cpp/src/arrow/io/hdfs.h
@@ -29,13 +29,14 @@
namespace arrow {
+class Buffer;
class Status;
namespace io {
class HdfsClient;
class HdfsReadableFile;
-class HdfsWriteableFile;
+class HdfsOutputStream;
struct HdfsPathInfo {
ObjectType::type kind;
@@ -139,14 +140,14 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient {
// @param default_block_size, 0 for default
Status OpenWriteable(const std::string& path, bool append, int32_t buffer_size,
int16_t replication, int64_t default_block_size,
- std::shared_ptr<HdfsWriteableFile>* file);
+ std::shared_ptr<HdfsOutputStream>* file);
Status OpenWriteable(
- const std::string& path, bool append, std::shared_ptr<HdfsWriteableFile>* file);
+ const std::string& path, bool append, std::shared_ptr<HdfsOutputStream>* file);
private:
friend class HdfsReadableFile;
- friend class HdfsWriteableFile;
+ friend class HdfsOutputStream;
class ARROW_NO_EXPORT HdfsClientImpl;
std::unique_ptr<HdfsClientImpl> impl_;
@@ -155,7 +156,7 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient {
DISALLOW_COPY_AND_ASSIGN(HdfsClient);
};
-class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
+class ARROW_EXPORT HdfsReadableFile : public ReadableFileInterface {
public:
~HdfsReadableFile();
@@ -166,6 +167,10 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
Status ReadAt(
int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
+ Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
+ bool supports_zero_copy() const override;
+
Status Seek(int64_t position) override;
Status Tell(int64_t* position) override;
@@ -183,9 +188,11 @@ class ARROW_EXPORT HdfsReadableFile : public RandomAccessFile {
DISALLOW_COPY_AND_ASSIGN(HdfsReadableFile);
};
-class ARROW_EXPORT HdfsWriteableFile : public WriteableFile {
+// Named OutputStream because, unlike WriteableFileInterface, this class does
+// not support seeking
+class ARROW_EXPORT HdfsOutputStream : public OutputStream {
public:
- ~HdfsWriteableFile();
+ ~HdfsOutputStream();
Status Close() override;
@@ -196,14 +203,14 @@ class ARROW_EXPORT HdfsWriteableFile : public WriteableFile {
Status Tell(int64_t* position) override;
private:
- class ARROW_NO_EXPORT HdfsWriteableFileImpl;
- std::unique_ptr<HdfsWriteableFileImpl> impl_;
+ class ARROW_NO_EXPORT HdfsOutputStreamImpl;
+ std::unique_ptr<HdfsOutputStreamImpl> impl_;
friend class HdfsClient::HdfsClientImpl;
- HdfsWriteableFile();
+ HdfsOutputStream();
- DISALLOW_COPY_AND_ASSIGN(HdfsWriteableFile);
+ DISALLOW_COPY_AND_ASSIGN(HdfsOutputStream);
};
Status ARROW_EXPORT ConnectLibHdfs();
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/interfaces.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index c212852..fa34b43 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -21,8 +21,11 @@
#include <cstdint>
#include <memory>
+#include "arrow/util/macros.h"
+
namespace arrow {
+class Buffer;
class Status;
namespace io {
@@ -40,30 +43,78 @@ class FileSystemClient {
virtual ~FileSystemClient() {}
};
-class FileBase {
+class FileInterface {
public:
+ virtual ~FileInterface() {}
virtual Status Close() = 0;
virtual Status Tell(int64_t* position) = 0;
+
+ FileMode::type mode() const { return mode_; }
+
+ protected:
+ FileInterface() {}
+ FileMode::type mode_;
+
+ void set_mode(FileMode::type mode) { mode_ = mode; }
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(FileInterface);
};
-class ReadableFile : public FileBase {
+class Seekable {
public:
- virtual Status ReadAt(
- int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0;
+ virtual Status Seek(int64_t position) = 0;
+};
- virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) = 0;
+class Writeable {
+ public:
+ virtual Status Write(const uint8_t* data, int64_t nbytes) = 0;
+};
- virtual Status GetSize(int64_t* size) = 0;
+class Readable {
+ public:
+ virtual Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0;
+};
+
+class OutputStream : public FileInterface, public Writeable {
+ protected:
+ OutputStream() {}
};
-class RandomAccessFile : public ReadableFile {
+class InputStream : public FileInterface, public Readable {
+ protected:
+ InputStream() {}
+};
+
+class ReadableFileInterface : public InputStream, public Seekable {
public:
- virtual Status Seek(int64_t position) = 0;
+ virtual Status ReadAt(
+ int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) = 0;
+
+ virtual Status GetSize(int64_t* size) = 0;
+
+ // Reads into a Buffer; copies the data only when necessary
+ virtual Status ReadAt(
+ int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) = 0;
+
+ virtual bool supports_zero_copy() const = 0;
+
+ protected:
+ ReadableFileInterface() { set_mode(FileMode::READ); }
};
-class WriteableFile : public FileBase {
+class WriteableFileInterface : public OutputStream, public Seekable {
public:
- virtual Status Write(const uint8_t* buffer, int64_t nbytes) = 0;
+ virtual Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) = 0;
+
+ protected:
+ WriteableFileInterface() { set_mode(FileMode::READ); }
+};
+
+class ReadWriteFileInterface : public ReadableFileInterface,
+ public WriteableFileInterface {
+ protected:
+ ReadWriteFileInterface() { ReadableFileInterface::set_mode(FileMode::READWRITE); }
};
} // namespace io
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/io-hdfs-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc
new file mode 100644
index 0000000..7901932
--- /dev/null
+++ b/cpp/src/arrow/io/io-hdfs-test.cc
@@ -0,0 +1,315 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdlib>
+#include <iostream>
+#include <sstream>
+#include <string>
+
+#include "gtest/gtest.h"
+
+#include <boost/filesystem.hpp> // NOLINT
+
+#include "arrow/io/hdfs.h"
+#include "arrow/test-util.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace io {
+
+std::vector<uint8_t> RandomData(int64_t size) {
+ std::vector<uint8_t> buffer(size);
+ test::random_bytes(size, 0, buffer.data());
+ return buffer;
+}
+
+class TestHdfsClient : public ::testing::Test {
+ public:
+ Status MakeScratchDir() {
+ if (client_->Exists(scratch_dir_)) {
+ RETURN_NOT_OK((client_->Delete(scratch_dir_, true)));
+ }
+ return client_->CreateDirectory(scratch_dir_);
+ }
+
+ Status WriteDummyFile(const std::string& path, const uint8_t* buffer, int64_t size,
+ bool append = false, int buffer_size = 0, int replication = 0,
+ int default_block_size = 0) {
+ std::shared_ptr<HdfsOutputStream> file;
+ RETURN_NOT_OK(client_->OpenWriteable(
+ path, append, buffer_size, replication, default_block_size, &file));
+
+ RETURN_NOT_OK(file->Write(buffer, size));
+ RETURN_NOT_OK(file->Close());
+
+ return Status::OK();
+ }
+
+ std::string ScratchPath(const std::string& name) {
+ std::stringstream ss;
+ ss << scratch_dir_ << "/" << name;
+ return ss.str();
+ }
+
+ std::string HdfsAbsPath(const std::string& relpath) {
+ std::stringstream ss;
+ ss << "hdfs://" << conf_.host << ":" << conf_.port << relpath;
+ return ss.str();
+ }
+
+ protected:
+ // Set up shared state between unit tests
+ static void SetUpTestCase() {
+ if (!ConnectLibHdfs().ok()) {
+ std::cout << "Loading libhdfs failed, skipping tests gracefully" << std::endl;
+ return;
+ }
+
+ loaded_libhdfs_ = true;
+
+ const char* host = std::getenv("ARROW_HDFS_TEST_HOST");
+ const char* port = std::getenv("ARROW_HDFS_TEST_PORT");
+ const char* user = std::getenv("ARROW_HDFS_TEST_USER");
+
+ ASSERT_TRUE(user) << "Set ARROW_HDFS_TEST_USER";
+
+ conf_.host = host == nullptr ? "localhost" : host;
+ conf_.user = user;
+ conf_.port = port == nullptr ? 20500 : atoi(port);
+
+ ASSERT_OK(HdfsClient::Connect(&conf_, &client_));
+ }
+
+ static void TearDownTestCase() {
+ if (client_) {
+ EXPECT_OK(client_->Delete(scratch_dir_, true));
+ EXPECT_OK(client_->Disconnect());
+ }
+ }
+
+ static bool loaded_libhdfs_;
+
+ // Resources shared amongst unit tests
+ static HdfsConnectionConfig conf_;
+ static std::string scratch_dir_;
+ static std::shared_ptr<HdfsClient> client_;
+};
+
+bool TestHdfsClient::loaded_libhdfs_ = false;
+HdfsConnectionConfig TestHdfsClient::conf_ = HdfsConnectionConfig();
+
+std::string TestHdfsClient::scratch_dir_ =
+ boost::filesystem::unique_path("/tmp/arrow-hdfs/scratch-%%%%").native();
+
+std::shared_ptr<HdfsClient> TestHdfsClient::client_ = nullptr;
+
+#define SKIP_IF_NO_LIBHDFS() \
+ if (!loaded_libhdfs_) { \
+ std::cout << "No libhdfs, skipping" << std::endl; \
+ return; \
+ }
+
+TEST_F(TestHdfsClient, ConnectsAgain) {
+ SKIP_IF_NO_LIBHDFS();
+
+ std::shared_ptr<HdfsClient> client;
+ ASSERT_OK(HdfsClient::Connect(&conf_, &client));
+ ASSERT_OK(client->Disconnect());
+}
+
+TEST_F(TestHdfsClient, CreateDirectory) {
+ SKIP_IF_NO_LIBHDFS();
+
+ std::string path = ScratchPath("create-directory");
+
+ if (client_->Exists(path)) { ASSERT_OK(client_->Delete(path, true)); }
+
+ ASSERT_OK(client_->CreateDirectory(path));
+ ASSERT_TRUE(client_->Exists(path));
+ EXPECT_OK(client_->Delete(path, true));
+ ASSERT_FALSE(client_->Exists(path));
+}
+
+TEST_F(TestHdfsClient, GetCapacityUsed) {
+ SKIP_IF_NO_LIBHDFS();
+
+ // Who knows what is actually in your DFS cluster, but expect it to have
+ // positive used bytes and capacity
+ int64_t nbytes = 0;
+ ASSERT_OK(client_->GetCapacity(&nbytes));
+ ASSERT_LT(0, nbytes);
+
+ ASSERT_OK(client_->GetUsed(&nbytes));
+ ASSERT_LT(0, nbytes);
+}
+
+TEST_F(TestHdfsClient, GetPathInfo) {
+ SKIP_IF_NO_LIBHDFS();
+
+ HdfsPathInfo info;
+
+ ASSERT_OK(MakeScratchDir());
+
+ // Directory info
+ ASSERT_OK(client_->GetPathInfo(scratch_dir_, &info));
+ ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
+ ASSERT_EQ(HdfsAbsPath(scratch_dir_), info.name);
+ ASSERT_EQ(conf_.user, info.owner);
+
+ // TODO(wesm): test group, other attrs
+
+ auto path = ScratchPath("test-file");
+
+ const int size = 100;
+
+ std::vector<uint8_t> buffer = RandomData(size);
+
+ ASSERT_OK(WriteDummyFile(path, buffer.data(), size));
+ ASSERT_OK(client_->GetPathInfo(path, &info));
+
+ ASSERT_EQ(ObjectType::FILE, info.kind);
+ ASSERT_EQ(HdfsAbsPath(path), info.name);
+ ASSERT_EQ(conf_.user, info.owner);
+ ASSERT_EQ(size, info.size);
+}
+
+TEST_F(TestHdfsClient, AppendToFile) {
+ SKIP_IF_NO_LIBHDFS();
+
+ ASSERT_OK(MakeScratchDir());
+
+ auto path = ScratchPath("test-file");
+ const int size = 100;
+
+ std::vector<uint8_t> buffer = RandomData(size);
+ ASSERT_OK(WriteDummyFile(path, buffer.data(), size));
+
+ // now append
+ ASSERT_OK(WriteDummyFile(path, buffer.data(), size, true));
+
+ HdfsPathInfo info;
+ ASSERT_OK(client_->GetPathInfo(path, &info));
+ ASSERT_EQ(size * 2, info.size);
+}
+
+TEST_F(TestHdfsClient, ListDirectory) {
+ SKIP_IF_NO_LIBHDFS();
+
+ const int size = 100;
+ std::vector<uint8_t> data = RandomData(size);
+
+ auto p1 = ScratchPath("test-file-1");
+ auto p2 = ScratchPath("test-file-2");
+ auto d1 = ScratchPath("test-dir-1");
+
+ ASSERT_OK(MakeScratchDir());
+ ASSERT_OK(WriteDummyFile(p1, data.data(), size));
+ ASSERT_OK(WriteDummyFile(p2, data.data(), size / 2));
+ ASSERT_OK(client_->CreateDirectory(d1));
+
+ std::vector<HdfsPathInfo> listing;
+ ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing));
+
+ // List again: ListDirectory appends to the existing vector, doubling the entries
+ ASSERT_OK(client_->ListDirectory(scratch_dir_, &listing));
+
+ ASSERT_EQ(6, static_cast<int>(listing.size()));
+
+ // Argh, well, shouldn't expect the listing to be in any particular order
+ for (size_t i = 0; i < listing.size(); ++i) {
+ const HdfsPathInfo& info = listing[i];
+ if (info.name == HdfsAbsPath(p1)) {
+ ASSERT_EQ(ObjectType::FILE, info.kind);
+ ASSERT_EQ(size, info.size);
+ } else if (info.name == HdfsAbsPath(p2)) {
+ ASSERT_EQ(ObjectType::FILE, info.kind);
+ ASSERT_EQ(size / 2, info.size);
+ } else if (info.name == HdfsAbsPath(d1)) {
+ ASSERT_EQ(ObjectType::DIRECTORY, info.kind);
+ } else {
+ FAIL() << "Unexpected path: " << info.name;
+ }
+ }
+}
+
+TEST_F(TestHdfsClient, ReadableMethods) {
+ SKIP_IF_NO_LIBHDFS();
+
+ ASSERT_OK(MakeScratchDir());
+
+ auto path = ScratchPath("test-file");
+ const int size = 100;
+
+ std::vector<uint8_t> data = RandomData(size);
+ ASSERT_OK(WriteDummyFile(path, data.data(), size));
+
+ std::shared_ptr<HdfsReadableFile> file;
+ ASSERT_OK(client_->OpenReadable(path, &file));
+
+ // Test GetSize -- move this into its own unit test if ever needed
+ int64_t file_size;
+ ASSERT_OK(file->GetSize(&file_size));
+ ASSERT_EQ(size, file_size);
+
+ uint8_t buffer[50];
+ int64_t bytes_read = 0;
+
+ ASSERT_OK(file->Read(50, &bytes_read, buffer));
+ ASSERT_EQ(0, std::memcmp(buffer, data.data(), 50));
+ ASSERT_EQ(50, bytes_read);
+
+ ASSERT_OK(file->Read(50, &bytes_read, buffer));
+ ASSERT_EQ(0, std::memcmp(buffer, data.data() + 50, 50));
+ ASSERT_EQ(50, bytes_read);
+
+ // EOF
+ ASSERT_OK(file->Read(1, &bytes_read, buffer));
+ ASSERT_EQ(0, bytes_read);
+
+ // ReadAt to EOF
+ ASSERT_OK(file->ReadAt(60, 100, &bytes_read, buffer));
+ ASSERT_EQ(40, bytes_read);
+ ASSERT_EQ(0, std::memcmp(buffer, data.data() + 60, bytes_read));
+
+ // Seek, Tell
+ ASSERT_OK(file->Seek(60));
+
+ int64_t position;
+ ASSERT_OK(file->Tell(&position));
+ ASSERT_EQ(60, position);
+}
+
+TEST_F(TestHdfsClient, RenameFile) {
+ SKIP_IF_NO_LIBHDFS();
+
+ ASSERT_OK(MakeScratchDir());
+
+ auto src_path = ScratchPath("src-file");
+ auto dst_path = ScratchPath("dst-file");
+ const int size = 100;
+
+ std::vector<uint8_t> data = RandomData(size);
+ ASSERT_OK(WriteDummyFile(src_path, data.data(), size));
+
+ ASSERT_OK(client_->Rename(src_path, dst_path));
+
+ ASSERT_FALSE(client_->Exists(src_path));
+ ASSERT_TRUE(client_->Exists(dst_path));
+}
+
+} // namespace io
+} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/io-memory-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc
new file mode 100644
index 0000000..6de35da
--- /dev/null
+++ b/cpp/src/arrow/io/io-memory-test.cc
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "arrow/io/memory.h"
+#include "arrow/io/test-common.h"
+
+namespace arrow {
+namespace io {
+
+// gtest fixture for the MemoryMappedFile tests. MemoryMapFixture contributes
+// CreateFile() and the bookkeeping of temporary files.
+class TestMemoryMappedFile : public ::testing::Test, public MemoryMapFixture {
+ public:
+  // Forward the per-test teardown to MemoryMapFixture so the files created by
+  // the test are removed.
+  void TearDown() {
+    this->MemoryMapFixture::TearDown();
+  }
+};
+
+TEST_F(TestMemoryMappedFile, InvalidUsages) {
+  // Intentionally empty placeholder: no invalid-usage scenarios are exercised yet.
+}
+
+// Alternates cursor-based writes with positional reads on a read/write mapping
+// and checks that each chunk reads back exactly as written.
+TEST_F(TestMemoryMappedFile, WriteRead) {
+  const int64_t buffer_size = 1024;
+  const int reps = 5;
+
+  std::vector<uint8_t> buffer(buffer_size);
+  test::random_bytes(1024, 0, buffer.data());
+
+  // Pre-size the backing file so that all `reps` chunks fit inside the mapping.
+  std::string path = "ipc-write-read-test";
+  CreateFile(path, reps * buffer_size);
+
+  std::shared_ptr<MemoryMappedFile> result;
+  ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &result));
+
+  std::shared_ptr<Buffer> out_buffer;
+  for (int i = 0; i < reps; ++i) {
+    // Offset of the chunk written in this iteration.
+    const int64_t position = i * buffer_size;
+
+    ASSERT_OK(result->Write(buffer.data(), buffer_size));
+    ASSERT_OK(result->ReadAt(position, buffer_size, &out_buffer));
+
+    ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
+  }
+}
+
+// Fills a file through a read/write mapping, then re-opens it read-only and
+// checks that every chunk can be read back unchanged.
+TEST_F(TestMemoryMappedFile, ReadOnly) {
+  const int64_t buffer_size = 1024;
+  std::vector<uint8_t> buffer(buffer_size);
+
+  test::random_bytes(1024, 0, buffer.data());
+
+  const int reps = 5;
+
+  std::string path = "ipc-read-only-test";
+  CreateFile(path, reps * buffer_size);
+
+  std::shared_ptr<MemoryMappedFile> rwmmap;
+  ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &rwmmap));
+
+  // Write() appends at the mapping's own cursor, so no offset needs tracking here.
+  for (int i = 0; i < reps; ++i) {
+    ASSERT_OK(rwmmap->Write(buffer.data(), buffer_size));
+  }
+  // Close() returns a Status; assert on it instead of silently dropping it.
+  ASSERT_OK(rwmmap->Close());
+
+  std::shared_ptr<MemoryMappedFile> rommap;
+  ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &rommap));
+
+  int64_t position = 0;
+  std::shared_ptr<Buffer> out_buffer;
+  for (int i = 0; i < reps; ++i) {
+    ASSERT_OK(rommap->ReadAt(position, buffer_size, &out_buffer));
+
+    ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
+    position += buffer_size;
+  }
+  ASSERT_OK(rommap->Close());
+}
+
+// A mapping opened with FileMode::READ must reject Write() with an IOError.
+TEST_F(TestMemoryMappedFile, InvalidMode) {
+  const int64_t buffer_size = 1024;
+
+  std::vector<uint8_t> payload(buffer_size);
+  test::random_bytes(1024, 0, payload.data());
+
+  std::string path = "ipc-invalid-mode-test";
+  CreateFile(path, buffer_size);
+
+  // Open read-only ...
+  std::shared_ptr<MemoryMappedFile> read_only_map;
+  ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READ, &read_only_map));
+
+  // ... so writing through the mapping has to fail.
+  ASSERT_RAISES(IOError, read_only_map->Write(payload.data(), buffer_size));
+}
+
+// Opening a path that does not exist must surface an IOError.
+TEST_F(TestMemoryMappedFile, InvalidFile) {
+  std::string missing_path = "invalid-file-name-asfd";
+  std::shared_ptr<MemoryMappedFile> mapped;
+
+  ASSERT_RAISES(IOError, MemoryMappedFile::Open(missing_path, FileMode::READ, &mapped));
+}
+
+} // namespace io
+} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/libhdfs_shim.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/libhdfs_shim.cc b/cpp/src/arrow/io/libhdfs_shim.cc
index 003570d..0b805ab 100644
--- a/cpp/src/arrow/io/libhdfs_shim.cc
+++ b/cpp/src/arrow/io/libhdfs_shim.cc
@@ -51,8 +51,7 @@ extern "C" {
#include <type_traits>
#include <vector>
-#include <boost/filesystem.hpp> // NOLINT
-#include <boost/algorithm/string.hpp> // NOLINT
+#include <boost/filesystem.hpp> // NOLINT
#include "arrow/util/status.h"
#include "arrow/util/visibility.h"
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
new file mode 100644
index 0000000..1dd6c3a
--- /dev/null
+++ b/cpp/src/arrow/io/memory.cc
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/io/memory.h"
+
+#include <sys/mman.h> // For memory-mapping
+
+#include <algorithm>
+#include <cerrno>
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <sstream>
+#include <string>
+
+#include "arrow/io/interfaces.h"
+
+#include "arrow/util/buffer.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+namespace io {
+
+// Implement MemoryMappedFile
+
+// Private implementation of MemoryMappedFile: owns the FILE handle and the
+// mmap'ed region, and tracks a read/write cursor into that region.
+class MemoryMappedFile::MemoryMappedFileImpl {
+ public:
+  MemoryMappedFileImpl()
+      : file_(nullptr),
+        position_(0),
+        size_(0),
+        is_open_(false),
+        is_writable_(false),
+        data_(nullptr) {}
+
+  // Unmaps the region and closes the file, but only if Open() fully succeeded.
+  ~MemoryMappedFileImpl() {
+    if (is_open_) {
+      munmap(data_, size_);
+      fclose(file_);
+    }
+  }
+
+  // Opens `path` and maps the entire file into memory.
+  //
+  // FileMode::READWRITE opens the file with "r+b" and maps it PROT_READ |
+  // PROT_WRITE; any other mode opens it with "rb" and maps it PROT_READ only.
+  // Returns IOError if a file is already open, or if opening, sizing or mapping
+  // fails. On failure the object is left untouched (still "not open") and the
+  // file handle is closed again, so the destructor never unmaps an invalid region.
+  Status Open(const std::string& path, FileMode::type mode) {
+    if (is_open_) { return Status::IOError("A file is already open"); }
+
+    const bool writable = (mode == FileMode::READWRITE);
+    int prot_flags = PROT_READ;
+    if (writable) { prot_flags |= PROT_WRITE; }
+
+    FILE* file = fopen(path.c_str(), writable ? "r+b" : "rb");
+    if (file == nullptr) {
+      std::stringstream ss;
+      ss << "Unable to open file, errno: " << errno;
+      return Status::IOError(ss.str());
+    }
+
+    // Determine the file size by seeking to the end.
+    if (fseek(file, 0L, SEEK_END) != 0 || ferror(file)) {
+      fclose(file);
+      return Status::IOError("Unable to seek to end of file");
+    }
+    const int64_t file_size = ftell(file);
+    if (file_size < 0) {
+      fclose(file);
+      return Status::IOError("Unable to seek to end of file");
+    }
+    fseek(file, 0L, SEEK_SET);
+
+    void* result = mmap(nullptr, file_size, prot_flags, MAP_SHARED, fileno(file), 0);
+    if (result == MAP_FAILED) {
+      // Build the message before fclose(), which may overwrite errno.
+      std::stringstream ss;
+      ss << "Memory mapping file failed, errno: " << errno;
+      fclose(file);
+      return Status::IOError(ss.str());
+    }
+
+    // Everything succeeded: commit the new state in one go.
+    file_ = file;
+    size_ = file_size;
+    position_ = 0;
+    is_writable_ = writable;
+    data_ = reinterpret_cast<uint8_t*>(result);
+    is_open_ = true;
+
+    return Status::OK();
+  }
+
+  // Size of the mapped region in bytes.
+  int64_t size() const { return size_; }
+
+  // Moves the cursor to `position`; valid positions are [0, size).
+  Status Seek(int64_t position) {
+    if (position < 0 || position >= size_) {
+      return Status::Invalid("position is out of bounds");
+    }
+    position_ = position;
+    return Status::OK();
+  }
+
+  // Current cursor offset from the start of the mapping.
+  int64_t position() { return position_; }
+
+  // Moves the cursor forward by `nbytes`, clamped to the end of the mapping.
+  void advance(int64_t nbytes) { position_ = std::min(size_, position_ + nbytes); }
+
+  // Start of the mapped region.
+  uint8_t* data() { return data_; }
+
+  // Address of the byte at the current cursor position.
+  uint8_t* head() { return data_ + position_; }
+
+  bool writable() { return is_writable_; }
+
+  bool opened() { return is_open_; }
+
+ private:
+  FILE* file_;
+  int64_t position_;
+  int64_t size_;
+  bool is_open_;
+  bool is_writable_;
+
+  // The memory map
+  uint8_t* data_;
+};
+
+// Private constructor: forwards the access mode to
+// ReadableFileInterface::set_mode(). The mapping itself is set up by the static
+// Open() factory.
+MemoryMappedFile::MemoryMappedFile(FileMode::type mode) {
+  this->ReadableFileInterface::set_mode(mode);
+}
+
+// Factory: creates a MemoryMappedFile for `path` in the given `mode` and stores
+// it in `*out`. `*out` is only assigned when the underlying open/map succeeded.
+Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode,
+    std::shared_ptr<MemoryMappedFile>* out) {
+  // The constructor is private, so the instance is built with `new` here.
+  std::shared_ptr<MemoryMappedFile> mapped(new MemoryMappedFile(mode));
+  mapped->impl_.reset(new MemoryMappedFileImpl());
+
+  RETURN_NOT_OK(mapped->impl_->Open(path, mode));
+
+  *out = mapped;
+  return Status::OK();
+}
+
+// Stores the size of the mapped region, in bytes, in `*size`.
+Status MemoryMappedFile::GetSize(int64_t* size) {
+  const int64_t mapped_bytes = impl_->size();
+  *size = mapped_bytes;
+  return Status::OK();
+}
+
+// Stores the current cursor offset within the mapping in `*position`.
+Status MemoryMappedFile::Tell(int64_t* position) {
+  const int64_t cursor = impl_->position();
+  *position = cursor;
+  return Status::OK();
+}
+
+// Moves the cursor to `position`; the bounds check is done by the impl's Seek.
+Status MemoryMappedFile::Seek(int64_t position) {
+  Status seek_status = impl_->Seek(position);
+  return seek_status;
+}
+
+// Currently a no-op that always succeeds: unmapping and closing the file happen
+// in the pimpl's destructor, not here.
+Status MemoryMappedFile::Close() {
+  return Status::OK();
+}
+
+// Copies up to `nbytes` bytes from the current cursor position into `out`,
+// reports the number actually copied in `*bytes_read` and advances the cursor.
+// The request is clamped to the bytes remaining in the mapping.
+Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+  const int64_t available = impl_->size() - impl_->position();
+  const int64_t to_copy = std::min(nbytes, available);
+
+  std::memcpy(out, impl_->head(), to_copy);
+  impl_->advance(to_copy);
+
+  *bytes_read = to_copy;
+  return Status::OK();
+}
+
+// Positional read into caller-provided memory: seek to `position`, then do a
+// regular Read(). Note that this moves the shared cursor.
+Status MemoryMappedFile::ReadAt(
+    int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+  Status seek_status = impl_->Seek(position);
+  if (!seek_status.ok()) { return seek_status; }
+  return Read(nbytes, bytes_read, out);
+}
+
+// Positional read without copying: `*out` becomes a Buffer constructed over the
+// mapped bytes starting at `position`, clamped to the end of the mapping. The
+// cursor ends up just past the returned range.
+// NOTE(review): the Buffer presumably does not own the mapping, so it should not
+// outlive this file object -- confirm against Buffer's ownership semantics.
+Status MemoryMappedFile::ReadAt(
+    int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  RETURN_NOT_OK(impl_->Seek(position));
+
+  const int64_t length = std::min(nbytes, impl_->size() - position);
+  *out = std::make_shared<Buffer>(impl_->head(), length);
+  impl_->advance(length);
+  return Status::OK();
+}
+
+// Always true: the Buffer-returning ReadAt() wraps the mapped memory directly
+// rather than copying it.
+bool MemoryMappedFile::supports_zero_copy() const {
+  const bool zero_copy = true;
+  return zero_copy;
+}
+
+// Writes `nbytes` bytes from `data` into the mapping starting at `position` and
+// leaves the cursor just past the written range.
+//
+// Returns IOError if the file is not open or not writable, and Invalid if
+// `position` is out of bounds or the write would run past the end of the
+// mapping. A rejected write does not move the cursor.
+Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
+  if (!impl_->opened() || !impl_->writable()) {
+    return Status::IOError("Unable to write");
+  }
+
+  // Apply the same end-of-map check that Write() does; without it the memcpy in
+  // WriteInternal() could write beyond the mapped region. Out-of-range positions
+  // are left for Seek() below to report.
+  const int64_t size = impl_->size();
+  if (position >= 0 && position < size && nbytes > size - position) {
+    return Status::Invalid("Cannot write past end of memory map");
+  }
+
+  RETURN_NOT_OK(impl_->Seek(position));
+  return WriteInternal(data, nbytes);
+}
+
+// Writes `nbytes` bytes from `data` at the current cursor position and advances
+// the cursor. Fails with IOError when the file is not open for writing and with
+// Invalid when the data would not fit in the remainder of the mapping.
+Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) {
+  const bool can_write = impl_->opened() && impl_->writable();
+  if (!can_write) { return Status::IOError("Unable to write"); }
+
+  const int64_t end_offset = nbytes + impl_->position();
+  if (end_offset > impl_->size()) {
+    return Status::Invalid("Cannot write past end of memory map");
+  }
+
+  return WriteInternal(data, nbytes);
+}
+
+// Unchecked copy of `nbytes` bytes to the cursor position followed by a cursor
+// advance. Callers are responsible for the writability and bounds checks.
+Status MemoryMappedFile::WriteInternal(const uint8_t* data, int64_t nbytes) {
+  uint8_t* destination = impl_->head();
+  std::memcpy(destination, data, nbytes);
+  impl_->advance(nbytes);
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// In-memory buffer reader
+
+// Nothing to do here; always reports success.
+Status BufferReader::Close() {
+  return Status::OK();
+}
+
+// Stores the current read offset in `*position`.
+Status BufferReader::Tell(int64_t* position) {
+  const int64_t cursor = position_;
+  *position = cursor;
+  return Status::OK();
+}
+
+// Positional read into caller-provided memory: seek to `position`, then do a
+// regular Read(). The reader's cursor is moved as a side effect.
+Status BufferReader::ReadAt(
+    int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
+  Status seek_status = Seek(position);
+  if (!seek_status.ok()) { return seek_status; }
+  return Read(nbytes, bytes_read, buffer);
+}
+
+// Positional read without copying: `*out` becomes a Buffer constructed over the
+// wrapped memory starting at `position`, holding at most `nbytes` bytes (fewer
+// if the end of the data is reached). The cursor ends up just past the returned
+// range.
+//
+// Returns IOError if `position` lies outside [0, size]. Reading at exactly
+// `size` is allowed and yields an empty Buffer.
+Status BufferReader::ReadAt(
+    int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  const int64_t total_size = buffer_size_;
+  if (position < 0 || position > total_size) {
+    return Status::IOError("position out of bounds");
+  }
+
+  // Clamp against the requested position, not the stale cursor.
+  const int64_t size = std::min(nbytes, total_size - position);
+  *out = std::make_shared<Buffer>(buffer_ + position, size);
+
+  // Advance by what was actually handed out so the cursor never passes the end.
+  position_ = position + size;
+  return Status::OK();
+}
+
+// Always true: the Buffer-returning ReadAt() wraps the underlying memory
+// directly instead of copying it.
+bool BufferReader::supports_zero_copy() const {
+  const bool zero_copy = true;
+  return zero_copy;
+}
+
+// Copies up to `nbytes` bytes from the current cursor position into `buffer`,
+// reports the number actually copied in `*bytes_read` and advances the cursor.
+// At the end of the data this copies nothing and reports 0.
+Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
+  // Clamp BEFORE copying: copying the raw `nbytes` would read past the end of
+  // the wrapped memory whenever fewer bytes remain.
+  const int64_t remaining = std::max<int64_t>(0, buffer_size_ - position_);
+  const int64_t to_copy = std::min(nbytes, remaining);
+
+  memcpy(buffer, buffer_ + position_, to_copy);
+  *bytes_read = to_copy;
+  position_ += to_copy;
+  return Status::OK();
+}
+
+// Stores the total size of the wrapped memory, in bytes, in `*size`.
+Status BufferReader::GetSize(int64_t* size) {
+  const int64_t total_bytes = buffer_size_;
+  *size = total_bytes;
+  return Status::OK();
+}
+
+// Moves the cursor to `position`. Valid positions are [0, size); anything else
+// yields an IOError and leaves the cursor unchanged.
+Status BufferReader::Seek(int64_t position) {
+  const bool in_range = (position >= 0) && (position < buffer_size_);
+  if (!in_range) { return Status::IOError("position out of bounds"); }
+
+  position_ = position;
+  return Status::OK();
+}
+
+} // namespace io
+} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
new file mode 100644
index 0000000..6fe47c3
--- /dev/null
+++ b/cpp/src/arrow/io/memory.h
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Public API for different memory sharing / IO mechanisms
+
+#ifndef ARROW_IO_MEMORY_H
+#define ARROW_IO_MEMORY_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "arrow/io/interfaces.h"
+
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Buffer;
+class MutableBuffer;
+class Status;
+
+namespace io {
+
+// An output stream that writes to a MutableBuffer, such as one obtained from a
+// memory map
+//
+// TODO(wesm): Implement this class
+class ARROW_EXPORT BufferOutputStream : public OutputStream {
+ public:
+  // MutableBuffer is only forward-declared in this header, so the buffer's real
+  // capacity cannot be queried here. The counters are zero-initialized so they
+  // never hold indeterminate values until the out-of-line implementation lands.
+  explicit BufferOutputStream(const std::shared_ptr<MutableBuffer>& buffer)
+      : buffer_(buffer), capacity_(0), position_(0) {}
+
+  // Implement the OutputStream interface
+  Status Close() override;
+  Status Tell(int64_t* position) override;
+  Status Write(const uint8_t* data, int64_t length) override;
+
+  // Returns the number of bytes remaining in the buffer
+  int64_t bytes_remaining() const;
+
+ private:
+  std::shared_ptr<MutableBuffer> buffer_;
+  int64_t capacity_;
+  int64_t position_;
+};
+
+// Read/write file interface backed by a memory-mapped file. Instances are
+// created through the static Open() factory; the constructor is private.
+class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
+ public:
+  // Maps the file at `path` using the given `mode` and stores the new object in
+  // `*out`.
+  static Status Open(const std::string& path, FileMode::type mode,
+      std::shared_ptr<MemoryMappedFile>* out);
+
+  Status Close() override;
+
+  // Cursor management
+  Status Tell(int64_t* position) override;
+
+  Status Seek(int64_t position) override;
+
+  // Copying reads (required by ReadableFileInterface): the bytes are copied
+  // into `out`
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) override;
+
+  Status ReadAt(
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) override;
+
+  // Buffer-returning read; zero copy where possible
+  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
+  bool supports_zero_copy() const override;
+
+  // Writes at the current cursor position / at an explicit position
+  Status Write(const uint8_t* data, int64_t nbytes) override;
+
+  Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override;
+
+  // Reports the size in bytes of the memory source through `size`
+  Status GetSize(int64_t* size) override;
+
+ private:
+  explicit MemoryMappedFile(FileMode::type mode);
+
+  Status WriteInternal(const uint8_t* data, int64_t nbytes);
+
+  // Pimpl: keeps the mapping details out of the public header for now
+  class MemoryMappedFileImpl;
+  std::unique_ptr<MemoryMappedFileImpl> impl_;
+};
+
+// A readable, seekable "file" over a caller-provided block of memory. Only the
+// raw pointer is stored.
+// NOTE(review): the memory is presumably not owned and must outlive the reader
+// -- confirm with callers.
+class ARROW_EXPORT BufferReader : public ReadableFileInterface {
+ public:
+  // `buffer_size` is 64-bit like every other size and offset in this interface,
+  // so regions larger than 2 GiB are not silently truncated. Existing callers
+  // that pass an int keep working through implicit widening.
+  BufferReader(const uint8_t* buffer, int64_t buffer_size)
+      : buffer_(buffer), buffer_size_(buffer_size), position_(0) {}
+
+  Status Close() override;
+  Status Tell(int64_t* position) override;
+
+  // Positional reads: copying into `buffer`, or returning a Buffer
+  Status ReadAt(
+      int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
+  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
+  Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
+  Status GetSize(int64_t* size) override;
+  Status Seek(int64_t position) override;
+
+  bool supports_zero_copy() const override;
+
+ private:
+  const uint8_t* buffer_;
+  int64_t buffer_size_;
+  int64_t position_;
+};
+
+} // namespace io
+} // namespace arrow
+
+#endif // ARROW_IO_MEMORY_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/io/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/test-common.h b/cpp/src/arrow/io/test-common.h
new file mode 100644
index 0000000..1954d47
--- /dev/null
+++ b/cpp/src/arrow/io/test-common.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_IO_TEST_COMMON_H
+#define ARROW_IO_TEST_COMMON_H
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/io/memory.h"
+#include "arrow/test-util.h"
+#include "arrow/util/buffer.h"
+#include "arrow/util/memory-pool.h"
+
+namespace arrow {
+namespace io {
+
+// Test helper mix-in that creates fixed-size scratch files for memory-map tests
+// and removes them again on TearDown().
+class MemoryMapFixture {
+ public:
+  // Deletes every file that CreateFile() managed to create.
+  void TearDown() {
+    for (const auto& path : tmp_files_) {
+      std::remove(path.c_str());
+    }
+  }
+
+  // Creates (or truncates) the file at `path` and resizes it to `size` bytes.
+  // If the file cannot be opened nothing is registered and the function returns
+  // quietly; the failure then surfaces when the test tries to open the file.
+  void CreateFile(const std::string& path, int64_t size) {
+    FILE* file = fopen(path.c_str(), "w");
+    // Guard against a null FILE*: fileno()/fclose() on it would crash.
+    if (file == nullptr) { return; }
+
+    tmp_files_.push_back(path);
+    ftruncate(fileno(file), size);
+    fclose(file);
+  }
+
+  // Creates a `size`-byte file at `path` and opens it as a read/write map.
+  Status InitMemoryMap(
+      int64_t size, const std::string& path, std::shared_ptr<MemoryMappedFile>* mmap) {
+    CreateFile(path, size);
+    return MemoryMappedFile::Open(path, FileMode::READWRITE, mmap);
+  }
+
+ private:
+  // Paths of the files created so far, consumed by TearDown()
+  std::vector<std::string> tmp_files_;
+};
+
+} // namespace io
+} // namespace arrow
+
+#endif // ARROW_IO_TEST_COMMON_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index 8263416..e5553a6 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -19,16 +19,50 @@
# arrow_ipc
#######################################
-# Headers: top level
-install(FILES
- adapter.h
- metadata.h
- memory.h
- DESTINATION include/arrow/ipc)
+set(ARROW_IPC_LINK_LIBS
+ arrow_io
+ arrow_shared
+)
+
+set(ARROW_IPC_PRIVATE_LINK_LIBS
+ )
+
+set(ARROW_IPC_TEST_LINK_LIBS
+ arrow_ipc
+ ${ARROW_IPC_PRIVATE_LINK_LIBS})
+
+set(ARROW_IPC_SRCS
+ adapter.cc
+ metadata.cc
+ metadata-internal.cc
+)
+
+# TODO(wesm): SHARED and STATIC targets
+add_library(arrow_ipc SHARED
+ ${ARROW_IPC_SRCS}
+)
+target_link_libraries(arrow_ipc
+ LINK_PUBLIC ${ARROW_IPC_LINK_LIBS}
+ LINK_PRIVATE ${ARROW_IPC_PRIVATE_LINK_LIBS})
+
+if(NOT APPLE)
+ # Localize thirdparty symbols using a linker version script. This hides them
+ # from the client application. The OS X linker does not support the
+ # version-script option.
+ set(ARROW_IPC_LINK_FLAGS "-Wl,--version-script=${CMAKE_CURRENT_SOURCE_DIR}/symbols.map")
+endif()
+
+SET_TARGET_PROPERTIES(arrow_ipc PROPERTIES
+ LINKER_LANGUAGE CXX
+ LINK_FLAGS "${ARROW_IPC_LINK_FLAGS}")
ADD_ARROW_TEST(ipc-adapter-test)
-ADD_ARROW_TEST(ipc-memory-test)
+ARROW_TEST_LINK_LIBRARIES(ipc-adapter-test
+ ${ARROW_IPC_TEST_LINK_LIBS})
+
ADD_ARROW_TEST(ipc-metadata-test)
+ARROW_TEST_LINK_LIBRARIES(ipc-metadata-test
+ ${ARROW_IPC_TEST_LINK_LIBS})
# make clean will delete the generated file
set_source_files_properties(Metadata_generated.h PROPERTIES GENERATED TRUE)
@@ -49,3 +83,13 @@ add_custom_command(
add_custom_target(metadata_fbs DEPENDS ${FBS_OUTPUT_FILES})
add_dependencies(arrow_objlib metadata_fbs)
+
+# Headers: top level
+install(FILES
+ adapter.h
+ metadata.h
+ DESTINATION include/arrow/ipc)
+
+install(TARGETS arrow_ipc
+ LIBRARY DESTINATION lib
+ ARCHIVE DESTINATION lib)
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index 40d372b..0e101c8 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -24,9 +24,11 @@
#include "arrow/array.h"
#include "arrow/ipc/Message_generated.h"
-#include "arrow/ipc/memory.h"
#include "arrow/ipc/metadata-internal.h"
#include "arrow/ipc/metadata.h"
+#include "arrow/ipc/util.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
#include "arrow/schema.h"
#include "arrow/table.h"
#include "arrow/type.h"
@@ -144,10 +146,15 @@ class RowBatchWriter {
return Status::OK();
}
- Status Write(MemorySource* dst, int64_t position, int64_t* data_header_offset) {
+ Status Write(io::OutputStream* dst, int64_t* data_header_offset) {
// Write out all the buffers contiguously and compute the total size of the
// memory payload
int64_t offset = 0;
+
+ // Get the starting position
+ int64_t position;
+ RETURN_NOT_OK(dst->Tell(&position));
+
for (size_t i = 0; i < buffers_.size(); ++i) {
const Buffer* buffer = buffers_[i].get();
int64_t size = 0;
@@ -171,7 +178,7 @@ class RowBatchWriter {
buffer_meta_.push_back(flatbuf::Buffer(0, position + offset, size));
if (size > 0) {
- RETURN_NOT_OK(dst->Write(position + offset, buffer->data(), size));
+ RETURN_NOT_OK(dst->Write(buffer->data(), size));
offset += size;
}
}
@@ -180,7 +187,7 @@ class RowBatchWriter {
// memory, the data header can be converted to a flatbuffer and written out
//
// Note: The memory written here is prefixed by the size of the flatbuffer
- // itself as an int32_t. On reading from a MemorySource, you will have to
+ // itself as an int32_t. On reading from an input, you will have to
// determine the data header size then request a buffer such that you can
// construct the flatbuffer data accessor object (see arrow::ipc::Message)
std::shared_ptr<Buffer> data_header;
@@ -188,8 +195,7 @@ class RowBatchWriter {
batch_->num_rows(), offset, field_nodes_, buffer_meta_, &data_header));
// Write the data header at the end
- RETURN_NOT_OK(
- dst->Write(position + offset, data_header->data(), data_header->size()));
+ RETURN_NOT_OK(dst->Write(data_header->data(), data_header->size()));
*data_header_offset = position + offset;
return Status::OK();
@@ -199,9 +205,9 @@ class RowBatchWriter {
Status GetTotalSize(int64_t* size) {
// emulates the behavior of Write without actually writing
int64_t data_header_offset;
- MockMemorySource source(0);
- RETURN_NOT_OK(Write(&source, 0, &data_header_offset));
- *size = source.GetExtentBytesWritten();
+ MockOutputStream dst;
+ RETURN_NOT_OK(Write(&dst, &data_header_offset));
+ *size = dst.GetExtentBytesWritten();
return Status::OK();
}
@@ -214,12 +220,12 @@ class RowBatchWriter {
int max_recursion_depth_;
};
-Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position,
- int64_t* header_offset, int max_recursion_depth) {
+Status WriteRowBatch(io::OutputStream* dst, const RowBatch* batch, int64_t* header_offset,
+ int max_recursion_depth) {
DCHECK_GT(max_recursion_depth, 0);
RowBatchWriter serializer(batch, max_recursion_depth);
RETURN_NOT_OK(serializer.AssemblePayload());
- return serializer.Write(dst, position, header_offset);
+ return serializer.Write(dst, header_offset);
}
Status GetRowBatchSize(const RowBatch* batch, int64_t* size) {
@@ -234,11 +240,11 @@ Status GetRowBatchSize(const RowBatch* batch, int64_t* size) {
static constexpr int64_t INIT_METADATA_SIZE = 4096;
-class RowBatchReader::Impl {
+class RowBatchReader::RowBatchReaderImpl {
public:
- Impl(MemorySource* source, const std::shared_ptr<RecordBatchMessage>& metadata,
- int max_recursion_depth)
- : source_(source), metadata_(metadata), max_recursion_depth_(max_recursion_depth) {
+ RowBatchReaderImpl(io::ReadableFileInterface* file,
+ const std::shared_ptr<RecordBatchMessage>& metadata, int max_recursion_depth)
+ : file_(file), metadata_(metadata), max_recursion_depth_(max_recursion_depth) {
num_buffers_ = metadata->num_buffers();
num_flattened_fields_ = metadata->num_fields();
}
@@ -339,10 +345,11 @@ class RowBatchReader::Impl {
Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
BufferMetadata metadata = metadata_->buffer(buffer_index);
RETURN_NOT_OK(CheckMultipleOf64(metadata.length));
- return source_->ReadAt(metadata.offset, metadata.length, out);
+ return file_->ReadAt(metadata.offset, metadata.length, out);
}
- MemorySource* source_;
+ private:
+ io::ReadableFileInterface* file_;
std::shared_ptr<RecordBatchMessage> metadata_;
int field_index_;
@@ -352,22 +359,22 @@ class RowBatchReader::Impl {
int num_flattened_fields_;
};
-Status RowBatchReader::Open(
- MemorySource* source, int64_t position, std::shared_ptr<RowBatchReader>* out) {
- return Open(source, position, kMaxIpcRecursionDepth, out);
+Status RowBatchReader::Open(io::ReadableFileInterface* file, int64_t position,
+ std::shared_ptr<RowBatchReader>* out) {
+ return Open(file, position, kMaxIpcRecursionDepth, out);
}
-Status RowBatchReader::Open(MemorySource* source, int64_t position,
+Status RowBatchReader::Open(io::ReadableFileInterface* file, int64_t position,
int max_recursion_depth, std::shared_ptr<RowBatchReader>* out) {
std::shared_ptr<Buffer> metadata;
- RETURN_NOT_OK(source->ReadAt(position, INIT_METADATA_SIZE, &metadata));
+ RETURN_NOT_OK(file->ReadAt(position, INIT_METADATA_SIZE, &metadata));
int32_t metadata_size = *reinterpret_cast<const int32_t*>(metadata->data());
- // We may not need to call source->ReadAt again
+ // We may not need to call ReadAt again
if (metadata_size > static_cast<int>(INIT_METADATA_SIZE - sizeof(int32_t))) {
// We don't have enough data, read the indicated metadata size.
- RETURN_NOT_OK(source->ReadAt(position + sizeof(int32_t), metadata_size, &metadata));
+ RETURN_NOT_OK(file->ReadAt(position + sizeof(int32_t), metadata_size, &metadata));
}
// TODO(wesm): buffer slicing here would be better in case ReadAt returns
@@ -383,14 +390,14 @@ Status RowBatchReader::Open(MemorySource* source, int64_t position,
std::shared_ptr<RecordBatchMessage> batch_meta = message->GetRecordBatch();
std::shared_ptr<RowBatchReader> result(new RowBatchReader());
- result->impl_.reset(new Impl(source, batch_meta, max_recursion_depth));
+ result->impl_.reset(new RowBatchReaderImpl(file, batch_meta, max_recursion_depth));
*out = result;
return Status::OK();
}
// Here the explicit destructor is required for compilers to be aware of
-// the complete information of RowBatchReader::Impl class
+// the complete information of RowBatchReader::RowBatchReaderImpl class
RowBatchReader::~RowBatchReader() {}
Status RowBatchReader::GetRowBatch(
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index 6231af6..215b46f 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -33,9 +33,15 @@ class RowBatch;
class Schema;
class Status;
+namespace io {
+
+class ReadableFileInterface;
+class OutputStream;
+
+} // namespace io
+
namespace ipc {
-class MemorySource;
class RecordBatchMessage;
// ----------------------------------------------------------------------
@@ -43,22 +49,21 @@ class RecordBatchMessage;
// We have trouble decoding flatbuffers if the size i > 70, so 64 is a nice round number
// TODO(emkornfield) investigate this more
constexpr int kMaxIpcRecursionDepth = 64;
-// Write the RowBatch (collection of equal-length Arrow arrays) to the memory
-// source at the indicated position
+
+// Write the RowBatch (collection of equal-length Arrow arrays) to the output
+// stream
//
-// First, each of the memory buffers are written out end-to-end in starting at
-// the indicated position.
+// First, each of the memory buffers are written out end-to-end
//
// Then, this function writes the batch metadata as a flatbuffer (see
// format/Message.fbs -- the RecordBatch message type) like so:
//
// <int32: metadata size> <uint8*: metadata>
//
-// Finally, the memory offset to the start of the metadata / data header is
-// returned in an out-variable
-ARROW_EXPORT Status WriteRowBatch(MemorySource* dst, const RowBatch* batch,
- int64_t position, int64_t* header_offset,
- int max_recursion_depth = kMaxIpcRecursionDepth);
+// Finally, the absolute offset (relative to the start of the output stream) to
+// the start of the metadata / data header is returned in an out-variable
+ARROW_EXPORT Status WriteRowBatch(io::OutputStream* dst, const RowBatch* batch,
+ int64_t* header_offset, int max_recursion_depth = kMaxIpcRecursionDepth);
// int64_t GetRowBatchMetadata(const RowBatch* batch);
@@ -68,16 +73,16 @@ ARROW_EXPORT Status WriteRowBatch(MemorySource* dst, const RowBatch* batch,
ARROW_EXPORT Status GetRowBatchSize(const RowBatch* batch, int64_t* size);
// ----------------------------------------------------------------------
-// "Read" path; does not copy data if the MemorySource does not
+// "Read" path; does not copy data if the input supports zero copy reads
class ARROW_EXPORT RowBatchReader {
public:
- static Status Open(
- MemorySource* source, int64_t position, std::shared_ptr<RowBatchReader>* out);
-
- static Status Open(MemorySource* source, int64_t position, int max_recursion_depth,
+ static Status Open(io::ReadableFileInterface* file, int64_t position,
std::shared_ptr<RowBatchReader>* out);
+ static Status Open(io::ReadableFileInterface* file, int64_t position,
+ int max_recursion_depth, std::shared_ptr<RowBatchReader>* out);
+
virtual ~RowBatchReader();
// Reassemble the row batch. A Schema is required to be able to construct the
@@ -86,8 +91,8 @@ class ARROW_EXPORT RowBatchReader {
const std::shared_ptr<Schema>& schema, std::shared_ptr<RowBatch>* out);
private:
- class Impl;
- std::unique_ptr<Impl> impl_;
+ class RowBatchReaderImpl;
+ std::unique_ptr<RowBatchReaderImpl> impl_;
};
} // namespace ipc
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc
index 6740e0f..ca4d015 100644
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -24,9 +24,11 @@
#include "gtest/gtest.h"
+#include "arrow/io/memory.h"
+#include "arrow/io/test-common.h"
#include "arrow/ipc/adapter.h"
-#include "arrow/ipc/memory.h"
#include "arrow/ipc/test-common.h"
+#include "arrow/ipc/util.h"
#include "arrow/test-util.h"
#include "arrow/types/list.h"
@@ -49,17 +51,18 @@ const auto LIST_LIST_INT32 = std::make_shared<ListType>(LIST_INT32);
typedef Status MakeRowBatch(std::shared_ptr<RowBatch>* out);
class TestWriteRowBatch : public ::testing::TestWithParam<MakeRowBatch*>,
- public MemoryMapFixture {
+ public io::MemoryMapFixture {
public:
void SetUp() { pool_ = default_memory_pool(); }
- void TearDown() { MemoryMapFixture::TearDown(); }
+ void TearDown() { io::MemoryMapFixture::TearDown(); }
Status RoundTripHelper(const RowBatch& batch, int memory_map_size,
std::shared_ptr<RowBatch>* batch_result) {
std::string path = "test-write-row-batch";
- MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
+ io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
int64_t header_location;
- RETURN_NOT_OK(WriteRowBatch(mmap_.get(), &batch, 0, &header_location));
+
+ RETURN_NOT_OK(WriteRowBatch(mmap_.get(), &batch, &header_location));
std::shared_ptr<RowBatchReader> reader;
RETURN_NOT_OK(RowBatchReader::Open(mmap_.get(), header_location, &reader));
@@ -69,7 +72,7 @@ class TestWriteRowBatch : public ::testing::TestWithParam<MakeRowBatch*>,
}
protected:
- std::shared_ptr<MemoryMappedSource> mmap_;
+ std::shared_ptr<io::MemoryMappedFile> mmap_;
MemoryPool* pool_;
};
@@ -276,12 +279,12 @@ INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRowBatch,
&MakeStringTypesRowBatch, &MakeStruct));
void TestGetRowBatchSize(std::shared_ptr<RowBatch> batch) {
- MockMemorySource mock_source(1 << 16);
+ ipc::MockOutputStream mock;
int64_t mock_header_location = -1;
int64_t size = -1;
- ASSERT_OK(WriteRowBatch(&mock_source, batch.get(), 0, &mock_header_location));
+ ASSERT_OK(WriteRowBatch(&mock, batch.get(), &mock_header_location));
ASSERT_OK(GetRowBatchSize(batch.get(), &size));
- ASSERT_EQ(mock_source.GetExtentBytesWritten(), size);
+ ASSERT_EQ(mock.GetExtentBytesWritten(), size);
}
TEST_F(TestWriteRowBatch, IntegerGetRowBatchSize) {
@@ -303,10 +306,10 @@ TEST_F(TestWriteRowBatch, IntegerGetRowBatchSize) {
TestGetRowBatchSize(batch);
}
-class RecursionLimits : public ::testing::Test, public MemoryMapFixture {
+class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
public:
void SetUp() { pool_ = default_memory_pool(); }
- void TearDown() { MemoryMapFixture::TearDown(); }
+ void TearDown() { io::MemoryMapFixture::TearDown(); }
Status WriteToMmap(int recursion_level, bool override_level,
int64_t* header_out = nullptr, std::shared_ptr<Schema>* schema_out = nullptr) {
@@ -329,19 +332,19 @@ class RecursionLimits : public ::testing::Test, public MemoryMapFixture {
std::string path = "test-write-past-max-recursion";
const int memory_map_size = 1 << 16;
- MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
+ io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
int64_t header_location;
int64_t* header_out_param = header_out == nullptr ? &header_location : header_out;
if (override_level) {
return WriteRowBatch(
- mmap_.get(), batch.get(), 0, header_out_param, recursion_level + 1);
+ mmap_.get(), batch.get(), header_out_param, recursion_level + 1);
} else {
- return WriteRowBatch(mmap_.get(), batch.get(), 0, header_out_param);
+ return WriteRowBatch(mmap_.get(), batch.get(), header_out_param);
}
}
protected:
- std::shared_ptr<MemoryMappedSource> mmap_;
+ std::shared_ptr<io::MemoryMappedFile> mmap_;
MemoryPool* pool_;
};
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/ipc-memory-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-memory-test.cc b/cpp/src/arrow/ipc/ipc-memory-test.cc
deleted file mode 100644
index a2dbd35..0000000
--- a/cpp/src/arrow/ipc/ipc-memory-test.cc
+++ /dev/null
@@ -1,127 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstdint>
-#include <cstdio>
-#include <cstring>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "gtest/gtest.h"
-
-#include "arrow/ipc/memory.h"
-#include "arrow/ipc/test-common.h"
-
-namespace arrow {
-namespace ipc {
-
-class TestMemoryMappedSource : public ::testing::Test, public MemoryMapFixture {
- public:
- void TearDown() { MemoryMapFixture::TearDown(); }
-};
-
-TEST_F(TestMemoryMappedSource, InvalidUsages) {}
-
-TEST_F(TestMemoryMappedSource, WriteRead) {
- const int64_t buffer_size = 1024;
- std::vector<uint8_t> buffer(buffer_size);
-
- test::random_bytes(1024, 0, buffer.data());
-
- const int reps = 5;
-
- std::string path = "ipc-write-read-test";
- CreateFile(path, reps * buffer_size);
-
- std::shared_ptr<MemoryMappedSource> result;
- ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_WRITE, &result));
-
- int64_t position = 0;
-
- std::shared_ptr<Buffer> out_buffer;
- for (int i = 0; i < reps; ++i) {
- ASSERT_OK(result->Write(position, buffer.data(), buffer_size));
- ASSERT_OK(result->ReadAt(position, buffer_size, &out_buffer));
-
- ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
-
- position += buffer_size;
- }
-}
-
-TEST_F(TestMemoryMappedSource, ReadOnly) {
- const int64_t buffer_size = 1024;
- std::vector<uint8_t> buffer(buffer_size);
-
- test::random_bytes(1024, 0, buffer.data());
-
- const int reps = 5;
-
- std::string path = "ipc-read-only-test";
- CreateFile(path, reps * buffer_size);
-
- std::shared_ptr<MemoryMappedSource> rwmmap;
- ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_WRITE, &rwmmap));
-
- int64_t position = 0;
- for (int i = 0; i < reps; ++i) {
- ASSERT_OK(rwmmap->Write(position, buffer.data(), buffer_size));
-
- position += buffer_size;
- }
- rwmmap->Close();
-
- std::shared_ptr<MemoryMappedSource> rommap;
- ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_ONLY, &rommap));
-
- position = 0;
- std::shared_ptr<Buffer> out_buffer;
- for (int i = 0; i < reps; ++i) {
- ASSERT_OK(rommap->ReadAt(position, buffer_size, &out_buffer));
-
- ASSERT_EQ(0, memcmp(out_buffer->data(), buffer.data(), buffer_size));
- position += buffer_size;
- }
- rommap->Close();
-}
-
-TEST_F(TestMemoryMappedSource, InvalidMode) {
- const int64_t buffer_size = 1024;
- std::vector<uint8_t> buffer(buffer_size);
-
- test::random_bytes(1024, 0, buffer.data());
-
- std::string path = "ipc-invalid-mode-test";
- CreateFile(path, buffer_size);
-
- std::shared_ptr<MemoryMappedSource> rommap;
- ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_ONLY, &rommap));
-
- ASSERT_RAISES(IOError, rommap->Write(0, buffer.data(), buffer_size));
-}
-
-TEST_F(TestMemoryMappedSource, InvalidFile) {
- std::string non_existent_path = "invalid-file-name-asfd";
-
- std::shared_ptr<MemoryMappedSource> result;
- ASSERT_RAISES(IOError,
- MemoryMappedSource::Open(non_existent_path, MemorySource::READ_ONLY, &result));
-}
-
-} // namespace ipc
-} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/memory.cc b/cpp/src/arrow/ipc/memory.cc
deleted file mode 100644
index a6c56d6..0000000
--- a/cpp/src/arrow/ipc/memory.cc
+++ /dev/null
@@ -1,182 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "arrow/ipc/memory.h"
-
-#include <sys/mman.h> // For memory-mapping
-
-#include <algorithm>
-#include <cerrno>
-#include <cstdint>
-#include <cstdio>
-#include <cstring>
-#include <sstream>
-#include <string>
-
-#include "arrow/util/buffer.h"
-#include "arrow/util/status.h"
-
-namespace arrow {
-namespace ipc {
-
-MemorySource::MemorySource(AccessMode access_mode) : access_mode_(access_mode) {}
-
-MemorySource::~MemorySource() {}
-
-// Implement MemoryMappedSource
-
-class MemoryMappedSource::Impl {
- public:
- Impl() : file_(nullptr), is_open_(false), is_writable_(false), data_(nullptr) {}
-
- ~Impl() {
- if (is_open_) {
- munmap(data_, size_);
- fclose(file_);
- }
- }
-
- Status Open(const std::string& path, MemorySource::AccessMode mode) {
- if (is_open_) { return Status::IOError("A file is already open"); }
-
- int prot_flags = PROT_READ;
-
- if (mode == MemorySource::READ_WRITE) {
- file_ = fopen(path.c_str(), "r+b");
- prot_flags |= PROT_WRITE;
- is_writable_ = true;
- } else {
- file_ = fopen(path.c_str(), "rb");
- }
- if (file_ == nullptr) {
- std::stringstream ss;
- ss << "Unable to open file, errno: " << errno;
- return Status::IOError(ss.str());
- }
-
- fseek(file_, 0L, SEEK_END);
- if (ferror(file_)) { return Status::IOError("Unable to seek to end of file"); }
- size_ = ftell(file_);
-
- fseek(file_, 0L, SEEK_SET);
- is_open_ = true;
-
- void* result = mmap(nullptr, size_, prot_flags, MAP_SHARED, fileno(file_), 0);
- if (result == MAP_FAILED) {
- std::stringstream ss;
- ss << "Memory mapping file failed, errno: " << errno;
- return Status::IOError(ss.str());
- }
- data_ = reinterpret_cast<uint8_t*>(result);
-
- return Status::OK();
- }
-
- int64_t size() const { return size_; }
-
- uint8_t* data() { return data_; }
-
- bool writable() { return is_writable_; }
-
- bool opened() { return is_open_; }
-
- private:
- FILE* file_;
- int64_t size_;
- bool is_open_;
- bool is_writable_;
-
- // The memory map
- uint8_t* data_;
-};
-
-MemoryMappedSource::MemoryMappedSource(AccessMode access_mode)
- : MemorySource(access_mode) {}
-
-Status MemoryMappedSource::Open(const std::string& path, AccessMode access_mode,
- std::shared_ptr<MemoryMappedSource>* out) {
- std::shared_ptr<MemoryMappedSource> result(new MemoryMappedSource(access_mode));
-
- result->impl_.reset(new Impl());
- RETURN_NOT_OK(result->impl_->Open(path, access_mode));
-
- *out = result;
- return Status::OK();
-}
-
-int64_t MemoryMappedSource::Size() const {
- return impl_->size();
-}
-
-Status MemoryMappedSource::Close() {
- // munmap handled in ::Impl dtor
- return Status::OK();
-}
-
-Status MemoryMappedSource::ReadAt(
- int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
- if (position < 0 || position >= impl_->size()) {
- return Status::Invalid("position is out of bounds");
- }
-
- nbytes = std::min(nbytes, impl_->size() - position);
- *out = std::make_shared<Buffer>(impl_->data() + position, nbytes);
- return Status::OK();
-}
-
-Status MemoryMappedSource::Write(int64_t position, const uint8_t* data, int64_t nbytes) {
- if (!impl_->opened() || !impl_->writable()) {
- return Status::IOError("Unable to write");
- }
- if (position < 0 || position >= impl_->size()) {
- return Status::Invalid("position is out of bounds");
- }
-
- // TODO(wesm): verify we are not writing past the end of the buffer
- uint8_t* dst = impl_->data() + position;
- memcpy(dst, data, nbytes);
-
- return Status::OK();
-}
-
-MockMemorySource::MockMemorySource(int64_t size)
- : size_(size), extent_bytes_written_(0) {}
-
-Status MockMemorySource::Close() {
- return Status::OK();
-}
-
-Status MockMemorySource::ReadAt(
- int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
- return Status::OK();
-}
-
-Status MockMemorySource::Write(int64_t position, const uint8_t* data, int64_t nbytes) {
- extent_bytes_written_ = std::max(extent_bytes_written_, position + nbytes);
- return Status::OK();
-}
-
-int64_t MockMemorySource::Size() const {
- return size_;
-}
-
-int64_t MockMemorySource::GetExtentBytesWritten() const {
- return extent_bytes_written_;
-}
-
-} // namespace ipc
-} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/memory.h b/cpp/src/arrow/ipc/memory.h
deleted file mode 100644
index 377401d..0000000
--- a/cpp/src/arrow/ipc/memory.h
+++ /dev/null
@@ -1,150 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Public API for different interprocess memory sharing mechanisms
-
-#ifndef ARROW_IPC_MEMORY_H
-#define ARROW_IPC_MEMORY_H
-
-#include <cstdint>
-#include <memory>
-#include <string>
-
-#include "arrow/util/macros.h"
-#include "arrow/util/visibility.h"
-
-namespace arrow {
-
-class Buffer;
-class MutableBuffer;
-class Status;
-
-namespace ipc {
-
-// Abstract output stream
-class OutputStream {
- public:
- virtual ~OutputStream() {}
- // Close the output stream
- virtual Status Close() = 0;
-
- // The current position in the output stream
- virtual int64_t Tell() const = 0;
-
- // Write bytes to the stream
- virtual Status Write(const uint8_t* data, int64_t length) = 0;
-};
-
-// An output stream that writes to a MutableBuffer, such as one obtained from a
-// memory map
-class BufferOutputStream : public OutputStream {
- public:
- explicit BufferOutputStream(const std::shared_ptr<MutableBuffer>& buffer)
- : buffer_(buffer) {}
-
- // Implement the OutputStream interface
- Status Close() override;
- int64_t Tell() const override;
- Status Write(const uint8_t* data, int64_t length) override;
-
- // Returns the number of bytes remaining in the buffer
- int64_t bytes_remaining() const;
-
- private:
- std::shared_ptr<MutableBuffer> buffer_;
- int64_t capacity_;
- int64_t position_;
-};
-
-class ARROW_EXPORT MemorySource {
- public:
- // Indicates the access permissions of the memory source
- enum AccessMode { READ_ONLY, READ_WRITE };
-
- virtual ~MemorySource();
-
- // Retrieve a buffer of memory from the source of the indicates size and at
- // the indicated location
- // @returns: arrow::Status indicating success / failure. The buffer is set
- // into the *out argument
- virtual Status ReadAt(
- int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) = 0;
-
- virtual Status Close() = 0;
-
- virtual Status Write(int64_t position, const uint8_t* data, int64_t nbytes) = 0;
-
- // @return: the size in bytes of the memory source
- virtual int64_t Size() const = 0;
-
- protected:
- explicit MemorySource(AccessMode access_mode = AccessMode::READ_WRITE);
-
- AccessMode access_mode_;
-
- private:
- DISALLOW_COPY_AND_ASSIGN(MemorySource);
-};
-
-// A memory source that uses memory-mapped files for memory interactions
-class ARROW_EXPORT MemoryMappedSource : public MemorySource {
- public:
- static Status Open(const std::string& path, AccessMode access_mode,
- std::shared_ptr<MemoryMappedSource>* out);
-
- Status Close() override;
-
- Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
-
- Status Write(int64_t position, const uint8_t* data, int64_t nbytes) override;
-
- // @return: the size in bytes of the memory source
- int64_t Size() const override;
-
- private:
- explicit MemoryMappedSource(AccessMode access_mode);
- // Hide the internal details of this class for now
- class Impl;
- std::unique_ptr<Impl> impl_;
-};
-
-// A MemorySource that tracks the size of allocations from a memory source
-class MockMemorySource : public MemorySource {
- public:
- explicit MockMemorySource(int64_t size);
-
- Status Close() override;
-
- Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
-
- Status Write(int64_t position, const uint8_t* data, int64_t nbytes) override;
-
- int64_t Size() const override;
-
- // @return: the smallest number of bytes containing the modified region of the
- // MockMemorySource
- int64_t GetExtentBytesWritten() const;
-
- private:
- int64_t size_;
- int64_t extent_bytes_written_;
-};
-
-} // namespace ipc
-} // namespace arrow
-
-#endif // ARROW_IPC_MEMORY_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index 8cc902c..05e9c7a 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -220,9 +220,8 @@ static Status FieldToFlatbuffer(
auto fb_children = fbb.CreateVector(children);
// TODO: produce the list of VectorTypes
- *offset = flatbuf::CreateField(
- fbb, fb_name, field->nullable, type_enum, type_data, field->dictionary,
- fb_children);
+ *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_data,
+ field->dictionary, fb_children);
return Status::OK();
}
@@ -295,8 +294,8 @@ Status WriteDataHeader(int32_t length, int64_t body_length,
}
Status MessageBuilder::Finish() {
- auto message = flatbuf::CreateMessage(fbb_, kMetadataVersion,
- header_type_, header_, body_length_);
+ auto message =
+ flatbuf::CreateMessage(fbb_, kMetadataVersion, header_type_, header_, body_length_);
fbb_.Finish(message);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/metadata-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
index db9a83f..d38df84 100644
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -38,7 +38,7 @@ class Status;
namespace ipc {
static constexpr flatbuf::MetadataVersion kMetadataVersion =
- flatbuf::MetadataVersion_V1_SNAPSHOT;
+ flatbuf::MetadataVersion_V1_SNAPSHOT;
Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>* out);
http://git-wip-us.apache.org/repos/asf/arrow/blob/559b8652/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 838a4a6..d5ec533 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -23,6 +23,8 @@
#include <cstdint>
#include <memory>
+#include "arrow/util/visibility.h"
+
namespace arrow {
class Buffer;
@@ -36,6 +38,7 @@ namespace ipc {
// Message read/write APIs
// Serialize arrow::Schema as a Flatbuffer
+ARROW_EXPORT
Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out);
//----------------------------------------------------------------------
@@ -47,7 +50,7 @@ Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out);
class Message;
// Container for serialized Schema metadata contained in an IPC message
-class SchemaMessage {
+class ARROW_EXPORT SchemaMessage {
public:
// Accepts an opaque flatbuffer pointer
SchemaMessage(const std::shared_ptr<Message>& message, const void* schema);
@@ -82,7 +85,7 @@ struct BufferMetadata {
};
// Container for serialized record batch metadata contained in an IPC message
-class RecordBatchMessage {
+class ARROW_EXPORT RecordBatchMessage {
public:
// Accepts an opaque flatbuffer pointer
RecordBatchMessage(const std::shared_ptr<Message>& message, const void* batch_meta);
@@ -102,13 +105,13 @@ class RecordBatchMessage {
std::unique_ptr<Impl> impl_;
};
-class DictionaryBatchMessage {
+class ARROW_EXPORT DictionaryBatchMessage {
public:
int64_t id() const;
std::unique_ptr<RecordBatchMessage> data() const;
};
-class Message : public std::enable_shared_from_this<Message> {
+class ARROW_EXPORT Message : public std::enable_shared_from_this<Message> {
public:
enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH };