You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/08 15:50:38 UTC
arrow git commit: ARROW-438: [C++/Python] Implement zero-data-copy
record batch and table concatenation.
Repository: arrow
Updated Branches:
refs/heads/master 1094d89d4 -> 6526a522d
ARROW-438: [C++/Python] Implement zero-data-copy record batch and table concatenation.
This also fixes a bug in ChunkedArray::Equals. The bug is caught by the Python test suite, but ChunkedArray::Equals would benefit from more C++ unit tests.
Author: Wes McKinney <we...@twosigma.com>
Closes #274 from wesm/ARROW-438 and squashes the following commits:
1f39568 [Wes McKinney] py3 compatibility
2e76c5e [Wes McKinney] Implement arrow::ConcatenateTables and Python wrapper. Fix bug in ChunkedArray::Equals
f3cb170 [Wes McKinney] Fix Cython compilation, verify pyarrow.Table.from_batches still works
af28755 [Wes McKinney] Implement Table::FromRecordBatches
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/6526a522
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/6526a522
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/6526a522
Branch: refs/heads/master
Commit: 6526a522d05e703f2f75fcdad067c8aed7bb6047
Parents: 1094d89
Author: Wes McKinney <we...@twosigma.com>
Authored: Sun Jan 8 10:50:30 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sun Jan 8 10:50:30 2017 -0500
----------------------------------------------------------------------
cpp/CMakeLists.txt | 2 +-
cpp/src/arrow/column.cc | 11 +-
cpp/src/arrow/column.h | 2 +
cpp/src/arrow/io/io-file-test.cc | 1 -
cpp/src/arrow/table-test.cc | 88 ++++++++++++--
cpp/src/arrow/table.cc | 71 +++++++++++
cpp/src/arrow/table.h | 13 +-
cpp/src/arrow/test-util.h | 43 ++++---
python/CMakeLists.txt | 3 +
python/benchmarks/array.py | 7 +-
python/doc/pandas.rst | 5 +-
python/pyarrow/__init__.py | 2 +-
python/pyarrow/array.pyx | 25 ++++
python/pyarrow/includes/libarrow.pxd | 16 +++
python/pyarrow/table.pyx | 147 ++++++++++++++++-------
python/pyarrow/tests/test_convert_pandas.py | 6 +-
python/pyarrow/tests/test_parquet.py | 12 +-
python/pyarrow/tests/test_table.py | 27 +++++
python/setup.py | 5 +-
19 files changed, 395 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 3522e5c..87b7841 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -76,7 +76,7 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
option(ARROW_JEMALLOC
"Build the Arrow jemalloc-based allocator"
- ON)
+ OFF)
option(ARROW_BOOST_USE_SHARED
"Rely on boost shared libraries where relevant"
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/cpp/src/arrow/column.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/column.cc b/cpp/src/arrow/column.cc
index 3e89956..9cc0f57 100644
--- a/cpp/src/arrow/column.cc
+++ b/cpp/src/arrow/column.cc
@@ -45,7 +45,9 @@ bool ChunkedArray::Equals(const ChunkedArray& other) const {
int32_t this_start_idx = 0;
int other_chunk_idx = 0;
int32_t other_start_idx = 0;
- while (this_chunk_idx < static_cast<int32_t>(chunks_.size())) {
+
+ int64_t elements_compared = 0;
+ while (elements_compared < length_) {
const std::shared_ptr<Array> this_array = chunks_[this_chunk_idx];
const std::shared_ptr<Array> other_array = other.chunk(other_chunk_idx);
int32_t common_length = std::min(
@@ -55,14 +57,21 @@ bool ChunkedArray::Equals(const ChunkedArray& other) const {
return false;
}
+ elements_compared += common_length;
+
// If we have exhausted the current chunk, proceed to the next one individually.
if (this_start_idx + common_length == this_array->length()) {
this_chunk_idx++;
this_start_idx = 0;
+ } else {
+ this_start_idx += common_length;
}
+
if (other_start_idx + common_length == other_array->length()) {
other_chunk_idx++;
other_start_idx = 0;
+ } else {
+ other_start_idx += common_length;
}
}
return true;
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/cpp/src/arrow/column.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/column.h b/cpp/src/arrow/column.h
index f716473..a28b266 100644
--- a/cpp/src/arrow/column.h
+++ b/cpp/src/arrow/column.h
@@ -48,6 +48,8 @@ class ARROW_EXPORT ChunkedArray {
std::shared_ptr<Array> chunk(int i) const { return chunks_[i]; }
+ const ArrayVector& chunks() const { return chunks_; }
+
bool Equals(const ChunkedArray& other) const;
bool Equals(const std::shared_ptr<ChunkedArray>& other) const;
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/cpp/src/arrow/io/io-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index 378b60e..821e71d 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -301,7 +301,6 @@ class MyMemoryPool : public MemoryPool {
return Status::OutOfMemory(ss.str());
}
-
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/cpp/src/arrow/table-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc
index 734b941..67c9f67 100644
--- a/cpp/src/arrow/table-test.cc
+++ b/cpp/src/arrow/table-test.cc
@@ -44,16 +44,20 @@ class TestTable : public TestBase {
vector<shared_ptr<Field>> fields = {f0, f1, f2};
schema_ = std::make_shared<Schema>(fields);
- columns_ = {
- std::make_shared<Column>(schema_->field(0), MakePrimitive<Int32Array>(length)),
- std::make_shared<Column>(schema_->field(1), MakePrimitive<UInt8Array>(length)),
- std::make_shared<Column>(schema_->field(2), MakePrimitive<Int16Array>(length))};
+ arrays_ = {MakePrimitive<Int32Array>(length), MakePrimitive<UInt8Array>(length),
+ MakePrimitive<Int16Array>(length)};
+
+ columns_ = {std::make_shared<Column>(schema_->field(0), arrays_[0]),
+ std::make_shared<Column>(schema_->field(1), arrays_[1]),
+ std::make_shared<Column>(schema_->field(2), arrays_[2])};
}
protected:
std::shared_ptr<Table> table_;
shared_ptr<Schema> schema_;
- vector<std::shared_ptr<Column>> columns_;
+
+ std::vector<std::shared_ptr<Array>> arrays_;
+ std::vector<std::shared_ptr<Column>> columns_;
};
TEST_F(TestTable, EmptySchema) {
@@ -65,7 +69,7 @@ TEST_F(TestTable, EmptySchema) {
}
TEST_F(TestTable, Ctors) {
- int length = 100;
+ const int length = 100;
MakeExample1(length);
std::string name = "data";
@@ -83,7 +87,7 @@ TEST_F(TestTable, Ctors) {
}
TEST_F(TestTable, Metadata) {
- int length = 100;
+ const int length = 100;
MakeExample1(length);
std::string name = "data";
@@ -98,7 +102,7 @@ TEST_F(TestTable, Metadata) {
TEST_F(TestTable, InvalidColumns) {
// Check that columns are all the same length
- int length = 100;
+ const int length = 100;
MakeExample1(length);
table_.reset(new Table("data", schema_, columns_, length - 1));
@@ -120,7 +124,7 @@ TEST_F(TestTable, InvalidColumns) {
}
TEST_F(TestTable, Equals) {
- int length = 100;
+ const int length = 100;
MakeExample1(length);
std::string name = "data";
@@ -145,6 +149,72 @@ TEST_F(TestTable, Equals) {
ASSERT_FALSE(table_->Equals(std::make_shared<Table>(name, schema_, other_columns)));
}
+TEST_F(TestTable, FromRecordBatches) {
+ const int32_t length = 10;
+ MakeExample1(length);
+
+ auto batch1 = std::make_shared<RecordBatch>(schema_, length, arrays_);
+
+ std::shared_ptr<Table> result, expected;
+ ASSERT_OK(Table::FromRecordBatches("foo", {batch1}, &result));
+
+ expected = std::make_shared<Table>("foo", schema_, columns_);
+ ASSERT_TRUE(result->Equals(expected));
+
+ std::vector<std::shared_ptr<Column>> other_columns;
+ for (int i = 0; i < schema_->num_fields(); ++i) {
+ std::vector<std::shared_ptr<Array>> col_arrays = {arrays_[i], arrays_[i]};
+ other_columns.push_back(std::make_shared<Column>(schema_->field(i), col_arrays));
+ }
+
+ ASSERT_OK(Table::FromRecordBatches("foo", {batch1, batch1}, &result));
+ expected = std::make_shared<Table>("foo", schema_, other_columns);
+ ASSERT_TRUE(result->Equals(expected));
+
+ // Error states
+ std::vector<std::shared_ptr<RecordBatch>> empty_batches;
+ ASSERT_RAISES(Invalid, Table::FromRecordBatches("", empty_batches, &result));
+
+ std::vector<std::shared_ptr<Field>> fields = {schema_->field(0), schema_->field(1)};
+ auto other_schema = std::make_shared<Schema>(fields);
+
+ std::vector<std::shared_ptr<Array>> other_arrays = {arrays_[0], arrays_[1]};
+ auto batch2 = std::make_shared<RecordBatch>(other_schema, length, other_arrays);
+ ASSERT_RAISES(Invalid, Table::FromRecordBatches("", {batch1, batch2}, &result));
+}
+
+TEST_F(TestTable, ConcatenateTables) {
+ const int32_t length = 10;
+
+ MakeExample1(length);
+ auto batch1 = std::make_shared<RecordBatch>(schema_, length, arrays_);
+
+ // generate different data
+ MakeExample1(length);
+ auto batch2 = std::make_shared<RecordBatch>(schema_, length, arrays_);
+
+ std::shared_ptr<Table> t1, t2, t3, result, expected;
+ ASSERT_OK(Table::FromRecordBatches("foo", {batch1}, &t1));
+ ASSERT_OK(Table::FromRecordBatches("foo", {batch2}, &t2));
+
+ ASSERT_OK(ConcatenateTables("bar", {t1, t2}, &result));
+ ASSERT_OK(Table::FromRecordBatches("bar", {batch1, batch2}, &expected));
+ ASSERT_TRUE(result->Equals(expected));
+
+ // Error states
+ std::vector<std::shared_ptr<Table>> empty_tables;
+ ASSERT_RAISES(Invalid, ConcatenateTables("", empty_tables, &result));
+
+ std::vector<std::shared_ptr<Field>> fields = {schema_->field(0), schema_->field(1)};
+ auto other_schema = std::make_shared<Schema>(fields);
+
+ std::vector<std::shared_ptr<Array>> other_arrays = {arrays_[0], arrays_[1]};
+ auto batch3 = std::make_shared<RecordBatch>(other_schema, length, other_arrays);
+ ASSERT_OK(Table::FromRecordBatches("", {batch3}, &t3));
+
+ ASSERT_RAISES(Invalid, ConcatenateTables("foo", {t1, t3}, &result));
+}
+
class TestRecordBatch : public TestBase {};
TEST_F(TestRecordBatch, Equals) {
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/cpp/src/arrow/table.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index 45f672e..b3563ea 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -77,6 +77,77 @@ Table::Table(const std::string& name, const std::shared_ptr<Schema>& schema,
const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows)
: name_(name), schema_(schema), columns_(columns), num_rows_(num_rows) {}
+Status Table::FromRecordBatches(const std::string& name,
+ const std::vector<std::shared_ptr<RecordBatch>>& batches,
+ std::shared_ptr<Table>* table) {
+ if (batches.size() == 0) {
+ return Status::Invalid("Must pass at least one record batch");
+ }
+
+ std::shared_ptr<Schema> schema = batches[0]->schema();
+
+ const int nbatches = static_cast<int>(batches.size());
+ const int ncolumns = static_cast<int>(schema->num_fields());
+
+ for (int i = 1; i < nbatches; ++i) {
+ if (!batches[i]->schema()->Equals(schema)) {
+ std::stringstream ss;
+ ss << "Schema at index " << static_cast<int>(i) << " was different: \n"
+ << schema->ToString() << "\nvs\n"
+ << batches[i]->schema()->ToString();
+ return Status::Invalid(ss.str());
+ }
+ }
+
+ std::vector<std::shared_ptr<Column>> columns(ncolumns);
+ std::vector<std::shared_ptr<Array>> column_arrays(nbatches);
+
+ for (int i = 0; i < ncolumns; ++i) {
+ for (int j = 0; j < nbatches; ++j) {
+ column_arrays[j] = batches[j]->column(i);
+ }
+ columns[i] = std::make_shared<Column>(schema->field(i), column_arrays);
+ }
+
+ *table = std::make_shared<Table>(name, schema, columns);
+ return Status::OK();
+}
+
+Status ConcatenateTables(const std::string& output_name,
+ const std::vector<std::shared_ptr<Table>>& tables, std::shared_ptr<Table>* table) {
+ if (tables.size() == 0) { return Status::Invalid("Must pass at least one table"); }
+
+ std::shared_ptr<Schema> schema = tables[0]->schema();
+
+ const int ntables = static_cast<int>(tables.size());
+ const int ncolumns = static_cast<int>(schema->num_fields());
+
+ for (int i = 1; i < ntables; ++i) {
+ if (!tables[i]->schema()->Equals(schema)) {
+ std::stringstream ss;
+ ss << "Schema at index " << static_cast<int>(i) << " was different: \n"
+ << schema->ToString() << "\nvs\n"
+ << tables[i]->schema()->ToString();
+ return Status::Invalid(ss.str());
+ }
+ }
+
+ std::vector<std::shared_ptr<Column>> columns(ncolumns);
+ for (int i = 0; i < ncolumns; ++i) {
+ std::vector<std::shared_ptr<Array>> column_arrays;
+ for (int j = 0; j < ntables; ++j) {
+ const std::vector<std::shared_ptr<Array>>& chunks =
+ tables[j]->column(i)->data()->chunks();
+ for (const auto& chunk : chunks) {
+ column_arrays.push_back(chunk);
+ }
+ }
+ columns[i] = std::make_shared<Column>(schema->field(i), column_arrays);
+ }
+ *table = std::make_shared<Table>(output_name, schema, columns);
+ return Status::OK();
+}
+
bool Table::Equals(const Table& other) const {
if (name_ != other.name()) { return false; }
if (!schema_->Equals(other.schema())) { return false; }
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/cpp/src/arrow/table.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 0f2418d..583847c 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -82,7 +82,13 @@ class ARROW_EXPORT Table {
// same length as num_rows -- you can validate this using
// Table::ValidateColumns
Table(const std::string& name, const std::shared_ptr<Schema>& schema,
- const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows);
+ const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows);
+
+ // Construct a table from one or more RecordBatches whose schemas are all equal.
+ // Returns Status::Invalid if no batches are passed or if any schema differs
+ static Status FromRecordBatches(const std::string& name,
+ const std::vector<std::shared_ptr<RecordBatch>>& batches,
+ std::shared_ptr<Table>* table);
// @returns: the table's name, if any (may be length 0)
const std::string& name() const { return name_; }
@@ -116,6 +122,11 @@ class ARROW_EXPORT Table {
int64_t num_rows_;
};
+// Construct a table from one or more input tables. Returns Status::Invalid if
+// no tables are passed or if the schemas are not all equal
+Status ARROW_EXPORT ConcatenateTables(const std::string& output_name,
+ const std::vector<std::shared_ptr<Table>>& tables, std::shared_ptr<Table>* table);
+
} // namespace arrow
#endif // ARROW_TABLE_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/cpp/src/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index f2da824..b59809d 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -71,23 +71,6 @@
namespace arrow {
-class TestBase : public ::testing::Test {
- public:
- void SetUp() { pool_ = default_memory_pool(); }
-
- template <typename ArrayType>
- std::shared_ptr<Array> MakePrimitive(int32_t length, int32_t null_count = 0) {
- auto data = std::make_shared<PoolBuffer>(pool_);
- auto null_bitmap = std::make_shared<PoolBuffer>(pool_);
- EXPECT_OK(data->Resize(length * sizeof(typename ArrayType::value_type)));
- EXPECT_OK(null_bitmap->Resize(BitUtil::BytesForBits(length)));
- return std::make_shared<ArrayType>(length, data, null_count, null_bitmap);
- }
-
- protected:
- MemoryPool* pool_;
-};
-
namespace test {
template <typename T>
@@ -253,6 +236,32 @@ Status MakeRandomBytePoolBuffer(int32_t length, MemoryPool* pool,
} // namespace test
+class TestBase : public ::testing::Test {
+ public:
+ void SetUp() {
+ pool_ = default_memory_pool();
+ random_seed_ = 0;
+ }
+
+ template <typename ArrayType>
+ std::shared_ptr<Array> MakePrimitive(int32_t length, int32_t null_count = 0) {
+ auto data = std::make_shared<PoolBuffer>(pool_);
+ const int64_t data_nbytes = length * sizeof(typename ArrayType::value_type);
+ EXPECT_OK(data->Resize(data_nbytes));
+
+ // Fill with random data
+ test::random_bytes(data_nbytes, random_seed_++, data->mutable_data());
+
+ auto null_bitmap = std::make_shared<PoolBuffer>(pool_);
+ EXPECT_OK(null_bitmap->Resize(BitUtil::BytesForBits(length)));
+ return std::make_shared<ArrayType>(length, data, null_count, null_bitmap);
+ }
+
+ protected:
+ uint32_t random_seed_;
+ MemoryPool* pool_;
+};
+
template <typename TYPE, typename C_TYPE>
void ArrayFromVector(const std::shared_ptr<DataType>& type,
const std::vector<bool>& is_valid, const std::vector<C_TYPE>& values,
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 6c24772..e42c45d 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -70,6 +70,9 @@ include(SetupCxxFlags)
# Add common flags
set(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}")
+# Suppress Cython warnings
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-variable")
+
# Determine compiler version
include(CompilerInfo)
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/benchmarks/array.py
----------------------------------------------------------------------
diff --git a/python/benchmarks/array.py b/python/benchmarks/array.py
index 4268f00..e22c0f7 100644
--- a/python/benchmarks/array.py
+++ b/python/benchmarks/array.py
@@ -49,10 +49,10 @@ class PandasConversionsToArrow(PandasConversionsBase):
params = ((1, 10 ** 5, 10 ** 6, 10 ** 7), ('int64', 'float64', 'float64_nans', 'str'))
def time_from_series(self, n, dtype):
- A.from_pandas_dataframe(self.data)
+ A.Table.from_pandas(self.data)
def peakmem_from_series(self, n, dtype):
- A.from_pandas_dataframe(self.data)
+ A.Table.from_pandas(self.data)
class PandasConversionsFromArrow(PandasConversionsBase):
@@ -61,7 +61,7 @@ class PandasConversionsFromArrow(PandasConversionsBase):
def setup(self, n, dtype):
super(PandasConversionsFromArrow, self).setup(n, dtype)
- self.arrow_data = A.from_pandas_dataframe(self.data)
+ self.arrow_data = A.Table.from_pandas(self.data)
def time_to_series(self, n, dtype):
self.arrow_data.to_pandas()
@@ -80,4 +80,3 @@ class ScalarAccess(object):
def time_as_py(self, n):
for i in range(n):
self._array[i].as_py()
-
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/doc/pandas.rst
----------------------------------------------------------------------
diff --git a/python/doc/pandas.rst b/python/doc/pandas.rst
index 7c70074..c225d13 100644
--- a/python/doc/pandas.rst
+++ b/python/doc/pandas.rst
@@ -31,7 +31,7 @@ represent more data than a DataFrame, so a full conversion is not always possibl
Conversion from a Table to a DataFrame is done by calling
:meth:`pyarrow.table.Table.to_pandas`. The inverse is then achieved by using
-:meth:`pyarrow.from_pandas_dataframe`. This conversion routine provides the
+:meth:`pyarrow.Table.from_pandas`. This conversion routine provides the
convience parameter ``timestamps_to_ms``. Although Arrow supports timestamps of
different resolutions, Pandas only supports nanosecond timestamps and most
other systems (e.g. Parquet) only work on millisecond timestamps. This parameter
@@ -45,7 +45,7 @@ conversion.
df = pd.DataFrame({"a": [1, 2, 3]})
# Convert from Pandas to Arrow
- table = pa.from_pandas_dataframe(df)
+ table = pa.Table.from_pandas(df)
# Convert back to Pandas
df_new = table.to_pandas()
@@ -111,4 +111,3 @@ Arrow -> Pandas Conversion
+-------------------------------------+--------------------------------------------------------+
| ``TIMESTAMP(unit=*)`` | ``pd.Timestamp`` (``np.datetime64[ns]``) |
+-------------------------------------+--------------------------------------------------------+
-
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 02b2b06..d25cdd4 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -56,4 +56,4 @@ from pyarrow.schema import (null, bool_,
list_, struct, field,
DataType, Field, Schema, schema)
-from pyarrow.table import Column, RecordBatch, Table, from_pandas_dataframe
+from pyarrow.table import Column, RecordBatch, Table, concat_tables
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/pyarrow/array.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx
index c178d5c..266768f 100644
--- a/python/pyarrow/array.pyx
+++ b/python/pyarrow/array.pyx
@@ -91,6 +91,29 @@ cdef class Array:
"""
return from_pandas_series(obj, mask)
+ @staticmethod
+ def from_list(object list_obj, DataType type=None):
+ """
+ Convert Python list to Arrow array
+
+ Parameters
+ ----------
+ list_obj : array_like
+
+ Returns
+ -------
+ pyarrow.array.Array
+ """
+ cdef:
+ shared_ptr[CArray] sp_array
+
+ if type is None:
+ check_status(pyarrow.ConvertPySequence(list_obj, &sp_array))
+ else:
+ raise NotImplementedError()
+
+ return box_arrow_array(sp_array)
+
property null_count:
def __get__(self):
@@ -348,3 +371,5 @@ cdef object series_as_ndarray(object obj):
result = obj
return result
+
+from_pylist = Array.from_list
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 3cdfe49..b0f971d 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -182,6 +182,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
CColumn(const shared_ptr[CField]& field,
const vector[shared_ptr[CArray]]& chunks)
+ c_bool Equals(const CColumn& other)
+ c_bool Equals(const shared_ptr[CColumn]& other)
+
int64_t length()
int64_t null_count()
const c_string& name()
@@ -207,14 +210,27 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
CTable(const c_string& name, const shared_ptr[CSchema]& schema,
const vector[shared_ptr[CColumn]]& columns)
+ @staticmethod
+ CStatus FromRecordBatches(
+ const c_string& name,
+ const vector[shared_ptr[CRecordBatch]]& batches,
+ shared_ptr[CTable]* table)
+
int num_columns()
int num_rows()
+ c_bool Equals(const CTable& other)
+ c_bool Equals(const shared_ptr[CTable]& other)
+
const c_string& name()
shared_ptr[CSchema] schema()
shared_ptr[CColumn] column(int i)
+ CStatus ConcatenateTables(const c_string& output_name,
+ const vector[shared_ptr[CTable]]& tables,
+ shared_ptr[CTable]* result)
+
cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil:
cdef cppclass SchemaMessage:
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index 9255431..3a04651 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -155,6 +155,31 @@ cdef class Column:
return pd.Series(PyObject_to_object(arr), name=self.name)
+ def equals(self, Column other):
+ """
+ Check if contents of two columns are equal
+
+ Parameters
+ ----------
+ other : pyarrow.Column
+
+ Returns
+ -------
+ are_equal : boolean
+ """
+ cdef:
+ CColumn* my_col = self.column
+ CColumn* other_col = other.column
+ c_bool result
+
+ self._check_nullptr()
+ other._check_nullptr()
+
+ with nogil:
+ result = my_col.Equals(deref(other_col))
+
+ return result
+
def to_pylist(self):
"""
Convert to a list of native Python objects.
@@ -343,10 +368,18 @@ cdef class RecordBatch:
return arr
def equals(self, RecordBatch other):
+ cdef:
+ CRecordBatch* my_batch = self.batch
+ CRecordBatch* other_batch = other.batch
+ c_bool result
+
self._check_nullptr()
other._check_nullptr()
- return self.batch.Equals(deref(other.batch))
+ with nogil:
+ result = my_batch.Equals(deref(other_batch))
+
+ return result
def to_pydict(self):
"""
@@ -424,7 +457,6 @@ cdef class RecordBatch:
"""
cdef:
Array arr
- RecordBatch result
c_string c_name
shared_ptr[CSchema] schema
shared_ptr[CRecordBatch] batch
@@ -442,11 +474,7 @@ cdef class RecordBatch:
c_arrays.push_back(arr.sp_array)
batch.reset(new CRecordBatch(schema, num_rows, c_arrays))
-
- result = RecordBatch()
- result.init(batch)
-
- return result
+ return batch_from_cbatch(batch)
cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
@@ -498,6 +526,31 @@ cdef class Table:
raise ReferenceError("Table object references a NULL pointer."
"Not initialized.")
+ def equals(self, Table other):
+ """
+ Check if contents of two tables are equal
+
+ Parameters
+ ----------
+ other : pyarrow.Table
+
+ Returns
+ -------
+ are_equal : boolean
+ """
+ cdef:
+ CTable* my_table = self.table
+ CTable* other_table = other.table
+ c_bool result
+
+ self._check_nullptr()
+ other._check_nullptr()
+
+ with nogil:
+ result = my_table.Equals(deref(other_table))
+
+ return result
+
@classmethod
def from_pandas(cls, df, name=None, timestamps_to_ms=False):
"""
@@ -527,7 +580,7 @@ cdef class Table:
... 'int': [1, 2],
... 'str': ['a', 'b']
... })
- >>> pa.table.from_pandas_dataframe(df)
+ >>> pa.Table.from_pandas(df)
<pyarrow.table.Table object at 0x7f05d1fb1b40>
"""
names, arrays = _dataframe_to_arrays(df, name=name,
@@ -559,7 +612,6 @@ cdef class Table:
c_string c_name
vector[shared_ptr[CField]] fields
vector[shared_ptr[CColumn]] columns
- Table result
shared_ptr[CSchema] schema
shared_ptr[CTable] table
@@ -577,14 +629,10 @@ cdef class Table:
c_name = tobytes(name)
table.reset(new CTable(c_name, schema, columns))
-
- result = Table()
- result.init(table)
-
- return result
+ return table_from_ctable(table)
@staticmethod
- def from_batches(batches):
+ def from_batches(batches, name=None):
"""
Construct a Table from a list of Arrow RecordBatches
@@ -594,39 +642,21 @@ cdef class Table:
batches: list of RecordBatch
RecordBatch list to be converted, schemas must be equal
"""
-
cdef:
- vector[shared_ptr[CArray]] c_array_chunks
- vector[shared_ptr[CColumn]] c_columns
+ vector[shared_ptr[CRecordBatch]] c_batches
shared_ptr[CTable] c_table
- Array arr
- Schema schema
-
- import pandas as pd
+ RecordBatch batch
+ Table table
+ c_string c_name
- schema = batches[0].schema
+ c_name = b'' if name is None else tobytes(name)
- # check schemas are equal
- for other in batches[1:]:
- if not schema.equals(other.schema):
- raise ArrowException("Error converting list of RecordBatches "
- "to DataFrame, not all schemas are equal: {%s} != {%s}"
- % (str(schema), str(other.schema)))
+ for batch in batches:
+ c_batches.push_back(batch.sp_batch)
- cdef int K = batches[0].num_columns
+ with nogil:
+ check_status(CTable.FromRecordBatches(c_name, c_batches, &c_table))
- # create chunked columns from the batches
- c_columns.resize(K)
- for i in range(K):
- for batch in batches:
- arr = batch[i]
- c_array_chunks.push_back(arr.sp_array)
- c_columns[i].reset(new CColumn(schema.sp_schema.get().field(i),
- c_array_chunks))
- c_array_chunks.clear()
-
- # create a Table from columns and convert to DataFrame
- c_table.reset(new CTable('', schema.sp_schema, c_columns))
table = Table()
table.init(c_table)
return table
@@ -760,9 +790,40 @@ cdef class Table:
return (self.num_rows, self.num_columns)
+def concat_tables(tables, output_name=None):
+ """
+ Perform zero-copy concatenation of pyarrow.Table objects. Raises an exception
+ unless all of the Table schemas are the same
+
+ Parameters
+ ----------
+ tables : iterable of pyarrow.Table objects
+ output_name : string, default None
+ A name for the output table, if any
+ """
+ cdef:
+ vector[shared_ptr[CTable]] c_tables
+ shared_ptr[CTable] c_result
+ Table table
+ c_string c_name
+
+ c_name = b'' if output_name is None else tobytes(output_name)
+
+ for table in tables:
+ c_tables.push_back(table.sp_table)
+
+ with nogil:
+ check_status(ConcatenateTables(c_name, c_tables, &c_result))
+
+ return table_from_ctable(c_result)
+
+
cdef api object table_from_ctable(const shared_ptr[CTable]& ctable):
cdef Table table = Table()
table.init(ctable)
return table
-from_pandas_dataframe = Table.from_pandas
+cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch):
+ cdef RecordBatch batch = RecordBatch()
+ batch.init(cbatch)
+ return batch
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/pyarrow/tests/test_convert_pandas.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index bb9f0b3..12e7a08 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -61,7 +61,7 @@ class TestPandasConversion(unittest.TestCase):
def _check_pandas_roundtrip(self, df, expected=None, nthreads=1,
timestamps_to_ms=False):
- table = A.from_pandas_dataframe(df, timestamps_to_ms=timestamps_to_ms)
+ table = A.Table.from_pandas(df, timestamps_to_ms=timestamps_to_ms)
result = table.to_pandas(nthreads=nthreads)
if expected is None:
expected = df
@@ -193,7 +193,7 @@ class TestPandasConversion(unittest.TestCase):
values = [u('qux'), b'foo', None, 'bar', 'qux', np.nan]
df = pd.DataFrame({'strings': values})
- table = A.from_pandas_dataframe(df)
+ table = A.Table.from_pandas(df)
assert table[0].type == A.binary()
values2 = [b'qux', b'foo', None, b'bar', b'qux', np.nan]
@@ -245,7 +245,7 @@ class TestPandasConversion(unittest.TestCase):
None,
datetime.date(1970, 1, 1),
datetime.date(2040, 2, 26)]})
- table = A.from_pandas_dataframe(df)
+ table = A.Table.from_pandas(df)
result = table.to_pandas()
expected = df.copy()
expected['date'] = pd.to_datetime(df['date'])
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 7c45732..0fb913c 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -79,7 +79,7 @@ def test_pandas_parquet_2_0_rountrip(tmpdir):
'empty_str': [''] * size
})
filename = tmpdir.join('pandas_rountrip.parquet')
- arrow_table = A.from_pandas_dataframe(df, timestamps_to_ms=True)
+ arrow_table = A.Table.from_pandas(df, timestamps_to_ms=True)
A.parquet.write_table(arrow_table, filename.strpath, version="2.0")
table_read = pq.read_table(filename.strpath)
df_read = table_read.to_pandas()
@@ -107,7 +107,7 @@ def test_pandas_parquet_1_0_rountrip(tmpdir):
'empty_str': [''] * size
})
filename = tmpdir.join('pandas_rountrip.parquet')
- arrow_table = A.from_pandas_dataframe(df)
+ arrow_table = A.Table.from_pandas(df)
A.parquet.write_table(arrow_table, filename.strpath, version="1.0")
table_read = pq.read_table(filename.strpath)
df_read = table_read.to_pandas()
@@ -126,7 +126,7 @@ def test_pandas_column_selection(tmpdir):
'uint16': np.arange(size, dtype=np.uint16)
})
filename = tmpdir.join('pandas_rountrip.parquet')
- arrow_table = A.from_pandas_dataframe(df)
+ arrow_table = A.Table.from_pandas(df)
A.parquet.write_table(arrow_table, filename.strpath)
table_read = pq.read_table(filename.strpath, columns=['uint8'])
df_read = table_read.to_pandas()
@@ -155,7 +155,7 @@ def _test_dataframe(size=10000):
@parquet
def test_pandas_parquet_native_file_roundtrip(tmpdir):
df = _test_dataframe(10000)
- arrow_table = A.from_pandas_dataframe(df)
+ arrow_table = A.Table.from_pandas(df)
imos = paio.InMemoryOutputStream()
pq.write_table(arrow_table, imos, version="2.0")
buf = imos.get_result()
@@ -176,7 +176,7 @@ def test_pandas_parquet_pyfile_roundtrip(tmpdir):
'strings': ['foo', 'bar', None, 'baz', 'qux']
})
- arrow_table = A.from_pandas_dataframe(df)
+ arrow_table = A.Table.from_pandas(df)
with open(filename, 'wb') as f:
A.parquet.write_table(arrow_table, f, version="1.0")
@@ -206,7 +206,7 @@ def test_pandas_parquet_configuration_options(tmpdir):
'bool': np.random.randn(size) > 0
})
filename = tmpdir.join('pandas_rountrip.parquet')
- arrow_table = A.from_pandas_dataframe(df)
+ arrow_table = A.Table.from_pandas(df)
for use_dictionary in [True, False]:
A.parquet.write_table(
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 9985b3e..6f00c73 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -111,6 +111,33 @@ def test_table_basics():
assert chunk is not None
+def test_concat_tables():
+ data = [
+ list(range(5)),
+ [-10., -5., 0., 5., 10.]
+ ]
+ data2 = [
+ list(range(5, 10)),
+ [1., 2., 3., 4., 5.]
+ ]
+
+ t1 = pa.Table.from_arrays(('a', 'b'), [pa.from_pylist(x)
+ for x in data], 'table_name')
+ t2 = pa.Table.from_arrays(('a', 'b'), [pa.from_pylist(x)
+ for x in data2], 'table_name')
+
+ result = pa.concat_tables([t1, t2], output_name='foo')
+ assert result.name == 'foo'
+ assert len(result) == 10
+
+ expected = pa.Table.from_arrays(
+ ('a', 'b'), [pa.from_pylist(x + y)
+ for x, y in zip(data, data2)],
+ 'foo')
+
+ assert result.equals(expected)
+
+
def test_table_pandas():
data = [
pa.from_pylist(range(5)),
http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index 2e595e2..3829a79 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -143,7 +143,10 @@ class build_ext(_build_ext):
cmake_options + [source])
self.spawn(cmake_command)
- args = ['make', 'VERBOSE=1']
+ args = ['make']
+ if os.environ.get('PYARROW_BUILD_VERBOSE', '0') == '1':
+ args.append('VERBOSE=1')
+
if 'PYARROW_PARALLEL' in os.environ:
args.append('-j{0}'.format(os.environ['PYARROW_PARALLEL']))
self.spawn(args)