You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ju...@apache.org on 2016/10/11 01:42:38 UTC
[3/3] arrow git commit: ARROW-312: Read and write Arrow IPC file format from Python
ARROW-312: Read and write Arrow IPC file format from Python
This also adds some IO scaffolding for interacting with `arrow::Buffer` objects from Python and assorted additions to help with testing.
Author: Wes McKinney <we...@twosigma.com>
Closes #164 from wesm/ARROW-312 and squashes the following commits:
7df3e5f [Wes McKinney] Set BUILD_WITH_INSTALL_RPATH on arrow_ipc
be8cee0 [Wes McKinney] Link Cython modules to libarrow* libraries
5716601 [Wes McKinney] Fix accidental deletion
77fb03b [Wes McKinney] Add / test Buffer wrapper. Test that we can write an arrow file to a wrapped buffer. Resize buffer in BufferOutputStream on close
316537d [Wes McKinney] Get ready to wrap Arrow buffers in a Python object
4822d32 [Wes McKinney] Implement RecordBatch::Equals, compare in Python ipc file writes
a931e49 [Wes McKinney] Permit buffers (write padding) in a non-multiple of 64 in an IPC context, to allow zero-copy writing of NumPy arrays
2c49cd4 [Wes McKinney] Some debugging
ca1562b [Wes McKinney] Draft implementations of Arrow file read/write from Python
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/a9747cea
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/a9747cea
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/a9747cea
Branch: refs/heads/master
Commit: a9747ceac2b6399c6acf027de8074d8661d5eb1d
Parents: 17cd7a6
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Oct 10 11:21:49 2016 -0400
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Mon Oct 10 18:42:05 2016 -0700
----------------------------------------------------------------------
cpp/src/arrow/io/io-memory-test.cc | 25 ++
cpp/src/arrow/io/memory.cc | 13 +-
cpp/src/arrow/ipc/CMakeLists.txt | 7 +
cpp/src/arrow/ipc/adapter.cc | 16 +-
cpp/src/arrow/ipc/util.h | 6 +-
cpp/src/arrow/table-test.cc | 27 ++
cpp/src/arrow/table.cc | 16 ++
cpp/src/arrow/table.h | 2 +
cpp/src/arrow/types/primitive-test.cc | 3 +-
cpp/src/arrow/util/bit-util.h | 13 +
cpp/src/arrow/util/buffer.cc | 16 +-
cpp/src/arrow/util/buffer.h | 1 -
cpp/src/arrow/util/logging.h | 4 +-
python/CMakeLists.txt | 8 +-
python/cmake_modules/FindArrow.cmake | 11 +
python/pyarrow/__init__.py | 3 +-
python/pyarrow/array.pyx | 44 +---
python/pyarrow/includes/common.pxd | 4 -
python/pyarrow/includes/libarrow.pxd | 29 ++-
python/pyarrow/includes/libarrow_io.pxd | 14 +-
python/pyarrow/includes/libarrow_ipc.pxd | 52 ++++
python/pyarrow/includes/pyarrow.pxd | 13 +-
python/pyarrow/io.pxd | 6 +
python/pyarrow/io.pyx | 340 ++++++++++++++++----------
python/pyarrow/ipc.pyx | 155 ++++++++++++
python/pyarrow/table.pxd | 17 +-
python/pyarrow/table.pyx | 194 +++++++++++----
python/pyarrow/tests/test_array.py | 4 +
python/pyarrow/tests/test_io.py | 41 ++++
python/pyarrow/tests/test_ipc.py | 116 +++++++++
python/pyarrow/tests/test_table.py | 82 +++----
python/setup.py | 1 +
python/src/pyarrow/adapters/builtin.cc | 2 +-
python/src/pyarrow/adapters/pandas.cc | 8 +
python/src/pyarrow/common.cc | 2 +-
python/src/pyarrow/common.h | 20 +-
python/src/pyarrow/io.cc | 6 +-
37 files changed, 1012 insertions(+), 309 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/io/io-memory-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc
index 6de35da..a49faf3 100644
--- a/cpp/src/arrow/io/io-memory-test.cc
+++ b/cpp/src/arrow/io/io-memory-test.cc
@@ -121,5 +121,30 @@ TEST_F(TestMemoryMappedFile, InvalidFile) {
IOError, MemoryMappedFile::Open(non_existent_path, FileMode::READ, &result));
}
+class TestBufferOutputStream : public ::testing::Test {
+ public:
+ void SetUp() {
+ buffer_.reset(new PoolBuffer(default_memory_pool()));
+ stream_.reset(new BufferOutputStream(buffer_));
+ }
+
+ protected:
+ std::shared_ptr<PoolBuffer> buffer_;
+ std::unique_ptr<OutputStream> stream_;
+};
+
+TEST_F(TestBufferOutputStream, CloseResizes) {
+ std::string data = "data123456";
+
+ const int64_t nbytes = static_cast<int64_t>(data.size());
+ const int K = 100;
+ for (int i = 0; i < K; ++i) {
+ EXPECT_OK(stream_->Write(reinterpret_cast<const uint8_t*>(data.c_str()), nbytes));
+ }
+
+ ASSERT_OK(stream_->Close());
+ ASSERT_EQ(K * nbytes, buffer_->size());
+}
+
} // namespace io
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 7d6e02e..c7d0ae5 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -212,7 +212,11 @@ BufferOutputStream::BufferOutputStream(const std::shared_ptr<ResizableBuffer>& b
mutable_data_(buffer->mutable_data()) {}
Status BufferOutputStream::Close() {
- return Status::OK();
+ if (position_ < capacity_) {
+ return buffer_->Resize(position_);
+ } else {
+ return Status::OK();
+ }
}
Status BufferOutputStream::Tell(int64_t* position) {
@@ -228,8 +232,11 @@ Status BufferOutputStream::Write(const uint8_t* data, int64_t nbytes) {
}
Status BufferOutputStream::Reserve(int64_t nbytes) {
- while (position_ + nbytes > capacity_) {
- int64_t new_capacity = std::max(kBufferMinimumSize, capacity_ * 2);
+ int64_t new_capacity = capacity_;
+ while (position_ + nbytes > new_capacity) {
+ new_capacity = std::max(kBufferMinimumSize, new_capacity * 2);
+ }
+ if (new_capacity > capacity_) {
RETURN_NOT_OK(buffer_->Resize(new_capacity));
capacity_ = new_capacity;
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index bde8c5b..8dcd9ac 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -57,6 +57,13 @@ SET_TARGET_PROPERTIES(arrow_ipc PROPERTIES
LINKER_LANGUAGE CXX
LINK_FLAGS "${ARROW_IPC_LINK_FLAGS}")
+if (APPLE)
+ set_target_properties(arrow_ipc
+ PROPERTIES
+ BUILD_WITH_INSTALL_RPATH ON
+ INSTALL_NAME_DIR "@rpath")
+endif()
+
ADD_ARROW_TEST(ipc-adapter-test)
ARROW_TEST_LINK_LIBRARIES(ipc-adapter-test
${ARROW_IPC_TEST_LINK_LIBS})
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index 99974a4..cd8ab53 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -162,15 +162,14 @@ class RecordBatchWriter {
for (size_t i = 0; i < buffers_.size(); ++i) {
const Buffer* buffer = buffers_[i].get();
int64_t size = 0;
+ int64_t padding = 0;
// The buffer might be null if we are handling zero row lengths.
if (buffer) {
- // We use capacity here, because size might not reflect the padding
- // requirements of buffers but capacity always should.
- size = buffer->capacity();
- // check that padding is appropriate
- RETURN_NOT_OK(CheckMultipleOf64(size));
+ size = buffer->size();
+ padding = util::RoundUpToMultipleOf64(size) - size;
}
+
// TODO(wesm): We currently have no notion of shared memory page id's,
// but we've included it in the metadata IDL for when we have it in the
// future. Use page=0 for now
@@ -179,12 +178,17 @@ class RecordBatchWriter {
// are using from any OS-level shared memory. The thought is that systems
// may (in the future) associate integer page id's with physical memory
// pages (according to whatever is the desired shared memory mechanism)
- buffer_meta_.push_back(flatbuf::Buffer(0, position, size));
+ buffer_meta_.push_back(flatbuf::Buffer(0, position, size + padding));
if (size > 0) {
RETURN_NOT_OK(dst->Write(buffer->data(), size));
position += size;
}
+
+ if (padding > 0) {
+ RETURN_NOT_OK(dst->Write(kPaddingBytes, padding));
+ position += padding;
+ }
}
*body_end_offset = position;
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/ipc/util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h
index 94079a3..9000d1b 100644
--- a/cpp/src/arrow/ipc/util.h
+++ b/cpp/src/arrow/ipc/util.h
@@ -29,7 +29,11 @@ namespace ipc {
// Align on 8-byte boundaries
static constexpr int kArrowAlignment = 8;
-static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0};
+
+// Buffers are padded to 64-byte boundaries (for SIMD)
+static constexpr int kArrowBufferAlignment = 64;
+
+static constexpr uint8_t kPaddingBytes[kArrowBufferAlignment] = {0};
static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAlignment) {
return ((nbytes + alignment - 1) / alignment) * alignment;
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/table-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc
index 385e7d8..743fb66 100644
--- a/cpp/src/arrow/table-test.cc
+++ b/cpp/src/arrow/table-test.cc
@@ -123,4 +123,31 @@ TEST_F(TestTable, InvalidColumns) {
ASSERT_RAISES(Invalid, table_->ValidateColumns());
}
+class TestRecordBatch : public TestBase {};
+
+TEST_F(TestRecordBatch, Equals) {
+ const int length = 10;
+
+ auto f0 = std::make_shared<Field>("f0", INT32);
+ auto f1 = std::make_shared<Field>("f1", UINT8);
+ auto f2 = std::make_shared<Field>("f2", INT16);
+
+ vector<shared_ptr<Field>> fields = {f0, f1, f2};
+ auto schema = std::make_shared<Schema>(fields);
+
+ auto a0 = MakePrimitive<Int32Array>(length);
+ auto a1 = MakePrimitive<UInt8Array>(length);
+ auto a2 = MakePrimitive<Int16Array>(length);
+
+ RecordBatch b1(schema, length, {a0, a1, a2});
+ RecordBatch b2(schema, 5, {a0, a1, a2});
+ RecordBatch b3(schema, length, {a0, a1});
+ RecordBatch b4(schema, length, {a0, a1, a1});
+
+ ASSERT_TRUE(b1.Equals(b1));
+ ASSERT_FALSE(b1.Equals(b2));
+ ASSERT_FALSE(b1.Equals(b3));
+ ASSERT_FALSE(b1.Equals(b4));
+}
+
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/table.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index 3a250df..af84f27 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -21,6 +21,7 @@
#include <memory>
#include <sstream>
+#include "arrow/array.h"
#include "arrow/column.h"
#include "arrow/schema.h"
#include "arrow/util/status.h"
@@ -35,6 +36,21 @@ const std::string& RecordBatch::column_name(int i) const {
return schema_->field(i)->name;
}
+bool RecordBatch::Equals(const RecordBatch& other) const {
+ if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) {
+ return false;
+ }
+
+ for (int i = 0; i < num_columns(); ++i) {
+ if (!column(i)->Equals(other.column(i))) { return false; }
+ }
+
+ return true;
+}
+
+// ----------------------------------------------------------------------
+// Table methods
+
Table::Table(const std::string& name, const std::shared_ptr<Schema>& schema,
const std::vector<std::shared_ptr<Column>>& columns)
: name_(name), schema_(schema), columns_(columns) {
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/table.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 36b3c8e..1a856c8 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -43,6 +43,8 @@ class ARROW_EXPORT RecordBatch {
RecordBatch(const std::shared_ptr<Schema>& schema, int32_t num_rows,
const std::vector<std::shared_ptr<Array>>& columns);
+ bool Equals(const RecordBatch& other) const;
+
// @returns: the table's schema
const std::shared_ptr<Schema>& schema() const { return schema_; }
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/types/primitive-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/primitive-test.cc b/cpp/src/arrow/types/primitive-test.cc
index ffebb92..87eb0fe 100644
--- a/cpp/src/arrow/types/primitive-test.cc
+++ b/cpp/src/arrow/types/primitive-test.cc
@@ -238,8 +238,7 @@ void TestPrimitiveBuilder<PBoolean>::Check(
}
typedef ::testing::Types<PBoolean, PUInt8, PUInt16, PUInt32, PUInt64, PInt8, PInt16,
- PInt32, PInt64, PFloat, PDouble>
- Primitives;
+ PInt32, PInt64, PFloat, PDouble> Primitives;
TYPED_TEST_CASE(TestPrimitiveBuilder, Primitives);
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/util/bit-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h
index 873a195..3087ce7 100644
--- a/cpp/src/arrow/util/bit-util.h
+++ b/cpp/src/arrow/util/bit-util.h
@@ -19,6 +19,7 @@
#define ARROW_UTIL_BIT_UTIL_H
#include <cstdint>
+#include <limits>
#include <memory>
#include <vector>
@@ -77,6 +78,18 @@ static inline bool is_multiple_of_64(int64_t n) {
return (n & 63) == 0;
}
+inline int64_t RoundUpToMultipleOf64(int64_t num) {
+ // TODO(wesm): is this definitely needed?
+ // DCHECK_GE(num, 0);
+ constexpr int64_t round_to = 64;
+ constexpr int64_t force_carry_addend = round_to - 1;
+ constexpr int64_t truncate_bitmask = ~(round_to - 1);
+ constexpr int64_t max_roundable_num = std::numeric_limits<int64_t>::max() - round_to;
+ if (num <= max_roundable_num) { return (num + force_carry_addend) & truncate_bitmask; }
+ // handle overflow case. This should result in a malloc error upstream
+ return num;
+}
+
void bytes_to_bits(const std::vector<uint8_t>& bytes, uint8_t* bits);
ARROW_EXPORT Status bytes_to_bits(const std::vector<uint8_t>&, std::shared_ptr<Buffer>*);
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/util/buffer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/buffer.cc b/cpp/src/arrow/util/buffer.cc
index 703ef83..6faa048 100644
--- a/cpp/src/arrow/util/buffer.cc
+++ b/cpp/src/arrow/util/buffer.cc
@@ -20,25 +20,13 @@
#include <cstdint>
#include <limits>
+#include "arrow/util/bit-util.h"
#include "arrow/util/logging.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"
namespace arrow {
-namespace {
-int64_t RoundUpToMultipleOf64(int64_t num) {
- DCHECK_GE(num, 0);
- constexpr int64_t round_to = 64;
- constexpr int64_t force_carry_addend = round_to - 1;
- constexpr int64_t truncate_bitmask = ~(round_to - 1);
- constexpr int64_t max_roundable_num = std::numeric_limits<int64_t>::max() - round_to;
- if (num <= max_roundable_num) { return (num + force_carry_addend) & truncate_bitmask; }
- // handle overflow case. This should result in a malloc error upstream
- return num;
-}
-} // namespace
-
Buffer::Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size) {
data_ = parent->data() + offset;
size_ = size;
@@ -64,7 +52,7 @@ PoolBuffer::~PoolBuffer() {
Status PoolBuffer::Reserve(int64_t new_capacity) {
if (!mutable_data_ || new_capacity > capacity_) {
uint8_t* new_data;
- new_capacity = RoundUpToMultipleOf64(new_capacity);
+ new_capacity = util::RoundUpToMultipleOf64(new_capacity);
if (mutable_data_) {
RETURN_NOT_OK(pool_->Allocate(new_capacity, &new_data));
memcpy(new_data, mutable_data_, size_);
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/util/buffer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/buffer.h b/cpp/src/arrow/util/buffer.h
index 1aeebc6..01e4259 100644
--- a/cpp/src/arrow/util/buffer.h
+++ b/cpp/src/arrow/util/buffer.h
@@ -23,7 +23,6 @@
#include <cstring>
#include <memory>
-#include "arrow/util/bit-util.h"
#include "arrow/util/macros.h"
#include "arrow/util/status.h"
#include "arrow/util/visibility.h"
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/cpp/src/arrow/util/logging.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h
index b22f07d..06ee841 100644
--- a/cpp/src/arrow/util/logging.h
+++ b/cpp/src/arrow/util/logging.h
@@ -118,9 +118,9 @@ class CerrLog {
class FatalLog : public CerrLog {
public:
explicit FatalLog(int /* severity */) // NOLINT
- : CerrLog(ARROW_FATAL){} // NOLINT
+ : CerrLog(ARROW_FATAL) {} // NOLINT
- [[noreturn]] ~FatalLog() {
+ [[noreturn]] ~FatalLog() {
if (has_logged_) { std::cerr << std::endl; }
std::exit(1);
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 77a771a..55f6d05 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -352,6 +352,8 @@ ADD_THIRDPARTY_LIB(arrow
SHARED_LIB ${ARROW_SHARED_LIB})
ADD_THIRDPARTY_LIB(arrow_io
SHARED_LIB ${ARROW_IO_SHARED_LIB})
+ADD_THIRDPARTY_LIB(arrow_ipc
+ SHARED_LIB ${ARROW_IPC_SHARED_LIB})
############################################################
# Linker setup
@@ -415,6 +417,8 @@ if (UNIX)
set(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
endif()
+SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)
+
add_subdirectory(src/pyarrow)
add_subdirectory(src/pyarrow/util)
@@ -423,6 +427,7 @@ set(CYTHON_EXTENSIONS
config
error
io
+ ipc
scalar
schema
table
@@ -442,6 +447,7 @@ set(PYARROW_SRCS
set(LINK_LIBS
arrow
arrow_io
+ arrow_ipc
)
if(PARQUET_FOUND AND PARQUET_ARROW_FOUND)
@@ -455,8 +461,6 @@ if(PARQUET_FOUND AND PARQUET_ARROW_FOUND)
parquet)
endif()
-SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)
-
add_library(pyarrow SHARED
${PYARROW_SRCS})
target_link_libraries(pyarrow ${LINK_LIBS})
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/cmake_modules/FindArrow.cmake
----------------------------------------------------------------------
diff --git a/python/cmake_modules/FindArrow.cmake b/python/cmake_modules/FindArrow.cmake
index 9919746..3c359aa 100644
--- a/python/cmake_modules/FindArrow.cmake
+++ b/python/cmake_modules/FindArrow.cmake
@@ -47,10 +47,16 @@ find_library(ARROW_IO_LIB_PATH NAMES arrow_io
${ARROW_SEARCH_LIB_PATH}
NO_DEFAULT_PATH)
+find_library(ARROW_IPC_LIB_PATH NAMES arrow_ipc
+ PATHS
+ ${ARROW_SEARCH_LIB_PATH}
+ NO_DEFAULT_PATH)
+
if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH)
set(ARROW_FOUND TRUE)
set(ARROW_LIB_NAME libarrow)
set(ARROW_IO_LIB_NAME libarrow_io)
+ set(ARROW_IPC_LIB_NAME libarrow_ipc)
set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH})
set(ARROW_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_LIB_NAME}.a)
@@ -58,9 +64,14 @@ if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH)
set(ARROW_IO_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IO_LIB_NAME}.a)
set(ARROW_IO_SHARED_LIB ${ARROW_LIBS}/${ARROW_IO_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+
+ set(ARROW_IPC_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IPC_LIB_NAME}.a)
+ set(ARROW_IPC_SHARED_LIB ${ARROW_LIBS}/${ARROW_IPC_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+
if (NOT Arrow_FIND_QUIETLY)
message(STATUS "Found the Arrow core library: ${ARROW_LIB_PATH}")
message(STATUS "Found the Arrow IO library: ${ARROW_IO_LIB_PATH}")
+ message(STATUS "Found the Arrow IPC library: ${ARROW_IPC_LIB_PATH}")
endif ()
else ()
if (NOT Arrow_FIND_QUIETLY)
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 7561f6d..8b131aa 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -41,5 +41,4 @@ from pyarrow.schema import (null, bool_,
list_, struct, field,
DataType, Field, Schema, schema)
-from pyarrow.array import RowBatch
-from pyarrow.table import Column, Table, from_pandas_dataframe
+from pyarrow.table import Column, RecordBatch, Table, from_pandas_dataframe
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/array.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx
index cdbe73a..84ab4a4 100644
--- a/python/pyarrow/array.pyx
+++ b/python/pyarrow/array.pyx
@@ -37,7 +37,7 @@ import pyarrow.schema as schema
def total_allocated_bytes():
- cdef MemoryPool* pool = pyarrow.GetMemoryPool()
+ cdef MemoryPool* pool = pyarrow.get_memory_pool()
return pool.bytes_allocated()
@@ -243,12 +243,14 @@ def from_pandas_series(object series, object mask=None, timestamps_to_ms=False):
series_values = series_values.astype('datetime64[ms]')
if mask is None:
- check_status(pyarrow.PandasToArrow(pyarrow.GetMemoryPool(),
- series_values, &out))
+ with nogil:
+ check_status(pyarrow.PandasToArrow(pyarrow.get_memory_pool(),
+ series_values, &out))
else:
mask = series_as_ndarray(mask)
- check_status(pyarrow.PandasMaskedToArrow(
- pyarrow.GetMemoryPool(), series_values, mask, &out))
+ with nogil:
+ check_status(pyarrow.PandasMaskedToArrow(
+ pyarrow.get_memory_pool(), series_values, mask, &out))
return box_arrow_array(out)
@@ -262,35 +264,3 @@ cdef object series_as_ndarray(object obj):
result = obj
return result
-
-#----------------------------------------------------------------------
-# Table-like data structures
-
-cdef class RowBatch:
- """
-
- """
- cdef readonly:
- Schema schema
- int num_rows
- list arrays
-
- def __cinit__(self, Schema schema, int num_rows, list arrays):
- self.schema = schema
- self.num_rows = num_rows
- self.arrays = arrays
-
- if len(self.schema) != len(arrays):
- raise ValueError('Mismatch number of data arrays and '
- 'schema fields')
-
- def __len__(self):
- return self.num_rows
-
- property num_columns:
-
- def __get__(self):
- return len(self.arrays)
-
- def __getitem__(self, i):
- return self.arrays[i]
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/includes/common.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd
index 133797b..05c0123 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -47,7 +47,3 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
c_bool IsKeyError()
c_bool IsNotImplemented()
c_bool IsInvalid()
-
- cdef cppclass Buffer:
- uint8_t* data()
- int64_t size()
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 854d07d..3ae1789 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -54,6 +54,18 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
cdef cppclass MemoryPool" arrow::MemoryPool":
int64_t bytes_allocated()
+ cdef cppclass CBuffer" arrow::Buffer":
+ uint8_t* data()
+ int64_t size()
+
+ cdef cppclass ResizableBuffer(CBuffer):
+ CStatus Resize(int64_t nbytes)
+ CStatus Reserve(int64_t nbytes)
+
+ cdef cppclass PoolBuffer(ResizableBuffer):
+ PoolBuffer()
+ PoolBuffer(MemoryPool*)
+
cdef MemoryPool* default_memory_pool()
cdef cppclass CListType" arrow::ListType"(CDataType):
@@ -149,6 +161,21 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
const shared_ptr[CDataType]& type()
const shared_ptr[CChunkedArray]& data()
+ cdef cppclass CRecordBatch" arrow::RecordBatch":
+ CRecordBatch(const shared_ptr[CSchema]& schema, int32_t num_rows,
+ const vector[shared_ptr[CArray]]& columns)
+
+ c_bool Equals(const CRecordBatch& other)
+
+ const shared_ptr[CSchema]& schema()
+ const shared_ptr[CArray]& column(int i)
+ const c_string& column_name(int i)
+
+ const vector[shared_ptr[CArray]]& columns()
+
+ int num_columns()
+ int32_t num_rows()
+
cdef cppclass CTable" arrow::Table":
CTable(const c_string& name, const shared_ptr[CSchema]& schema,
const vector[shared_ptr[CColumn]]& columns)
@@ -186,7 +213,7 @@ cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil:
MessageType_DICTIONARY_BATCH" arrow::ipc::Message::DICTIONARY_BATCH"
cdef cppclass Message:
- CStatus Open(const shared_ptr[Buffer]& buf,
+ CStatus Open(const shared_ptr[CBuffer]& buf,
shared_ptr[Message]* out)
int64_t body_length()
MessageType type()
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/includes/libarrow_io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd
index 56d8d4c..8074915 100644
--- a/python/pyarrow/includes/libarrow_io.pxd
+++ b/python/pyarrow/includes/libarrow_io.pxd
@@ -18,7 +18,7 @@
# distutils: language = c++
from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport MemoryPool
+from pyarrow.includes.libarrow cimport *
cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
enum FileMode" arrow::io::FileMode::type":
@@ -36,7 +36,7 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
FileMode mode()
cdef cppclass Readable:
- CStatus ReadB" Read"(int64_t nbytes, shared_ptr[Buffer]* out)
+ CStatus ReadB" Read"(int64_t nbytes, shared_ptr[CBuffer]* out)
CStatus Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out)
cdef cppclass Seekable:
@@ -57,7 +57,7 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
CStatus ReadAt(int64_t position, int64_t nbytes,
int64_t* bytes_read, uint8_t* buffer)
CStatus ReadAt(int64_t position, int64_t nbytes,
- int64_t* bytes_read, shared_ptr[Buffer]* out)
+ int64_t* bytes_read, shared_ptr[CBuffer]* out)
cdef cppclass WriteableFileInterface(OutputStream, Seekable):
CStatus WriteAt(int64_t position, const uint8_t* data,
@@ -143,9 +143,9 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil:
- cdef cppclass BufferReader(ReadableFileInterface):
- BufferReader(const uint8_t* data, int64_t nbytes)
+ cdef cppclass CBufferReader" arrow::io::BufferReader"\
+ (ReadableFileInterface):
+ CBufferReader(const uint8_t* data, int64_t nbytes)
cdef cppclass BufferOutputStream(OutputStream):
- # TODO(wesm)
- pass
+ BufferOutputStream(const shared_ptr[ResizableBuffer]& buffer)
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/includes/libarrow_ipc.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_ipc.pxd b/python/pyarrow/includes/libarrow_ipc.pxd
new file mode 100644
index 0000000..eda5b9b
--- /dev/null
+++ b/python/pyarrow/includes/libarrow_ipc.pxd
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# distutils: language = c++
+
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport (MemoryPool, CArray, CSchema,
+ CRecordBatch)
+from pyarrow.includes.libarrow_io cimport (OutputStream, ReadableFileInterface)
+
+cdef extern from "arrow/ipc/file.h" namespace "arrow::ipc" nogil:
+
+ cdef cppclass CFileWriter " arrow::ipc::FileWriter":
+ @staticmethod
+ CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema,
+ shared_ptr[CFileWriter]* out)
+
+ CStatus WriteRecordBatch(const vector[shared_ptr[CArray]]& columns,
+ int32_t num_rows)
+
+ CStatus Close()
+
+ cdef cppclass CFileReader " arrow::ipc::FileReader":
+
+ @staticmethod
+ CStatus Open(const shared_ptr[ReadableFileInterface]& file,
+ shared_ptr[CFileReader]* out)
+
+ @staticmethod
+ CStatus Open2" Open"(const shared_ptr[ReadableFileInterface]& file,
+ int64_t footer_offset, shared_ptr[CFileReader]* out)
+
+ const shared_ptr[CSchema]& schema()
+
+ int num_dictionaries()
+ int num_record_batches()
+
+ CStatus GetRecordBatch(int i, shared_ptr[CRecordBatch]* batch)
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/includes/pyarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/pyarrow.pxd b/python/pyarrow/includes/pyarrow.pxd
index 4c97166..2fa5a7d 100644
--- a/python/pyarrow/includes/pyarrow.pxd
+++ b/python/pyarrow/includes/pyarrow.pxd
@@ -18,8 +18,8 @@
# distutils: language = c++
from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport (CArray, CColumn, CDataType, CStatus,
- Type, MemoryPool)
+from pyarrow.includes.libarrow cimport (CArray, CBuffer, CColumn,
+ CDataType, CStatus, Type, MemoryPool)
cimport pyarrow.includes.libarrow_io as arrow_io
@@ -53,7 +53,12 @@ cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil:
PyStatus ArrowToPandas(const shared_ptr[CColumn]& arr, object py_ref,
PyObject** out)
- MemoryPool* GetMemoryPool()
+ MemoryPool* get_memory_pool()
+
+
+cdef extern from "pyarrow/common.h" namespace "pyarrow" nogil:
+ cdef cppclass PyBytesBuffer(CBuffer):
+ PyBytesBuffer(object o)
cdef extern from "pyarrow/io.h" namespace "pyarrow" nogil:
@@ -63,5 +68,5 @@ cdef extern from "pyarrow/io.h" namespace "pyarrow" nogil:
cdef cppclass PyOutputStream(arrow_io.OutputStream):
PyOutputStream(object fo)
- cdef cppclass PyBytesReader(arrow_io.BufferReader):
+ cdef cppclass PyBytesReader(arrow_io.CBufferReader):
PyBytesReader(object fo)
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd
index 1dbb3fd..d6966cd 100644
--- a/python/pyarrow/io.pxd
+++ b/python/pyarrow/io.pxd
@@ -22,6 +22,11 @@ from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_io cimport (ReadableFileInterface,
OutputStream)
+cdef class Buffer:
+ cdef:
+ shared_ptr[CBuffer] buffer
+
+ cdef init(self, const shared_ptr[CBuffer]& buffer)
cdef class NativeFile:
cdef:
@@ -29,6 +34,7 @@ cdef class NativeFile:
shared_ptr[OutputStream] wr_file
bint is_readonly
bint is_open
+ bint own_file
# By implementing these "virtual" functions (all functions in Cython
# extension classes are technically virtual in the C++ sense) we can expose
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index e6e2b62..00a492f 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -36,6 +36,217 @@ import re
import sys
import threading
+
+cdef class NativeFile:
+
+ def __cinit__(self):
+ self.is_open = False
+ self.own_file = False
+
+ def __dealloc__(self):
+ if self.is_open and self.own_file:
+ self.close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, tb):
+ self.close()
+
+ def close(self):
+ if self.is_open:
+ with nogil:
+ if self.is_readonly:
+ check_cstatus(self.rd_file.get().Close())
+ else:
+ check_cstatus(self.wr_file.get().Close())
+ self.is_open = False
+
+ cdef read_handle(self, shared_ptr[ReadableFileInterface]* file):
+ self._assert_readable()
+ file[0] = <shared_ptr[ReadableFileInterface]> self.rd_file
+
+ cdef write_handle(self, shared_ptr[OutputStream]* file):
+ self._assert_writeable()
+ file[0] = <shared_ptr[OutputStream]> self.wr_file
+
+ def _assert_readable(self):
+ if not self.is_readonly:
+ raise IOError("only valid on readonly files")
+
+ if not self.is_open:
+ raise IOError("file not open")
+
+ def _assert_writeable(self):
+ if self.is_readonly:
+ raise IOError("only valid on writeonly files")
+
+ if not self.is_open:
+ raise IOError("file not open")
+
+ def size(self):
+ cdef int64_t size
+ self._assert_readable()
+ with nogil:
+ check_cstatus(self.rd_file.get().GetSize(&size))
+ return size
+
+ def tell(self):
+ cdef int64_t position
+ with nogil:
+ if self.is_readonly:
+ check_cstatus(self.rd_file.get().Tell(&position))
+ else:
+ check_cstatus(self.wr_file.get().Tell(&position))
+ return position
+
+ def seek(self, int64_t position):
+ self._assert_readable()
+ with nogil:
+ check_cstatus(self.rd_file.get().Seek(position))
+
+ def write(self, data):
+ """
+ Write bytes-like (unicode, encoded to UTF-8) to file
+ """
+ self._assert_writeable()
+
+ data = tobytes(data)
+
+ cdef const uint8_t* buf = <const uint8_t*> cp.PyBytes_AS_STRING(data)
+ cdef int64_t bufsize = len(data)
+ with nogil:
+ check_cstatus(self.wr_file.get().Write(buf, bufsize))
+
+ def read(self, int nbytes):
+ cdef:
+ int64_t bytes_read = 0
+ uint8_t* buf
+ shared_ptr[CBuffer] out
+
+ self._assert_readable()
+
+ with nogil:
+ check_cstatus(self.rd_file.get()
+ .ReadB(nbytes, &out))
+
+ result = cp.PyBytes_FromStringAndSize(
+ <const char*>out.get().data(), out.get().size())
+
+ return result
+
+
+# ----------------------------------------------------------------------
+# Python file-like objects
+
+cdef class PythonFileInterface(NativeFile):
+ cdef:
+ object handle
+
+ def __cinit__(self, handle, mode='w'):
+ self.handle = handle
+
+ if mode.startswith('w'):
+ self.wr_file.reset(new pyarrow.PyOutputStream(handle))
+ self.is_readonly = 0
+ elif mode.startswith('r'):
+ self.rd_file.reset(new pyarrow.PyReadableFile(handle))
+ self.is_readonly = 1
+ else:
+ raise ValueError('Invalid file mode: {0}'.format(mode))
+
+ self.is_open = True
+
+
+cdef class BytesReader(NativeFile):
+ cdef:
+ object obj
+
+ def __cinit__(self, obj):
+ if not isinstance(obj, bytes):
+ raise ValueError('Must pass bytes object')
+
+ self.obj = obj
+ self.is_readonly = 1
+ self.is_open = True
+
+ self.rd_file.reset(new pyarrow.PyBytesReader(obj))
+
+# ----------------------------------------------------------------------
+# Arrow buffers
+
+
+cdef class Buffer:
+
+ def __cinit__(self):
+ pass
+
+ cdef init(self, const shared_ptr[CBuffer]& buffer):
+ self.buffer = buffer
+
+ def __len__(self):
+ return self.size
+
+ property size:
+
+ def __get__(self):
+ return self.buffer.get().size()
+
+ def __getitem__(self, key):
+ # TODO(wesm): buffer slicing
+ raise NotImplementedError
+
+ def to_pybytes(self):
+ return cp.PyBytes_FromStringAndSize(
+ <const char*>self.buffer.get().data(),
+ self.buffer.get().size())
+
+
+cdef shared_ptr[PoolBuffer] allocate_buffer():
+ cdef shared_ptr[PoolBuffer] result
+ result.reset(new PoolBuffer(pyarrow.get_memory_pool()))
+ return result
+
+
+cdef class InMemoryOutputStream(NativeFile):
+
+ cdef:
+ shared_ptr[PoolBuffer] buffer
+
+ def __cinit__(self):
+ self.buffer = allocate_buffer()
+ self.wr_file.reset(new BufferOutputStream(
+ <shared_ptr[ResizableBuffer]> self.buffer))
+ self.is_readonly = 0
+ self.is_open = True
+
+ def get_result(self):
+ cdef Buffer result = Buffer()
+
+ check_cstatus(self.wr_file.get().Close())
+ result.init(<shared_ptr[CBuffer]> self.buffer)
+
+ self.is_open = False
+ return result
+
+
+def buffer_from_bytes(object obj):
+ """
+ Construct an Arrow buffer from a Python bytes object
+ """
+ if not isinstance(obj, bytes):
+ raise ValueError('Must pass bytes object')
+
+ cdef shared_ptr[CBuffer] buf
+ buf.reset(new pyarrow.PyBytesBuffer(obj))
+
+ cdef Buffer result = Buffer()
+ result.init(buf)
+ return result
+
+# ----------------------------------------------------------------------
+# HDFS IO implementation
+
_HDFS_PATH_RE = re.compile('hdfs://(.*):(\d+)(.*)')
try:
@@ -274,6 +485,7 @@ cdef class HdfsClient:
out.buffer_size = c_buffer_size
out.parent = self
out.is_open = True
+ out.own_file = True
return out
@@ -322,134 +534,6 @@ cdef class HdfsClient:
f.download(stream)
-cdef class NativeFile:
-
- def __cinit__(self):
- self.is_open = False
-
- def __dealloc__(self):
- if self.is_open:
- self.close()
-
- def __enter__(self):
- return self
-
- def __exit__(self, exc_type, exc_value, tb):
- self.close()
-
- def close(self):
- if self.is_open:
- with nogil:
- if self.is_readonly:
- check_cstatus(self.rd_file.get().Close())
- else:
- check_cstatus(self.wr_file.get().Close())
- self.is_open = False
-
- cdef read_handle(self, shared_ptr[ReadableFileInterface]* file):
- self._assert_readable()
- file[0] = <shared_ptr[ReadableFileInterface]> self.rd_file
-
- cdef write_handle(self, shared_ptr[OutputStream]* file):
- self._assert_writeable()
- file[0] = <shared_ptr[OutputStream]> self.wr_file
-
- def _assert_readable(self):
- if not self.is_readonly:
- raise IOError("only valid on readonly files")
-
- def _assert_writeable(self):
- if self.is_readonly:
- raise IOError("only valid on writeonly files")
-
- def size(self):
- cdef int64_t size
- self._assert_readable()
- with nogil:
- check_cstatus(self.rd_file.get().GetSize(&size))
- return size
-
- def tell(self):
- cdef int64_t position
- with nogil:
- if self.is_readonly:
- check_cstatus(self.rd_file.get().Tell(&position))
- else:
- check_cstatus(self.wr_file.get().Tell(&position))
- return position
-
- def seek(self, int64_t position):
- self._assert_readable()
- with nogil:
- check_cstatus(self.rd_file.get().Seek(position))
-
- def write(self, data):
- """
- Write bytes-like (unicode, encoded to UTF-8) to file
- """
- self._assert_writeable()
-
- data = tobytes(data)
-
- cdef const uint8_t* buf = <const uint8_t*> cp.PyBytes_AS_STRING(data)
- cdef int64_t bufsize = len(data)
- with nogil:
- check_cstatus(self.wr_file.get().Write(buf, bufsize))
-
- def read(self, int nbytes):
- cdef:
- int64_t bytes_read = 0
- uint8_t* buf
- shared_ptr[Buffer] out
-
- self._assert_readable()
-
- with nogil:
- check_cstatus(self.rd_file.get()
- .ReadB(nbytes, &out))
-
- result = cp.PyBytes_FromStringAndSize(
- <const char*>out.get().data(), out.get().size())
-
- return result
-
-
-# ----------------------------------------------------------------------
-# Python file-like objects
-
-cdef class PythonFileInterface(NativeFile):
- cdef:
- object handle
-
- def __cinit__(self, handle, mode='w'):
- self.handle = handle
-
- if mode.startswith('w'):
- self.wr_file.reset(new pyarrow.PyOutputStream(handle))
- self.is_readonly = 0
- elif mode.startswith('r'):
- self.rd_file.reset(new pyarrow.PyReadableFile(handle))
- self.is_readonly = 1
- else:
- raise ValueError('Invalid file mode: {0}'.format(mode))
-
- self.is_open = True
-
-
-cdef class BytesReader(NativeFile):
- cdef:
- object obj
-
- def __cinit__(self, obj):
- if not isinstance(obj, bytes):
- raise ValueError('Must pass bytes object')
-
- self.obj = obj
- self.is_readonly = 1
- self.is_open = True
-
- self.rd_file.reset(new pyarrow.PyBytesReader(obj))
-
# ----------------------------------------------------------------------
# Specialization for HDFS
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/ipc.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/ipc.pyx b/python/pyarrow/ipc.pyx
new file mode 100644
index 0000000..f8da3a7
--- /dev/null
+++ b/python/pyarrow/ipc.pyx
@@ -0,0 +1,155 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Cython wrappers for arrow::ipc
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from pyarrow.includes.libarrow cimport *
+from pyarrow.includes.libarrow_io cimport *
+from pyarrow.includes.libarrow_ipc cimport *
+cimport pyarrow.includes.pyarrow as pyarrow
+
+from pyarrow.error cimport check_cstatus
+from pyarrow.io cimport NativeFile
+from pyarrow.schema cimport Schema
+from pyarrow.table cimport RecordBatch
+
+from pyarrow.compat import frombytes, tobytes
+import pyarrow.io as io
+
+cimport cpython as cp
+
+
+cdef get_reader(source, shared_ptr[ReadableFileInterface]* reader):
+ cdef NativeFile nf
+
+ if isinstance(source, bytes):
+ source = io.BytesReader(source)
+ elif not isinstance(source, io.NativeFile) and hasattr(source, 'read'):
+ # Optimistically hope this is file-like
+ source = io.PythonFileInterface(source, mode='r')
+
+ if isinstance(source, NativeFile):
+ nf = source
+
+ # TODO: what about read-write sources (e.g. memory maps)
+ if not nf.is_readonly:
+ raise IOError('Native file is not readable')
+
+ nf.read_handle(reader)
+ else:
+ raise TypeError('Unable to read from object of type: {0}'
+ .format(type(source)))
+
+
+cdef get_writer(source, shared_ptr[OutputStream]* writer):
+ cdef NativeFile nf
+
+ if not isinstance(source, io.NativeFile) and hasattr(source, 'write'):
+ # Optimistically hope this is file-like
+ source = io.PythonFileInterface(source, mode='w')
+
+ if isinstance(source, io.NativeFile):
+ nf = source
+
+ if nf.is_readonly:
+ raise IOError('Native file is not writeable')
+
+ nf.write_handle(writer)
+ else:
+ raise TypeError('Unable to read from object of type: {0}'
+ .format(type(source)))
+
+
+cdef class ArrowFileWriter:
+ cdef:
+ shared_ptr[CFileWriter] writer
+ shared_ptr[OutputStream] sink
+ bint closed
+
+ def __cinit__(self, sink, Schema schema):
+ self.closed = True
+ get_writer(sink, &self.sink)
+
+ with nogil:
+ check_cstatus(CFileWriter.Open(self.sink.get(), schema.sp_schema,
+ &self.writer))
+
+ self.closed = False
+
+ def __dealloc__(self):
+ if not self.closed:
+ self.close()
+
+ def write_record_batch(self, RecordBatch batch):
+ cdef CRecordBatch* bptr = batch.batch
+ with nogil:
+ check_cstatus(self.writer.get()
+ .WriteRecordBatch(bptr.columns(), bptr.num_rows()))
+
+ def close(self):
+ with nogil:
+ check_cstatus(self.writer.get().Close())
+ self.closed = True
+
+
+cdef class ArrowFileReader:
+ cdef:
+ shared_ptr[CFileReader] reader
+
+ def __cinit__(self, source, footer_offset=None):
+ cdef shared_ptr[ReadableFileInterface] reader
+ get_reader(source, &reader)
+
+ cdef int64_t offset = 0
+ if footer_offset is not None:
+ offset = footer_offset
+
+ with nogil:
+ if offset != 0:
+ check_cstatus(CFileReader.Open2(reader, offset, &self.reader))
+ else:
+ check_cstatus(CFileReader.Open(reader, &self.reader))
+
+ property num_dictionaries:
+
+ def __get__(self):
+ return self.reader.get().num_dictionaries()
+
+ property num_record_batches:
+
+ def __get__(self):
+ return self.reader.get().num_record_batches()
+
+ def get_record_batch(self, int i):
+ cdef:
+ shared_ptr[CRecordBatch] batch
+ RecordBatch result
+
+ if i < 0 or i >= self.num_record_batches:
+ raise ValueError('Batch number {0} out of range'.format(i))
+
+ with nogil:
+ check_cstatus(self.reader.get().GetRecordBatch(i, &batch))
+
+ result = RecordBatch()
+ result.init(batch)
+
+ return result
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/table.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pxd b/python/pyarrow/table.pxd
index 0a5c122..79c9ae3 100644
--- a/python/pyarrow/table.pxd
+++ b/python/pyarrow/table.pxd
@@ -16,7 +16,10 @@
# under the License.
from pyarrow.includes.common cimport shared_ptr
-from pyarrow.includes.libarrow cimport CChunkedArray, CColumn, CTable
+from pyarrow.includes.libarrow cimport (CChunkedArray, CColumn, CTable,
+ CRecordBatch)
+
+from pyarrow.schema cimport Schema
cdef class ChunkedArray:
@@ -41,6 +44,16 @@ cdef class Table:
cdef:
shared_ptr[CTable] sp_table
CTable* table
-
+
cdef init(self, const shared_ptr[CTable]& table)
cdef _check_nullptr(self)
+
+
+cdef class RecordBatch:
+ cdef:
+ shared_ptr[CRecordBatch] sp_batch
+ CRecordBatch* batch
+ Schema _schema
+
+ cdef init(self, const shared_ptr[CRecordBatch]& table)
+ cdef _check_nullptr(self)
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index ade82aa..a1cadcd 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -19,6 +19,8 @@
# distutils: language = c++
# cython: embedsignature = True
+from cython.operator cimport dereference as deref
+
from pyarrow.includes.libarrow cimport *
cimport pyarrow.includes.pyarrow as pyarrow
@@ -45,8 +47,8 @@ cdef class ChunkedArray:
cdef _check_nullptr(self):
if self.chunked_array == NULL:
- raise ReferenceError("ChunkedArray object references a NULL pointer."
- "Not initialized.")
+ raise ReferenceError("ChunkedArray object references a NULL "
+ "pointer. Not initialized.")
def length(self):
self._check_nullptr()
@@ -144,6 +146,130 @@ cdef class Column:
return chunked_array
+cdef _schema_from_arrays(arrays, names, shared_ptr[CSchema]* schema):
+ cdef:
+ Array arr
+ c_string c_name
+ vector[shared_ptr[CField]] fields
+
+ cdef int K = len(arrays)
+
+ fields.resize(K)
+ for i in range(K):
+ arr = arrays[i]
+ c_name = tobytes(names[i])
+ fields[i].reset(new CField(c_name, arr.type.sp_type, True))
+
+ schema.reset(new CSchema(fields))
+
+
+
+cdef _dataframe_to_arrays(df, name, timestamps_to_ms):
+ from pyarrow.array import from_pandas_series
+
+ cdef:
+ list names = []
+ list arrays = []
+
+ for name in df.columns:
+ col = df[name]
+ arr = from_pandas_series(col, timestamps_to_ms=timestamps_to_ms)
+
+ names.append(name)
+ arrays.append(arr)
+
+ return names, arrays
+
+
+cdef class RecordBatch:
+
+ def __cinit__(self):
+ self.batch = NULL
+ self._schema = None
+
+ cdef init(self, const shared_ptr[CRecordBatch]& batch):
+ self.sp_batch = batch
+ self.batch = batch.get()
+
+ cdef _check_nullptr(self):
+ if self.batch == NULL:
+ raise ReferenceError("Object not initialized")
+
+ def __len__(self):
+ self._check_nullptr()
+ return self.batch.num_rows()
+
+ property num_columns:
+
+ def __get__(self):
+ self._check_nullptr()
+ return self.batch.num_columns()
+
+ property num_rows:
+
+ def __get__(self):
+ return len(self)
+
+ property schema:
+
+ def __get__(self):
+ cdef Schema schema
+ self._check_nullptr()
+ if self._schema is None:
+ schema = Schema()
+ schema.init_schema(self.batch.schema())
+ self._schema = schema
+
+ return self._schema
+
+ def __getitem__(self, i):
+ cdef Array arr = Array()
+ arr.init(self.batch.column(i))
+ return arr
+
+ def equals(self, RecordBatch other):
+ self._check_nullptr()
+ other._check_nullptr()
+
+ return self.batch.Equals(deref(other.batch))
+
+ @classmethod
+ def from_pandas(cls, df):
+ """
+ Convert pandas.DataFrame to an Arrow RecordBatch
+ """
+ names, arrays = _dataframe_to_arrays(df, None, False)
+ return cls.from_arrays(names, arrays)
+
+ @staticmethod
+ def from_arrays(names, arrays):
+ cdef:
+ Array arr
+ RecordBatch result
+ c_string c_name
+ shared_ptr[CSchema] schema
+ shared_ptr[CRecordBatch] batch
+ vector[shared_ptr[CArray]] c_arrays
+ int32_t num_rows
+
+ if len(arrays) == 0:
+ raise ValueError('Record batch cannot contain no arrays (for now)')
+
+ num_rows = len(arrays[0])
+ _schema_from_arrays(arrays, names, &schema)
+
+ for i in range(len(arrays)):
+ arr = arrays[i]
+ c_arrays.push_back(arr.sp_array)
+
+ batch.reset(new CRecordBatch(schema, num_rows, c_arrays))
+
+ result = RecordBatch()
+ result.init(batch)
+
+ return result
+
+
cdef class Table:
'''
Do not call this class's constructor directly.
@@ -161,38 +287,50 @@ cdef class Table:
raise ReferenceError("Table object references a NULL pointer."
"Not initialized.")
- @staticmethod
- def from_pandas(df, name=None):
- return from_pandas_dataframe(df, name=name)
+ @classmethod
+ def from_pandas(cls, df, name=None, timestamps_to_ms=False):
+ """
+ Convert pandas.DataFrame to an Arrow Table
+
+ Parameters
+ ----------
+ df: pandas.DataFrame
+
+ name: str
+
+ timestamps_to_ms: bool
+ Convert datetime columns to ms resolution. This is needed for
+ compatibility with other functionality like Parquet I/O which
+ only supports milliseconds.
+ """
+ names, arrays = _dataframe_to_arrays(df, name=name,
+ timestamps_to_ms=timestamps_to_ms)
+ return cls.from_arrays(names, arrays, name=name)
@staticmethod
def from_arrays(names, arrays, name=None):
cdef:
Array arr
- Table result
c_string c_name
vector[shared_ptr[CField]] fields
vector[shared_ptr[CColumn]] columns
+ Table result
shared_ptr[CSchema] schema
shared_ptr[CTable] table
- cdef int K = len(arrays)
+ _schema_from_arrays(arrays, names, &schema)
- fields.resize(K)
+ cdef int K = len(arrays)
columns.resize(K)
for i in range(K):
arr = arrays[i]
- c_name = tobytes(names[i])
-
- fields[i].reset(new CField(c_name, arr.type.sp_type, True))
- columns[i].reset(new CColumn(fields[i], arr.sp_array))
+ columns[i].reset(new CColumn(schema.get().field(i), arr.sp_array))
if name is None:
c_name = ''
else:
c_name = tobytes(name)
- schema.reset(new CSchema(fields))
table.reset(new CTable(c_name, schema, columns))
result = Table()
@@ -268,32 +406,4 @@ cdef class Table:
-def from_pandas_dataframe(object df, name=None, timestamps_to_ms=False):
- """
- Convert pandas.DataFrame to an Arrow Table
-
- Parameters
- ----------
- df: pandas.DataFrame
-
- name: str
-
- timestamps_to_ms: bool
- Convert datetime columns to ms resolution. This is needed for
- compability with other functionality like Parquet I/O which
- only supports milliseconds.
- """
- from pyarrow.array import from_pandas_series
-
- cdef:
- list names = []
- list arrays = []
-
- for name in df.columns:
- col = df[name]
- arr = from_pandas_series(col, timestamps_to_ms=timestamps_to_ms)
-
- names.append(name)
- arrays.append(arr)
-
- return Table.from_arrays(names, arrays, name=name)
+from_pandas_dataframe = Table.from_pandas
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/tests/test_array.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py
index 86147f8..0a17f69 100644
--- a/python/pyarrow/tests/test_array.py
+++ b/python/pyarrow/tests/test_array.py
@@ -19,6 +19,10 @@ import pyarrow
import pyarrow.formatting as fmt
+def test_total_bytes_allocated():
+ assert pyarrow.total_allocated_bytes() == 0
+
+
def test_repr_on_pre_init_array():
arr = pyarrow.array.Array()
assert len(repr(arr)) > 0
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/tests/test_io.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_io.py b/python/pyarrow/tests/test_io.py
index 9a41ebe..211a12b 100644
--- a/python/pyarrow/tests/test_io.py
+++ b/python/pyarrow/tests/test_io.py
@@ -98,3 +98,44 @@ def test_bytes_reader():
def test_bytes_reader_non_bytes():
with pytest.raises(ValueError):
io.BytesReader(u('some sample data'))
+
+
+
+# ----------------------------------------------------------------------
+# Buffers
+
+
+def test_buffer_bytes():
+ val = b'some data'
+
+ buf = io.buffer_from_bytes(val)
+ assert isinstance(buf, io.Buffer)
+
+ result = buf.to_pybytes()
+
+ assert result == val
+
+
+def test_memory_output_stream():
+ # 10 bytes
+ val = b'dataabcdef'
+
+ f = io.InMemoryOutputStream()
+
+ K = 1000
+ for i in range(K):
+ f.write(val)
+
+ buf = f.get_result()
+
+ assert len(buf) == len(val) * K
+ assert buf.to_pybytes() == val * K
+
+
+def test_inmemory_write_after_closed():
+ f = io.InMemoryOutputStream()
+ f.write(b'ok')
+ f.get_result()
+
+ with pytest.raises(IOError):
+ f.write(b'not ok')
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/tests/test_ipc.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py
new file mode 100644
index 0000000..b9e9e6e
--- /dev/null
+++ b/python/pyarrow/tests/test_ipc.py
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+
+import numpy as np
+import pandas as pd
+
+import pyarrow as A
+import pyarrow.io as arrow_io
+import pyarrow.ipc as ipc
+
+
+class RoundtripTest(object):
+ # Also tests writing zero-copy NumPy array with additional padding
+
+ def __init__(self):
+ self.sink = self._get_sink()
+
+ def _get_sink(self):
+ return io.BytesIO()
+
+ def _get_source(self):
+ return self.sink.getvalue()
+
+ def run(self):
+ nrows = 5
+ df = pd.DataFrame({
+ 'one': np.random.randn(nrows),
+ 'two': ['foo', np.nan, 'bar', 'bazbaz', 'qux']})
+
+ batch = A.RecordBatch.from_pandas(df)
+ writer = ipc.ArrowFileWriter(self.sink, batch.schema)
+
+ num_batches = 5
+ frames = []
+ batches = []
+ for i in range(num_batches):
+ unique_df = df.copy()
+ unique_df['one'] = np.random.randn(nrows)
+
+ batch = A.RecordBatch.from_pandas(unique_df)
+ writer.write_record_batch(batch)
+ frames.append(unique_df)
+ batches.append(batch)
+
+ writer.close()
+
+ file_contents = self._get_source()
+ reader = ipc.ArrowFileReader(file_contents)
+
+ assert reader.num_record_batches == num_batches
+
+ for i in range(num_batches):
+ # it works. Must convert back to DataFrame
+ batch = reader.get_record_batch(i)
+ assert batches[i].equals(batch)
+
+
+class InMemoryStreamTest(RoundtripTest):
+
+ def _get_sink(self):
+ return arrow_io.InMemoryOutputStream()
+
+ def _get_source(self):
+ return self.sink.get_result()
+
+
+def test_ipc_file_simple_roundtrip():
+ helper = RoundtripTest()
+ helper.run()
+
+
+# XXX: For benchmarking
+
+def big_batch():
+ df = pd.DataFrame(
+ np.random.randn(2**4, 2**20).T,
+ columns=[str(i) for i in range(2**4)]
+ )
+
+ df = pd.concat([df] * 2 ** 3, ignore_index=True)
+
+ return A.RecordBatch.from_pandas(df)
+
+
+def write_to_memory(batch):
+ sink = io.BytesIO()
+ write_file(batch, sink)
+ return sink.getvalue()
+
+
+def write_file(batch, sink):
+ writer = ipc.ArrowFileWriter(sink, batch.schema)
+ writer.write_record_batch(batch)
+ writer.close()
+
+
+def read_file(source):
+ reader = ipc.ArrowFileReader(source)
+ return [reader.get_record_batch(i)
+ for i in range(reader.num_record_batches)]
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index abf1431..c513032 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -15,60 +15,52 @@
# specific language governing permissions and limitations
# under the License.
-from pyarrow.compat import unittest
import pyarrow as A
-class TestRowBatch(unittest.TestCase):
+def test_recordbatch_basics():
+ data = [
+ A.from_pylist(range(5)),
+ A.from_pylist([-10, -5, 0, 5, 10])
+ ]
- def test_basics(self):
- data = [
- A.from_pylist(range(5)),
- A.from_pylist([-10, -5, 0, 5, 10])
- ]
- num_rows = 5
+ batch = A.RecordBatch.from_arrays(['c0', 'c1'], data)
- descr = A.schema([A.field('c0', data[0].type),
- A.field('c1', data[1].type)])
+ assert len(batch) == 5
+ assert batch.num_rows == 5
+ assert batch.num_columns == len(data)
- batch = A.RowBatch(descr, num_rows, data)
- assert len(batch) == num_rows
- assert batch.num_rows == num_rows
- assert batch.num_columns == len(data)
+def test_table_basics():
+ data = [
+ A.from_pylist(range(5)),
+ A.from_pylist([-10, -5, 0, 5, 10])
+ ]
+ table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
+ assert table.name == 'table_name'
+ assert len(table) == 5
+ assert table.num_rows == 5
+ assert table.num_columns == 2
+ assert table.shape == (5, 2)
+ for col in table.itercolumns():
+ for chunk in col.data.iterchunks():
+ assert chunk is not None
-class TestTable(unittest.TestCase):
- def test_basics(self):
- data = [
- A.from_pylist(range(5)),
- A.from_pylist([-10, -5, 0, 5, 10])
- ]
- table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
- assert table.name == 'table_name'
- assert len(table) == 5
- assert table.num_rows == 5
- assert table.num_columns == 2
- assert table.shape == (5, 2)
+def test_table_pandas():
+ data = [
+ A.from_pylist(range(5)),
+ A.from_pylist([-10, -5, 0, 5, 10])
+ ]
+ table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
- for col in table.itercolumns():
- for chunk in col.data.iterchunks():
- assert chunk is not None
+ # TODO: Use this part once from_pandas is implemented
+ # data = {'a': range(5), 'b': [-10, -5, 0, 5, 10]}
+ # df = pd.DataFrame(data)
+ # A.Table.from_pandas(df)
- def test_pandas(self):
- data = [
- A.from_pylist(range(5)),
- A.from_pylist([-10, -5, 0, 5, 10])
- ]
- table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
-
- # TODO: Use this part once from_pandas is implemented
- # data = {'a': range(5), 'b': [-10, -5, 0, 5, 10]}
- # df = pd.DataFrame(data)
- # A.Table.from_pandas(df)
-
- df = table.to_pandas()
- assert set(df.columns) == set(('a', 'b'))
- assert df.shape == (5, 2)
- assert df.ix[0, 'b'] == -10
+ df = table.to_pandas()
+ assert set(df.columns) == set(('a', 'b'))
+ assert df.shape == (5, 2)
+ assert df.loc[0, 'b'] == -10
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index d1be122..d040ea7 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -102,6 +102,7 @@ class build_ext(_build_ext):
'config',
'error',
'io',
+ 'ipc',
'parquet',
'scalar',
'schema',
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/src/pyarrow/adapters/builtin.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/builtin.cc b/python/src/pyarrow/adapters/builtin.cc
index 78ef1b3..680f3a5 100644
--- a/python/src/pyarrow/adapters/builtin.cc
+++ b/python/src/pyarrow/adapters/builtin.cc
@@ -426,7 +426,7 @@ Status ConvertPySequence(PyObject* obj, std::shared_ptr<arrow::Array>* out) {
// Give the sequence converter an array builder
std::shared_ptr<ArrayBuilder> builder;
- RETURN_ARROW_NOT_OK(arrow::MakeBuilder(GetMemoryPool(), type, &builder));
+ RETURN_ARROW_NOT_OK(arrow::MakeBuilder(get_memory_pool(), type, &builder));
converter->Init(builder);
PY_RETURN_NOT_OK(converter->AppendData(obj));
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/src/pyarrow/adapters/pandas.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index d224074..ae24b7e 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -602,6 +602,8 @@ class ArrowDeserializer {
}
Status AllocateOutput(int type) {
+ PyAcquireGIL lock;
+
npy_intp dims[1] = {col_->length()};
out_ = reinterpret_cast<PyArrayObject*>(PyArray_SimpleNew(1, dims, type));
@@ -616,6 +618,8 @@ class ArrowDeserializer {
}
Status OutputFromData(int type, void* data) {
+ PyAcquireGIL lock;
+
// Zero-Copy. We can pass the data pointer directly to NumPy.
Py_INCREF(py_ref_);
OwnedRef py_ref(py_ref_);
@@ -706,6 +710,8 @@ class ArrowDeserializer {
inline typename std::enable_if<
arrow_traits<T2>::is_boolean, Status>::type
ConvertValues(const std::shared_ptr<Array>& arr) {
+ PyAcquireGIL lock;
+
arrow::BooleanArray* bool_arr = static_cast<arrow::BooleanArray*>(arr.get());
if (arr->null_count() > 0) {
@@ -743,6 +749,8 @@ class ArrowDeserializer {
inline typename std::enable_if<
T2 == arrow::Type::STRING, Status>::type
ConvertValues(const std::shared_ptr<Array>& arr) {
+ PyAcquireGIL lock;
+
RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
PyObject** out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_));
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/src/pyarrow/common.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/common.cc b/python/src/pyarrow/common.cc
index 82b14fd..09f3efb 100644
--- a/python/src/pyarrow/common.cc
+++ b/python/src/pyarrow/common.cc
@@ -63,7 +63,7 @@ class PyArrowMemoryPool : public arrow::MemoryPool {
int64_t bytes_allocated_;
};
-arrow::MemoryPool* GetMemoryPool() {
+arrow::MemoryPool* get_memory_pool() {
static PyArrowMemoryPool memory_pool;
return &memory_pool;
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/src/pyarrow/common.h
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/common.h b/python/src/pyarrow/common.h
index bc599f8..96eed16 100644
--- a/python/src/pyarrow/common.h
+++ b/python/src/pyarrow/common.h
@@ -109,7 +109,8 @@ class PyGILGuard {
return Status::UnknownError(message); \
}
-PYARROW_EXPORT arrow::MemoryPool* GetMemoryPool();
+// Return the common PyArrow memory pool
+PYARROW_EXPORT arrow::MemoryPool* get_memory_pool();
class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer {
public:
@@ -120,6 +121,7 @@ class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer {
data_ = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_));
size_ = PyArray_SIZE(arr_);
+ capacity_ = size_ * PyArray_DESCR(arr_)->elsize;
}
virtual ~NumPyBuffer() {
@@ -139,6 +141,22 @@ class PYARROW_EXPORT PyBytesBuffer : public arrow::Buffer {
PyObject* obj_;
};
+
+class PyAcquireGIL {
+ public:
+ PyAcquireGIL() {
+ state_ = PyGILState_Ensure();
+ }
+
+ ~PyAcquireGIL() {
+ PyGILState_Release(state_);
+ }
+
+ private:
+ PyGILState_STATE state_;
+ DISALLOW_COPY_AND_ASSIGN(PyAcquireGIL);
+};
+
} // namespace pyarrow
#endif // PYARROW_COMMON_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/a9747cea/python/src/pyarrow/io.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc
index 35054e9..9879b34 100644
--- a/python/src/pyarrow/io.cc
+++ b/python/src/pyarrow/io.cc
@@ -47,9 +47,9 @@ static arrow::Status CheckPyError() {
PyErr_Fetch(&exc_type, &exc_value, &traceback);
PyObjectStringify stringified(exc_value);
std::string message(stringified.bytes);
- Py_DECREF(exc_type);
- Py_DECREF(exc_value);
- Py_DECREF(traceback);
+ Py_XDECREF(exc_type);
+ Py_XDECREF(exc_value);
+ Py_XDECREF(traceback);
PyErr_Clear();
return arrow::Status::IOError(message);
}