You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/04/03 22:05:09 UTC
arrow git commit: ARROW-657: [C++/Python] Expose Tensor IPC in
Python. Add equals method. Add pyarrow.create_memory_map/memory_map functions
Repository: arrow
Updated Branches:
refs/heads/master 7232e5b5d -> 7d1d4e751
ARROW-657: [C++/Python] Expose Tensor IPC in Python. Add equals method. Add pyarrow.create_memory_map/memory_map functions
This adds a `MemoryMappedFile::Create` C++ function for allocating new memory maps of a particular size, with a Python wrapper. I combined the Cython header declarations for the main libarrow libraries into a single pxd file.
I am also checking in some tests from ARROW-718 that got left off the patch.
Author: Wes McKinney <we...@twosigma.com>
Closes #483 from wesm/ARROW-657 and squashes the following commits:
a5d2f00 [Wes McKinney] More readable cinit post refactor
fbf438d [Wes McKinney] clang-format
4024c22 [Wes McKinney] Fix MSVC issues, use slower SetBitTo implementation that doesn't have compiler warning
7847d0f [Wes McKinney] Make file names unique
994b83d [Wes McKinney] Try to fix MSVC
25cfc83 [Wes McKinney] Expose Tensor IPC in Python. Add equals method. Add create_memory_map function and memory_map factory
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/7d1d4e75
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/7d1d4e75
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/7d1d4e75
Branch: refs/heads/master
Commit: 7d1d4e751807ac38cfe7a5c537450ede3ae9eb00
Parents: 7232e5b
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Apr 3 18:05:04 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Apr 3 18:05:04 2017 -0400
----------------------------------------------------------------------
cpp/CMakeLists.txt | 2 +-
cpp/src/arrow/buffer.h | 7 +-
cpp/src/arrow/io/CMakeLists.txt | 2 +-
cpp/src/arrow/io/file.cc | 13 ++
cpp/src/arrow/io/file.h | 4 +
cpp/src/arrow/io/io-file-test.cc | 8 +-
cpp/src/arrow/io/test-common.h | 20 +--
cpp/src/arrow/ipc/CMakeLists.txt | 2 +-
cpp/src/arrow/ipc/feather-internal.h | 16 +-
cpp/src/arrow/ipc/feather.h | 1 +
cpp/src/arrow/ipc/ipc-read-write-test.cc | 12 +-
cpp/src/arrow/ipc/metadata.h | 27 +--
cpp/src/arrow/ipc/writer.h | 18 +-
cpp/src/arrow/util/bit-util.h | 10 +-
python/pyarrow/__init__.py | 6 +-
python/pyarrow/_parquet.pxd | 4 +-
python/pyarrow/_parquet.pyx | 2 -
python/pyarrow/array.pxd | 1 +
python/pyarrow/array.pyx | 6 +
python/pyarrow/includes/libarrow.pxd | 235 ++++++++++++++++++++++++++
python/pyarrow/includes/libarrow_io.pxd | 171 -------------------
python/pyarrow/includes/libarrow_ipc.pxd | 94 -----------
python/pyarrow/includes/pyarrow.pxd | 12 +-
python/pyarrow/io.pxd | 4 +-
python/pyarrow/io.pyx | 124 +++++++++++++-
python/pyarrow/tests/test_io.py | 16 +-
python/pyarrow/tests/test_tensor.py | 93 ++++++++++
27 files changed, 553 insertions(+), 357 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index aacc7a1..d26c847 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -806,7 +806,7 @@ set(ARROW_SRCS
src/arrow/util/bit-util.cc
)
-if(NOT APPLE)
+if(NOT APPLE AND NOT MSVC)
# Localize thirdparty symbols using a linker version script. This hides them
# from the client application. The OS X linker does not support the
# version-script option.
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/cpp/src/arrow/buffer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h
index 3f14c96..a02ce3c 100644
--- a/cpp/src/arrow/buffer.h
+++ b/cpp/src/arrow/buffer.h
@@ -46,7 +46,7 @@ class Status;
class ARROW_EXPORT Buffer {
public:
Buffer(const uint8_t* data, int64_t size)
- : is_mutable_(false), data_(data), size_(size), capacity_(size) {}
+ : is_mutable_(false), data_(data), size_(size), capacity_(size) {}
virtual ~Buffer();
/// An offset into data that is owned by another buffer, but we want to be
@@ -57,7 +57,7 @@ class ARROW_EXPORT Buffer {
/// in general we expected buffers to be aligned and padded to 64 bytes. In the future
/// we might add utility methods to help determine if a buffer satisfies this contract.
Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size)
- : Buffer(parent->data() + offset, size) {
+ : Buffer(parent->data() + offset, size) {
parent_ = parent;
}
@@ -112,8 +112,7 @@ std::shared_ptr<Buffer> ARROW_EXPORT SliceMutableBuffer(
/// A Buffer whose contents can be mutated. May or may not own its data.
class ARROW_EXPORT MutableBuffer : public Buffer {
public:
- MutableBuffer(uint8_t* data, int64_t size)
- : Buffer(data, size) {
+ MutableBuffer(uint8_t* data, int64_t size) : Buffer(data, size) {
mutable_data_ = data;
is_mutable_ = true;
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/cpp/src/arrow/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index 3951eac..791c29c 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -75,7 +75,7 @@ set(ARROW_IO_SRCS
memory.cc
)
-if(NOT APPLE)
+if(NOT APPLE AND NOT MSVC)
# Localize thirdparty symbols using a linker version script. This hides them
# from the client application. The OS X linker does not support the
# version-script option.
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/cpp/src/arrow/io/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 0aa2c92..720be3d 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -604,6 +604,19 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer {
MemoryMappedFile::MemoryMappedFile() {}
MemoryMappedFile::~MemoryMappedFile() {}
+Status MemoryMappedFile::Create(
+ const std::string& path, int64_t size, std::shared_ptr<MemoryMappedFile>* out) {
+ std::shared_ptr<FileOutputStream> file;
+ RETURN_NOT_OK(FileOutputStream::Open(path, &file));
+#ifdef _MSC_VER
+ _chsize_s(file->file_descriptor(), static_cast<size_t>(size));
+#else
+ ftruncate(file->file_descriptor(), static_cast<size_t>(size));
+#endif
+ RETURN_NOT_OK(file->Close());
+ return MemoryMappedFile::Open(path, FileMode::READWRITE, out);
+}
+
Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode,
std::shared_ptr<MemoryMappedFile>* out) {
std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile());
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/cpp/src/arrow/io/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h
index f687fad..f0be3cf 100644
--- a/cpp/src/arrow/io/file.h
+++ b/cpp/src/arrow/io/file.h
@@ -106,6 +106,10 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
public:
~MemoryMappedFile();
+ /// Create new file with indicated size, return in read/write mode
+ static Status Create(
+ const std::string& path, int64_t size, std::shared_ptr<MemoryMappedFile>* out);
+
static Status Open(const std::string& path, FileMode::type mode,
std::shared_ptr<MemoryMappedFile>* out);
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/cpp/src/arrow/io/io-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index 348be17..a5784de 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -393,10 +393,8 @@ TEST_F(TestMemoryMappedFile, WriteRead) {
const int reps = 5;
std::string path = "ipc-write-read-test";
- CreateFile(path, reps * buffer_size);
-
std::shared_ptr<MemoryMappedFile> result;
- ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &result));
+ ASSERT_OK(InitMemoryMap(reps * buffer_size, path, &result));
int64_t position = 0;
std::shared_ptr<Buffer> out_buffer;
@@ -419,10 +417,8 @@ TEST_F(TestMemoryMappedFile, ReadOnly) {
const int reps = 5;
std::string path = "ipc-read-only-test";
- CreateFile(path, reps * buffer_size);
-
std::shared_ptr<MemoryMappedFile> rwmmap;
- ASSERT_OK(MemoryMappedFile::Open(path, FileMode::READWRITE, &rwmmap));
+ ASSERT_OK(InitMemoryMap(reps * buffer_size, path, &rwmmap));
int64_t position = 0;
for (int i = 0; i < reps; ++i) {
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/cpp/src/arrow/io/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/test-common.h b/cpp/src/arrow/io/test-common.h
index db5bcc1..d6ec270 100644
--- a/cpp/src/arrow/io/test-common.h
+++ b/cpp/src/arrow/io/test-common.h
@@ -67,23 +67,17 @@ class MemoryMapFixture {
}
}
- void CreateFile(const std::string path, int64_t size) {
- FILE* file = fopen(path.c_str(), "w");
- if (file != nullptr) {
- tmp_files_.push_back(path);
-#ifdef _MSC_VER
- _chsize(fileno(file), static_cast<size_t>(size));
-#else
- ftruncate(fileno(file), static_cast<size_t>(size));
-#endif
- fclose(file);
- }
+ void CreateFile(const std::string& path, int64_t size) {
+ std::shared_ptr<MemoryMappedFile> file;
+ ASSERT_OK(MemoryMappedFile::Create(path, size, &file));
+ tmp_files_.push_back(path);
}
Status InitMemoryMap(
int64_t size, const std::string& path, std::shared_ptr<MemoryMappedFile>* mmap) {
- CreateFile(path, size);
- return MemoryMappedFile::Open(path, FileMode::READWRITE, mmap);
+ RETURN_NOT_OK(MemoryMappedFile::Create(path, size, mmap));
+ tmp_files_.push_back(path);
+ return Status::OK();
}
private:
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index 5fa7d61..57db033 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -38,7 +38,7 @@ set(ARROW_IPC_SRCS
writer.cc
)
-if(NOT APPLE)
+if(NOT MSVC AND NOT APPLE)
# Localize thirdparty symbols using a linker version script. This hides them
# from the client application. The OS X linker does not support the
# version-script option.
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/cpp/src/arrow/ipc/feather-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/feather-internal.h b/cpp/src/arrow/ipc/feather-internal.h
index 10b0cfd..6847445 100644
--- a/cpp/src/arrow/ipc/feather-internal.h
+++ b/cpp/src/arrow/ipc/feather-internal.h
@@ -41,11 +41,11 @@ typedef std::vector<flatbuffers::Offset<fbs::Column>> ColumnVector;
typedef flatbuffers::FlatBufferBuilder FBB;
typedef flatbuffers::Offset<flatbuffers::String> FBString;
-struct ColumnType {
+struct ARROW_EXPORT ColumnType {
enum type { PRIMITIVE, CATEGORY, TIMESTAMP, DATE, TIME };
};
-struct ArrayMetadata {
+struct ARROW_EXPORT ArrayMetadata {
ArrayMetadata() {}
ArrayMetadata(fbs::Type type, int64_t offset, int64_t length, int64_t null_count,
@@ -69,12 +69,12 @@ struct ArrayMetadata {
int64_t total_bytes;
};
-struct CategoryMetadata {
+struct ARROW_EXPORT CategoryMetadata {
ArrayMetadata levels;
bool ordered;
};
-struct TimestampMetadata {
+struct ARROW_EXPORT TimestampMetadata {
TimeUnit unit;
// A timezone name known to the Olson timezone database. For display purposes
@@ -82,7 +82,7 @@ struct TimestampMetadata {
std::string timezone;
};
-struct TimeMetadata {
+struct ARROW_EXPORT TimeMetadata {
TimeUnit unit;
};
@@ -91,7 +91,7 @@ static constexpr const int kFeatherDefaultAlignment = 8;
class ColumnBuilder;
-class TableBuilder {
+class ARROW_EXPORT TableBuilder {
public:
explicit TableBuilder(int64_t num_rows);
~TableBuilder() = default;
@@ -116,7 +116,7 @@ class TableBuilder {
int64_t num_rows_;
};
-class TableMetadata {
+class ARROW_EXPORT TableMetadata {
public:
TableMetadata() {}
~TableMetadata() = default;
@@ -186,7 +186,7 @@ static inline void FromFlatbuffer(const fbs::PrimitiveArray* values, ArrayMetada
out->total_bytes = values->total_bytes();
}
-class ColumnBuilder {
+class ARROW_EXPORT ColumnBuilder {
public:
ColumnBuilder(TableBuilder* parent, const std::string& name);
~ColumnBuilder() = default;
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/cpp/src/arrow/ipc/feather.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/feather.h b/cpp/src/arrow/ipc/feather.h
index 8cc8ca0..4d59a8b 100644
--- a/cpp/src/arrow/ipc/feather.h
+++ b/cpp/src/arrow/ipc/feather.h
@@ -27,6 +27,7 @@
#include <vector>
#include "arrow/type.h"
+#include "arrow/util/visibility.h"
namespace arrow {
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/cpp/src/arrow/ipc/ipc-read-write-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-read-write-test.cc b/cpp/src/arrow/ipc/ipc-read-write-test.cc
index c900d0b..86ec770 100644
--- a/cpp/src/arrow/ipc/ipc-read-write-test.cc
+++ b/cpp/src/arrow/ipc/ipc-read-write-test.cc
@@ -106,6 +106,8 @@ TEST_F(TestSchemaMetadata, NestedFields) {
&MakeStruct, &MakeUnion, &MakeDictionary, &MakeDates, &MakeTimestamps, &MakeTimes, \
&MakeFWBinary, &MakeBooleanBatch);
+static int g_file_number = 0;
+
class IpcTestFixture : public io::MemoryMapFixture {
public:
Status DoStandardRoundTrip(const RecordBatch& batch, bool zero_data,
@@ -163,8 +165,9 @@ class IpcTestFixture : public io::MemoryMapFixture {
}
void CheckRoundtrip(const RecordBatch& batch, int64_t buffer_size) {
- std::string path = "test-write-row-batch";
- ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(buffer_size, path, &mmap_));
+ std::stringstream ss;
+ ss << "test-write-row-batch-" << g_file_number++;
+ ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(buffer_size, ss.str(), &mmap_));
std::shared_ptr<RecordBatch> result;
ASSERT_OK(DoStandardRoundTrip(batch, true, &result));
@@ -303,9 +306,10 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
std::vector<std::shared_ptr<Array>> arrays = {array};
*batch = std::make_shared<RecordBatch>(*schema, batch_length, arrays);
- std::string path = "test-write-past-max-recursion";
+ std::stringstream ss;
+ ss << "test-write-past-max-recursion-" << g_file_number++;
const int memory_map_size = 1 << 20;
- io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
+ RETURN_NOT_OK(io::MemoryMapFixture::InitMemoryMap(memory_map_size, ss.str(), &mmap_));
if (override_level) {
return WriteRecordBatch(**batch, 0, mmap_.get(), metadata_length, body_length,
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index fac4a70..451a76d 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -72,7 +72,7 @@ using DictionaryMap = std::unordered_map<int64_t, std::shared_ptr<Array>>;
using DictionaryTypeMap = std::unordered_map<int64_t, std::shared_ptr<Field>>;
// Memoization data structure for handling shared dictionaries
-class DictionaryMemo {
+class ARROW_EXPORT DictionaryMemo {
public:
DictionaryMemo();
@@ -114,12 +114,12 @@ Status GetDictionaryTypes(const void* opaque_schema, DictionaryTypeMap* id_to_fi
// Construct a complete Schema from the message. May be expensive for very
// large schemas if you are only interested in a few fields
-Status GetSchema(const void* opaque_schema, const DictionaryMemo& dictionary_memo,
- std::shared_ptr<Schema>* out);
+Status ARROW_EXPORT GetSchema(const void* opaque_schema,
+ const DictionaryMemo& dictionary_memo, std::shared_ptr<Schema>* out);
-Status GetTensorMetadata(const void* opaque_tensor, std::shared_ptr<DataType>* type,
- std::vector<int64_t>* shape, std::vector<int64_t>* strides,
- std::vector<std::string>* dim_names);
+Status ARROW_EXPORT GetTensorMetadata(const void* opaque_tensor,
+ std::shared_ptr<DataType>* type, std::vector<int64_t>* shape,
+ std::vector<int64_t>* strides, std::vector<std::string>* dim_names);
class ARROW_EXPORT Message {
public:
@@ -157,18 +157,19 @@ class ARROW_EXPORT Message {
/// \param[in] file the seekable file interface to read from
/// \param[out] message the message read
/// \return Status success or failure
-Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,
- std::shared_ptr<Message>* message);
+Status ARROW_EXPORT ReadMessage(int64_t offset, int32_t metadata_length,
+ io::RandomAccessFile* file, std::shared_ptr<Message>* message);
/// Read length-prefixed message with as-yet unknown length. Returns nullptr if
/// there are not enough bytes available or the message length is 0 (e.g. EOS
/// in a stream)
-Status ReadMessage(io::InputStream* stream, std::shared_ptr<Message>* message);
+Status ARROW_EXPORT ReadMessage(
+ io::InputStream* stream, std::shared_ptr<Message>* message);
/// Write a serialized message with a length-prefix and padding to an 8-byte offset
///
/// <message_size: int32><message: const void*><padding>
-Status WriteMessage(
+Status ARROW_EXPORT WriteMessage(
const Buffer& message, io::OutputStream* file, int32_t* message_length);
// Serialize arrow::Schema as a Flatbuffer
@@ -178,14 +179,14 @@ Status WriteMessage(
// dictionary ids
// \param[out] out the serialized arrow::Buffer
// \return Status outcome
-Status WriteSchemaMessage(
+Status ARROW_EXPORT WriteSchemaMessage(
const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out);
-Status WriteRecordBatchMessage(int64_t length, int64_t body_length,
+Status ARROW_EXPORT WriteRecordBatchMessage(int64_t length, int64_t body_length,
const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out);
-Status WriteTensorMessage(
+Status ARROW_EXPORT WriteTensorMessage(
const Tensor& tensor, int64_t buffer_start_offset, std::shared_ptr<Buffer>* out);
Status WriteDictionaryMessage(int64_t id, int64_t length, int64_t body_length,
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/cpp/src/arrow/ipc/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index 8b2dc9c..0b7a6e1 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -66,9 +66,9 @@ namespace ipc {
/// including padding to a 64-byte boundary
/// @param(out) body_length: the size of the contiguous buffer block plus
/// padding bytes
-Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
- io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
- MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth,
+Status ARROW_EXPORT WriteRecordBatch(const RecordBatch& batch,
+ int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
+ int64_t* body_length, MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth,
bool allow_64bit = false);
// Write Array as a DictionaryBatch message
@@ -79,7 +79,7 @@ Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dict
// Compute the precise number of bytes needed in a contiguous memory segment to
// write the record batch. This involves generating the complete serialized
// Flatbuffers metadata.
-Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
+Status ARROW_EXPORT GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
class ARROW_EXPORT StreamWriter {
public:
@@ -122,14 +122,14 @@ class ARROW_EXPORT FileWriter : public StreamWriter {
/// EXPERIMENTAL: Write RecordBatch allowing lengths over INT32_MAX. This data
/// may not be readable by all Arrow implementations
-Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
- io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
- MemoryPool* pool);
+Status ARROW_EXPORT WriteLargeRecordBatch(const RecordBatch& batch,
+ int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
+ int64_t* body_length, MemoryPool* pool);
/// EXPERIMENTAL: Write arrow::Tensor as a contiguous message
/// <metadata size><metadata><tensor data>
-Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length,
- int64_t* body_length);
+Status ARROW_EXPORT WriteTensor(const Tensor& tensor, io::OutputStream* dst,
+ int32_t* metadata_length, int64_t* body_length);
} // namespace ipc
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/cpp/src/arrow/util/bit-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h
index 6e3e8ae..42afd07 100644
--- a/cpp/src/arrow/util/bit-util.h
+++ b/cpp/src/arrow/util/bit-util.h
@@ -52,7 +52,7 @@ static inline int64_t Ceil2Bytes(int64_t size) {
}
static inline bool GetBit(const uint8_t* bits, int64_t i) {
- return static_cast<bool>(bits[i / 8] & kBitmask[i % 8]);
+ return (bits[i / 8] & kBitmask[i % 8]) != 0;
}
static inline bool BitNotSet(const uint8_t* bits, int64_t i) {
@@ -68,9 +68,13 @@ static inline void SetBit(uint8_t* bits, int64_t i) {
}
static inline void SetBitTo(uint8_t* bits, int64_t i, bool bit_is_set) {
- // See https://graphics.stanford.edu/~seander/bithacks.html
+ // TODO: speed up. See https://graphics.stanford.edu/~seander/bithacks.html
// "Conditionally set or clear bits without branching"
- bits[i / 8] ^= static_cast<uint8_t>(-bit_is_set ^ bits[i / 8]) & kBitmask[i % 8];
+ if (bit_is_set) {
+ SetBit(bits, i);
+ } else {
+ ClearBit(bits, i);
+ }
}
static inline int64_t NextPower2(int64_t n) {
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 5215028..6860f98 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -42,8 +42,10 @@ from pyarrow.error import ArrowException
from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem
from pyarrow.io import (HdfsFile, NativeFile, PythonFileInterface,
- Buffer, InMemoryOutputStream, BufferReader,
- frombuffer)
+ Buffer, BufferReader, InMemoryOutputStream,
+ MemoryMappedFile, memory_map,
+ frombuffer, read_tensor, write_tensor,
+ memory_map, create_memory_map)
from pyarrow.ipc import FileReader, FileWriter, StreamReader, StreamWriter
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/python/pyarrow/_parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index cf9ec8e..f12c86f 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -19,8 +19,8 @@
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport (CArray, CSchema, CStatus,
- CTable, CMemoryPool)
-from pyarrow.includes.libarrow_io cimport RandomAccessFile, OutputStream
+ CTable, CMemoryPool,
+ RandomAccessFile, OutputStream)
cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/python/pyarrow/_parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index c4cbd28..cfd2816 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -23,8 +23,6 @@ from cython.operator cimport dereference as deref
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
-from pyarrow.includes.libarrow_io cimport (RandomAccessFile, OutputStream,
- FileOutputStream)
cimport pyarrow.includes.pyarrow as pyarrow
from pyarrow.array cimport Array
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/python/pyarrow/array.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pxd b/python/pyarrow/array.pxd
index 4267563..f6aaea2 100644
--- a/python/pyarrow/array.pxd
+++ b/python/pyarrow/array.pxd
@@ -53,6 +53,7 @@ cdef class Tensor:
cdef object box_array(const shared_ptr[CArray]& sp_array)
+cdef object box_tensor(const shared_ptr[CTensor]& sp_tensor)
cdef class BooleanArray(Array):
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/python/pyarrow/array.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx
index 398e4cb..e7c456d 100644
--- a/python/pyarrow/array.pyx
+++ b/python/pyarrow/array.pyx
@@ -347,6 +347,12 @@ strides: {2}""".format(self.type, self.shape, self.strides)
&out))
return PyObject_to_object(out)
+ def equals(self, Tensor other):
+ """
+ Return True if the tensors contain exactly equal data
+ """
+ return self.tp.Equals(deref(other.tp))
+
property is_mutable:
def __get__(self):
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 8da063c..67d6af9 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -303,6 +303,162 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CTable]* result)
+cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
+ enum FileMode" arrow::io::FileMode::type":
+ FileMode_READ" arrow::io::FileMode::READ"
+ FileMode_WRITE" arrow::io::FileMode::WRITE"
+ FileMode_READWRITE" arrow::io::FileMode::READWRITE"
+
+ enum ObjectType" arrow::io::ObjectType::type":
+ ObjectType_FILE" arrow::io::ObjectType::FILE"
+ ObjectType_DIRECTORY" arrow::io::ObjectType::DIRECTORY"
+
+ cdef cppclass FileInterface:
+ CStatus Close()
+ CStatus Tell(int64_t* position)
+ FileMode mode()
+
+ cdef cppclass Readable:
+ CStatus ReadB" Read"(int64_t nbytes, shared_ptr[CBuffer]* out)
+ CStatus Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out)
+
+ cdef cppclass Seekable:
+ CStatus Seek(int64_t position)
+
+ cdef cppclass Writeable:
+ CStatus Write(const uint8_t* data, int64_t nbytes)
+
+ cdef cppclass OutputStream(FileInterface, Writeable):
+ pass
+
+ cdef cppclass InputStream(FileInterface, Readable):
+ pass
+
+ cdef cppclass RandomAccessFile(InputStream, Seekable):
+ CStatus GetSize(int64_t* size)
+
+ CStatus ReadAt(int64_t position, int64_t nbytes,
+ int64_t* bytes_read, uint8_t* buffer)
+ CStatus ReadAt(int64_t position, int64_t nbytes,
+ int64_t* bytes_read, shared_ptr[CBuffer]* out)
+
+ cdef cppclass WriteableFileInterface(OutputStream, Seekable):
+ CStatus WriteAt(int64_t position, const uint8_t* data,
+ int64_t nbytes)
+
+ cdef cppclass ReadWriteFileInterface(RandomAccessFile,
+ WriteableFileInterface):
+ pass
+
+
+cdef extern from "arrow/io/file.h" namespace "arrow::io" nogil:
+
+
+ cdef cppclass FileOutputStream(OutputStream):
+ @staticmethod
+ CStatus Open(const c_string& path, shared_ptr[FileOutputStream]* file)
+
+ int file_descriptor()
+
+ cdef cppclass ReadableFile(RandomAccessFile):
+ @staticmethod
+ CStatus Open(const c_string& path, shared_ptr[ReadableFile]* file)
+
+ @staticmethod
+ CStatus Open(const c_string& path, CMemoryPool* memory_pool,
+ shared_ptr[ReadableFile]* file)
+
+ int file_descriptor()
+
+ cdef cppclass CMemoryMappedFile" arrow::io::MemoryMappedFile"\
+ (ReadWriteFileInterface):
+
+ @staticmethod
+ CStatus Create(const c_string& path, int64_t size,
+ shared_ptr[CMemoryMappedFile]* file)
+
+ @staticmethod
+ CStatus Open(const c_string& path, FileMode mode,
+ shared_ptr[CMemoryMappedFile]* file)
+
+ int file_descriptor()
+
+
+cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
+ CStatus HaveLibHdfs()
+ CStatus HaveLibHdfs3()
+
+ enum HdfsDriver" arrow::io::HdfsDriver":
+ HdfsDriver_LIBHDFS" arrow::io::HdfsDriver::LIBHDFS"
+ HdfsDriver_LIBHDFS3" arrow::io::HdfsDriver::LIBHDFS3"
+
+ cdef cppclass HdfsConnectionConfig:
+ c_string host
+ int port
+ c_string user
+ c_string kerb_ticket
+ HdfsDriver driver
+
+ cdef cppclass HdfsPathInfo:
+ ObjectType kind;
+ c_string name
+ c_string owner
+ c_string group
+ int32_t last_modified_time
+ int32_t last_access_time
+ int64_t size
+ int16_t replication
+ int64_t block_size
+ int16_t permissions
+
+ cdef cppclass HdfsReadableFile(RandomAccessFile):
+ pass
+
+ cdef cppclass HdfsOutputStream(OutputStream):
+ pass
+
+ cdef cppclass CHdfsClient" arrow::io::HdfsClient":
+ @staticmethod
+ CStatus Connect(const HdfsConnectionConfig* config,
+ shared_ptr[CHdfsClient]* client)
+
+ CStatus CreateDirectory(const c_string& path)
+
+ CStatus Delete(const c_string& path, c_bool recursive)
+
+ CStatus Disconnect()
+
+ c_bool Exists(const c_string& path)
+
+ CStatus GetCapacity(int64_t* nbytes)
+ CStatus GetUsed(int64_t* nbytes)
+
+ CStatus ListDirectory(const c_string& path,
+ vector[HdfsPathInfo]* listing)
+
+ CStatus GetPathInfo(const c_string& path, HdfsPathInfo* info)
+
+ CStatus Rename(const c_string& src, const c_string& dst)
+
+ CStatus OpenReadable(const c_string& path,
+ shared_ptr[HdfsReadableFile]* handle)
+
+ CStatus OpenWriteable(const c_string& path, c_bool append,
+ int32_t buffer_size, int16_t replication,
+ int64_t default_block_size,
+ shared_ptr[HdfsOutputStream]* handle)
+
+
+cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil:
+ cdef cppclass CBufferReader" arrow::io::BufferReader"\
+ (RandomAccessFile):
+ CBufferReader(const shared_ptr[CBuffer]& buffer)
+ CBufferReader(const uint8_t* data, int64_t nbytes)
+
+ cdef cppclass BufferOutputStream(OutputStream):
+ BufferOutputStream(const shared_ptr[ResizableBuffer]& buffer)
+
+
cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil:
cdef cppclass SchemaMessage:
int num_fields()
@@ -335,3 +491,82 @@ cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil:
shared_ptr[SchemaMessage] GetSchema()
shared_ptr[RecordBatchMessage] GetRecordBatch()
shared_ptr[DictionaryBatchMessage] GetDictionaryBatch()
+
+
+cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
+
+ cdef cppclass CStreamWriter " arrow::ipc::StreamWriter":
+ @staticmethod
+ CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
+ shared_ptr[CStreamWriter]* out)
+
+ CStatus Close()
+ CStatus WriteRecordBatch(const CRecordBatch& batch)
+
+ cdef cppclass CStreamReader " arrow::ipc::StreamReader":
+
+ @staticmethod
+ CStatus Open(const shared_ptr[InputStream]& stream,
+ shared_ptr[CStreamReader]* out)
+
+ shared_ptr[CSchema] schema()
+
+ CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch)
+
+ cdef cppclass CFileWriter " arrow::ipc::FileWriter"(CStreamWriter):
+ @staticmethod
+ CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
+ shared_ptr[CFileWriter]* out)
+
+ cdef cppclass CFileReader " arrow::ipc::FileReader":
+
+ @staticmethod
+ CStatus Open(const shared_ptr[RandomAccessFile]& file,
+ shared_ptr[CFileReader]* out)
+
+ @staticmethod
+ CStatus Open2" Open"(const shared_ptr[RandomAccessFile]& file,
+ int64_t footer_offset, shared_ptr[CFileReader]* out)
+
+ shared_ptr[CSchema] schema()
+
+ int num_record_batches()
+
+ CStatus GetRecordBatch(int i, shared_ptr[CRecordBatch]* batch)
+
+ CStatus WriteTensor(const CTensor& tensor, OutputStream* dst,
+ int32_t* metadata_length,
+ int64_t* body_length)
+
+ CStatus ReadTensor(int64_t offset, RandomAccessFile* file,
+ shared_ptr[CTensor]* out)
+
+
+cdef extern from "arrow/ipc/feather.h" namespace "arrow::ipc::feather" nogil:
+
+ cdef cppclass CFeatherWriter" arrow::ipc::feather::TableWriter":
+ @staticmethod
+ CStatus Open(const shared_ptr[OutputStream]& stream,
+ unique_ptr[CFeatherWriter]* out)
+
+ void SetDescription(const c_string& desc)
+ void SetNumRows(int64_t num_rows)
+
+ CStatus Append(const c_string& name, const CArray& values)
+ CStatus Finalize()
+
+ cdef cppclass CFeatherReader" arrow::ipc::feather::TableReader":
+ @staticmethod
+ CStatus Open(const shared_ptr[RandomAccessFile]& file,
+ unique_ptr[CFeatherReader]* out)
+
+ c_string GetDescription()
+ c_bool HasDescription()
+
+ int64_t num_rows()
+ int64_t num_columns()
+
+ shared_ptr[CSchema] schema()
+
+ CStatus GetColumn(int i, shared_ptr[CColumn]* out)
+ c_string GetColumnName(int i)
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/python/pyarrow/includes/libarrow_io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd
deleted file mode 100644
index 5992c73..0000000
--- a/python/pyarrow/includes/libarrow_io.pxd
+++ /dev/null
@@ -1,171 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# distutils: language = c++
-
-from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport *
-
-cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
- enum FileMode" arrow::io::FileMode::type":
- FileMode_READ" arrow::io::FileMode::READ"
- FileMode_WRITE" arrow::io::FileMode::WRITE"
- FileMode_READWRITE" arrow::io::FileMode::READWRITE"
-
- enum ObjectType" arrow::io::ObjectType::type":
- ObjectType_FILE" arrow::io::ObjectType::FILE"
- ObjectType_DIRECTORY" arrow::io::ObjectType::DIRECTORY"
-
- cdef cppclass FileInterface:
- CStatus Close()
- CStatus Tell(int64_t* position)
- FileMode mode()
-
- cdef cppclass Readable:
- CStatus ReadB" Read"(int64_t nbytes, shared_ptr[CBuffer]* out)
- CStatus Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out)
-
- cdef cppclass Seekable:
- CStatus Seek(int64_t position)
-
- cdef cppclass Writeable:
- CStatus Write(const uint8_t* data, int64_t nbytes)
-
- cdef cppclass OutputStream(FileInterface, Writeable):
- pass
-
- cdef cppclass InputStream(FileInterface, Readable):
- pass
-
- cdef cppclass RandomAccessFile(InputStream, Seekable):
- CStatus GetSize(int64_t* size)
-
- CStatus ReadAt(int64_t position, int64_t nbytes,
- int64_t* bytes_read, uint8_t* buffer)
- CStatus ReadAt(int64_t position, int64_t nbytes,
- int64_t* bytes_read, shared_ptr[CBuffer]* out)
-
- cdef cppclass WriteableFileInterface(OutputStream, Seekable):
- CStatus WriteAt(int64_t position, const uint8_t* data,
- int64_t nbytes)
-
- cdef cppclass ReadWriteFileInterface(RandomAccessFile,
- WriteableFileInterface):
- pass
-
-
-cdef extern from "arrow/io/file.h" namespace "arrow::io" nogil:
-
-
- cdef cppclass FileOutputStream(OutputStream):
- @staticmethod
- CStatus Open(const c_string& path, shared_ptr[FileOutputStream]* file)
-
- int file_descriptor()
-
- cdef cppclass ReadableFile(RandomAccessFile):
- @staticmethod
- CStatus Open(const c_string& path, shared_ptr[ReadableFile]* file)
-
- @staticmethod
- CStatus Open(const c_string& path, CMemoryPool* memory_pool,
- shared_ptr[ReadableFile]* file)
-
- int file_descriptor()
-
- cdef cppclass CMemoryMappedFile" arrow::io::MemoryMappedFile"\
- (ReadWriteFileInterface):
- @staticmethod
- CStatus Open(const c_string& path, FileMode mode,
- shared_ptr[CMemoryMappedFile]* file)
-
- int file_descriptor()
-
-
-cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
- CStatus HaveLibHdfs()
- CStatus HaveLibHdfs3()
-
- enum HdfsDriver" arrow::io::HdfsDriver":
- HdfsDriver_LIBHDFS" arrow::io::HdfsDriver::LIBHDFS"
- HdfsDriver_LIBHDFS3" arrow::io::HdfsDriver::LIBHDFS3"
-
- cdef cppclass HdfsConnectionConfig:
- c_string host
- int port
- c_string user
- c_string kerb_ticket
- HdfsDriver driver
-
- cdef cppclass HdfsPathInfo:
- ObjectType kind;
- c_string name
- c_string owner
- c_string group
- int32_t last_modified_time
- int32_t last_access_time
- int64_t size
- int16_t replication
- int64_t block_size
- int16_t permissions
-
- cdef cppclass HdfsReadableFile(RandomAccessFile):
- pass
-
- cdef cppclass HdfsOutputStream(OutputStream):
- pass
-
- cdef cppclass CHdfsClient" arrow::io::HdfsClient":
- @staticmethod
- CStatus Connect(const HdfsConnectionConfig* config,
- shared_ptr[CHdfsClient]* client)
-
- CStatus CreateDirectory(const c_string& path)
-
- CStatus Delete(const c_string& path, c_bool recursive)
-
- CStatus Disconnect()
-
- c_bool Exists(const c_string& path)
-
- CStatus GetCapacity(int64_t* nbytes)
- CStatus GetUsed(int64_t* nbytes)
-
- CStatus ListDirectory(const c_string& path,
- vector[HdfsPathInfo]* listing)
-
- CStatus GetPathInfo(const c_string& path, HdfsPathInfo* info)
-
- CStatus Rename(const c_string& src, const c_string& dst)
-
- CStatus OpenReadable(const c_string& path,
- shared_ptr[HdfsReadableFile]* handle)
-
- CStatus OpenWriteable(const c_string& path, c_bool append,
- int32_t buffer_size, int16_t replication,
- int64_t default_block_size,
- shared_ptr[HdfsOutputStream]* handle)
-
-
-cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil:
- cdef cppclass CBufferReader" arrow::io::BufferReader"\
- (RandomAccessFile):
- CBufferReader(const shared_ptr[CBuffer]& buffer)
- CBufferReader(const uint8_t* data, int64_t nbytes)
-
- cdef cppclass BufferOutputStream(OutputStream):
- BufferOutputStream(const shared_ptr[ResizableBuffer]& buffer)
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/python/pyarrow/includes/libarrow_ipc.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd
deleted file mode 100644
index 59fd90b..0000000
--- a/python/pyarrow/includes/libarrow_ipc.pxd
+++ /dev/null
@@ -1,94 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-# distutils: language = c++
-
-from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport (CArray, CColumn, CSchema, CRecordBatch)
-from pyarrow.includes.libarrow_io cimport (InputStream, OutputStream,
- RandomAccessFile)
-
-
-cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
-
- cdef cppclass CStreamWriter " arrow::ipc::StreamWriter":
- @staticmethod
- CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
- shared_ptr[CStreamWriter]* out)
-
- CStatus Close()
- CStatus WriteRecordBatch(const CRecordBatch& batch)
-
- cdef cppclass CStreamReader " arrow::ipc::StreamReader":
-
- @staticmethod
- CStatus Open(const shared_ptr[InputStream]& stream,
- shared_ptr[CStreamReader]* out)
-
- shared_ptr[CSchema] schema()
-
- CStatus GetNextRecordBatch(shared_ptr[CRecordBatch]* batch)
-
- cdef cppclass CFileWriter " arrow::ipc::FileWriter"(CStreamWriter):
- @staticmethod
- CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
- shared_ptr[CFileWriter]* out)
-
- cdef cppclass CFileReader " arrow::ipc::FileReader":
-
- @staticmethod
- CStatus Open(const shared_ptr[RandomAccessFile]& file,
- shared_ptr[CFileReader]* out)
-
- @staticmethod
- CStatus Open2" Open"(const shared_ptr[RandomAccessFile]& file,
- int64_t footer_offset, shared_ptr[CFileReader]* out)
-
- shared_ptr[CSchema] schema()
-
- int num_record_batches()
-
- CStatus GetRecordBatch(int i, shared_ptr[CRecordBatch]* batch)
-
-cdef extern from "arrow/ipc/feather.h" namespace "arrow::ipc::feather" nogil:
-
- cdef cppclass CFeatherWriter" arrow::ipc::feather::TableWriter":
- @staticmethod
- CStatus Open(const shared_ptr[OutputStream]& stream,
- unique_ptr[CFeatherWriter]* out)
-
- void SetDescription(const c_string& desc)
- void SetNumRows(int64_t num_rows)
-
- CStatus Append(const c_string& name, const CArray& values)
- CStatus Finalize()
-
- cdef cppclass CFeatherReader" arrow::ipc::feather::TableReader":
- @staticmethod
- CStatus Open(const shared_ptr[RandomAccessFile]& file,
- unique_ptr[CFeatherReader]* out)
-
- c_string GetDescription()
- c_bool HasDescription()
-
- int64_t num_rows()
- int64_t num_columns()
-
- shared_ptr[CSchema] schema()
-
- CStatus GetColumn(int i, shared_ptr[CColumn]* out)
- c_string GetColumnName(int i)
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/python/pyarrow/includes/pyarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/pyarrow.pxd b/python/pyarrow/includes/pyarrow.pxd
index 9b64435..c40df3d 100644
--- a/python/pyarrow/includes/pyarrow.pxd
+++ b/python/pyarrow/includes/pyarrow.pxd
@@ -20,9 +20,9 @@
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport (CArray, CBuffer, CColumn, CDataType,
CTable, CTensor, CStatus, Type,
- CMemoryPool, TimeUnit)
-
-cimport pyarrow.includes.libarrow_io as arrow_io
+ CMemoryPool, TimeUnit,
+ RandomAccessFile, OutputStream,
+ CBufferReader)
cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
@@ -65,11 +65,11 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
cdef cppclass PyBuffer(CBuffer):
PyBuffer(object o)
- cdef cppclass PyReadableFile(arrow_io.RandomAccessFile):
+ cdef cppclass PyReadableFile(RandomAccessFile):
PyReadableFile(object fo)
- cdef cppclass PyOutputStream(arrow_io.OutputStream):
+ cdef cppclass PyOutputStream(OutputStream):
PyOutputStream(object fo)
- cdef cppclass PyBytesReader(arrow_io.CBufferReader):
+ cdef cppclass PyBytesReader(CBufferReader):
PyBytesReader(object fo)
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/python/pyarrow/io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd
index cffd29a..0c37a09 100644
--- a/python/pyarrow/io.pxd
+++ b/python/pyarrow/io.pxd
@@ -19,8 +19,7 @@
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
-from pyarrow.includes.libarrow_io cimport (RandomAccessFile,
- OutputStream)
+
cdef class Buffer:
cdef:
@@ -30,6 +29,7 @@ cdef class Buffer:
cdef init(self, const shared_ptr[CBuffer]& buffer)
+
cdef class NativeFile:
cdef:
shared_ptr[RandomAccessFile] rd_file
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 608b20d..98b5a62 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -27,12 +27,10 @@ from cython.operator cimport dereference as deref
from libc.stdlib cimport malloc, free
from pyarrow.includes.libarrow cimport *
-from pyarrow.includes.libarrow_io cimport *
-from pyarrow.includes.libarrow_ipc cimport *
cimport pyarrow.includes.pyarrow as pyarrow
from pyarrow.compat import frombytes, tobytes, encode_file_path
-from pyarrow.array cimport Array
+from pyarrow.array cimport Array, Tensor, box_tensor
from pyarrow.error cimport check_status
from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool
from pyarrow.schema cimport Schema
@@ -340,7 +338,32 @@ cdef class MemoryMappedFile(NativeFile):
cdef:
object path
- def __cinit__(self, path, mode='r'):
+ def __cinit__(self):  # starts closed; open()/create() map a file and set the mode flags
+ self.is_open = False
+ self.is_readable = 0
+ self.is_writeable = 0
+
+ @staticmethod
+ def create(path, size):
+ """Create a new memory map of `size` at `path`, returned open for reading and writing."""
+ cdef:
+ shared_ptr[CMemoryMappedFile] handle
+ c_string c_path = encode_file_path(path)
+ int64_t c_size = size
+
+ with nogil:
+ check_status(CMemoryMappedFile.Create(c_path, c_size, &handle))
+ # One mapping backs both the read side and the write side of this NativeFile
+ cdef MemoryMappedFile result = MemoryMappedFile()
+ result.path = path
+ result.is_readable = 1
+ result.is_writeable = 1
+ result.wr_file = <shared_ptr[OutputStream]> handle
+ result.rd_file = <shared_ptr[RandomAccessFile]> handle
+ result.is_open = True
+ return result
+
+ def open(self, path, mode='r'):
self.path = path
cdef:
@@ -348,8 +371,6 @@ cdef class MemoryMappedFile(NativeFile):
shared_ptr[CMemoryMappedFile] handle
c_string c_path = encode_file_path(path)
- self.is_readable = self.is_writeable = 0
-
if mode in ('r', 'rb'):
c_mode = FileMode_READ
self.is_readable = 1
@@ -370,6 +391,41 @@ cdef class MemoryMappedFile(NativeFile):
self.is_open = True
+def memory_map(path, mode='r'):
+ """
+ Open memory map at file path. Size of the memory map cannot change.
+
+ Parameters
+ ----------
+ path : string
+ mode : {'r', 'rb', 'w', 'r+w'}, default 'r'
+
+ Returns
+ -------
+ mmap : MemoryMappedFile
+ """
+ cdef MemoryMappedFile mmap = MemoryMappedFile()
+ mmap.open(path, mode)
+ return mmap
+
+
+def create_memory_map(path, size):
+ """
+ Create a memory map of the given size at the indicated path and return
+ it as an open file object that is both readable and writeable
+
+ Parameters
+ ----------
+ path : string
+ size : int
+
+ Returns
+ -------
+ mmap : MemoryMappedFile
+ """
+ return MemoryMappedFile.create(path, size)
+
+
cdef class OSFile(NativeFile):
"""
Supports 'r', 'w' modes
@@ -542,7 +598,7 @@ cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader):
cdef NativeFile nf
if isinstance(source, six.string_types):
- source = MemoryMappedFile(source, mode='r')
+ source = memory_map(source, mode='r')
elif isinstance(source, Buffer):
source = BufferReader(source)
elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
@@ -1144,3 +1200,57 @@ cdef class FeatherReader:
cdef Column col = Column()
col.init(sp_column)
return col
+
+
+def write_tensor(Tensor tensor, NativeFile dest):
+ """
+ Write pyarrow.Tensor to a writeable pyarrow.NativeFile at its current position
+
+ Parameters
+ ----------
+ tensor : pyarrow.Tensor
+ dest : pyarrow.NativeFile
+
+ Returns
+ -------
+ bytes_written : int
+ Total number of bytes written to the file
+ """
+ cdef:
+ int32_t metadata_length
+ int64_t body_length
+
+ dest._assert_writeable()
+
+ with nogil:
+ check_status(
+ WriteTensor(deref(tensor.tp), dest.wr_file.get(),
+ &metadata_length, &body_length))
+
+ return metadata_length + body_length
+
+
+def read_tensor(NativeFile source):
+ """
+ Read a pyarrow.Tensor from a readable pyarrow.NativeFile at its current
+ position. If the file source supports zero copy (e.g. a memory map), then
+ this operation does not allocate any memory.
+
+ Parameters
+ ----------
+ source : pyarrow.NativeFile
+
+ Returns
+ -------
+ tensor : Tensor
+ """
+ cdef:
+ shared_ptr[CTensor] sp_tensor
+
+ source._assert_readable()  # only read access is needed; read-only memory maps are valid sources
+
+ cdef int64_t offset = source.tell()
+ with nogil:
+ check_status(ReadTensor(offset, source.rd_file.get(), &sp_tensor))
+
+ return box_tensor(sp_tensor)
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/python/pyarrow/tests/test_io.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index 15c5e6b..beb6113 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -246,10 +246,10 @@ def sample_disk_data(request):
return path, data
-def _check_native_file_reader(KLASS, sample_data):
+def _check_native_file_reader(FACTORY, sample_data):
path, data = sample_data
- f = KLASS(path, mode='r')
+ f = FACTORY(path, mode='r')
assert f.read(10) == data[:10]
assert f.read(0) == b''
@@ -269,14 +269,14 @@ def _check_native_file_reader(KLASS, sample_data):
def test_memory_map_reader(sample_disk_data):
- _check_native_file_reader(io.MemoryMappedFile, sample_disk_data)
+ _check_native_file_reader(pa.memory_map, sample_disk_data)
def test_memory_map_retain_buffer_reference(sample_disk_data):
path, data = sample_disk_data
cases = []
- with io.MemoryMappedFile(path, 'rb') as f:
+ with pa.memory_map(path, 'rb') as f:
cases.append((f.read_buffer(100), data[:100]))
cases.append((f.read_buffer(100), data[100:200]))
cases.append((f.read_buffer(100), data[200:300]))
@@ -309,7 +309,7 @@ def test_memory_map_writer():
with open(path, 'wb') as f:
f.write(data)
- f = io.MemoryMappedFile(path, mode='r+w')
+ f = pa.memory_map(path, mode='r+w')
f.seek(10)
f.write('peekaboo')
@@ -318,7 +318,7 @@ def test_memory_map_writer():
f.seek(10)
assert f.read(8) == b'peekaboo'
- f2 = io.MemoryMappedFile(path, mode='r+w')
+ f2 = pa.memory_map(path, mode='r+w')
f2.seek(10)
f2.write(b'booapeak')
@@ -328,10 +328,10 @@ def test_memory_map_writer():
assert f.read(8) == b'booapeak'
# Does not truncate file
- f3 = io.MemoryMappedFile(path, mode='w')
+ f3 = pa.memory_map(path, mode='w')
f3.write('foo')
- with io.MemoryMappedFile(path) as f4:
+ with pa.memory_map(path) as f4:
assert f4.size() == SIZE
with pytest.raises(IOError):
http://git-wip-us.apache.org/repos/asf/arrow/blob/7d1d4e75/python/pyarrow/tests/test_tensor.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_tensor.py b/python/pyarrow/tests/test_tensor.py
new file mode 100644
index 0000000..5327f1a
--- /dev/null
+++ b/python/pyarrow/tests/test_tensor.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pytest
+
+import numpy as np
+import pyarrow as pa
+
+
+def test_tensor_attrs():
+ data = np.random.randn(10, 4)
+
+ tensor = pa.Tensor.from_numpy(data)
+ # shape and strides are exposed as lists mirroring the numpy array
+ assert tensor.ndim == 2
+ assert tensor.size == 40
+ assert tensor.shape == list(data.shape)
+ assert tensor.strides == list(data.strides)
+ # a freshly allocated, writeable numpy array gives a contiguous, mutable tensor
+ assert tensor.is_contiguous
+ assert tensor.is_mutable
+
+ # a read-only numpy array must yield an immutable tensor
+ data2 = data.copy()
+ data2.flags.writeable = False
+ tensor = pa.Tensor.from_numpy(data2)
+ assert not tensor.is_mutable
+
+
+@pytest.mark.parametrize('dtype_str,arrow_type', [
+ ('i1', pa.int8()),
+ ('i2', pa.int16()),
+ ('i4', pa.int32()),
+ ('i8', pa.int64()),
+ ('u1', pa.uint8()),
+ ('u2', pa.uint16()),
+ ('u4', pa.uint32()),
+ ('u8', pa.uint64()),
+ ('f2', pa.float16()),
+ ('f4', pa.float32()),
+ ('f8', pa.float64())
+])
+def test_tensor_numpy_roundtrip(dtype_str, arrow_type):
+ dtype = np.dtype(dtype_str)
+ data = (100 * np.random.randn(10, 4)).astype(dtype)
+
+ tensor = pa.Tensor.from_numpy(data)
+ assert tensor.type == arrow_type
+ # smoke test: repr() must not raise for any supported dtype
+ repr(tensor)
+
+ result = tensor.to_numpy()
+ assert (data == result).all()
+
+
+def _try_delete(path):
+ try:
+ os.remove(path)
+ except os.error:
+ pass  # best-effort cleanup: ignore failures such as an already-missing file
+
+
+def test_tensor_ipc_roundtrip(tmpdir):
+ data = np.random.randn(10, 4)
+ tensor = pa.Tensor.from_numpy(data)
+ # write under pytest's per-test tmpdir rather than the current working directory
+ path = str(tmpdir.join('pyarrow-tensor-ipc-roundtrip'))
+ mmap = pa.create_memory_map(path, 1024)
+ try:
+ pa.write_tensor(tensor, mmap)
+
+ mmap.seek(0)
+ result = pa.read_tensor(mmap)
+
+ assert result.equals(tensor)
+ finally:
+ mmap.close()
+ _try_delete(path)