Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/08 15:50:38 UTC

arrow git commit: ARROW-438: [C++/Python] Implement zero-data-copy record batch and table concatenation.

Repository: arrow
Updated Branches:
  refs/heads/master 1094d89d4 -> 6526a522d


ARROW-438: [C++/Python] Implement zero-data-copy record batch and table concatenation.

This also fixes a bug in ChunkedArray::Equals. The bug is caught by the Python test suite, but the method would benefit from more C++ unit tests.
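
For illustration, here is a minimal sketch of the new Python API, mirroring
the test_concat_tables test added in this patch (pa.from_pylist is the
list-constructor alias also introduced here):

    import pyarrow as pa

    t1 = pa.Table.from_arrays(('a', 'b'),
                              [pa.from_pylist([0, 1, 2, 3, 4]),
                               pa.from_pylist([-10., -5., 0., 5., 10.])],
                              'example')
    t2 = pa.Table.from_arrays(('a', 'b'),
                              [pa.from_pylist([5, 6, 7, 8, 9]),
                               pa.from_pylist([1., 2., 3., 4., 5.])],
                              'example')

    # Zero-data-copy: the result re-references the existing column chunks;
    # no value buffers are copied
    result = pa.concat_tables([t1, t2], output_name='example')
    assert len(result) == 10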

Author: Wes McKinney <we...@twosigma.com>

Closes #274 from wesm/ARROW-438 and squashes the following commits:

1f39568 [Wes McKinney] py3 compatibility
2e76c5e [Wes McKinney] Implement arrow::ConcatenateTables and Python wrapper. Fix bug in ChunkedArray::Equals
f3cb170 [Wes McKinney] Fix Cython compilation, verify pyarrow.Table.from_batches still works
af28755 [Wes McKinney] Implement Table::FromRecordBatches


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/6526a522
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/6526a522
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/6526a522

Branch: refs/heads/master
Commit: 6526a522d05e703f2f75fcdad067c8aed7bb6047
Parents: 1094d89
Author: Wes McKinney <we...@twosigma.com>
Authored: Sun Jan 8 10:50:30 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sun Jan 8 10:50:30 2017 -0500

----------------------------------------------------------------------
 cpp/CMakeLists.txt                          |   2 +-
 cpp/src/arrow/column.cc                     |  11 +-
 cpp/src/arrow/column.h                      |   2 +
 cpp/src/arrow/io/io-file-test.cc            |   1 -
 cpp/src/arrow/table-test.cc                 |  88 ++++++++++++--
 cpp/src/arrow/table.cc                      |  71 +++++++++++
 cpp/src/arrow/table.h                       |  13 +-
 cpp/src/arrow/test-util.h                   |  43 ++++---
 python/CMakeLists.txt                       |   3 +
 python/benchmarks/array.py                  |   7 +-
 python/doc/pandas.rst                       |   5 +-
 python/pyarrow/__init__.py                  |   2 +-
 python/pyarrow/array.pyx                    |  25 ++++
 python/pyarrow/includes/libarrow.pxd        |  16 +++
 python/pyarrow/table.pyx                    | 147 ++++++++++++++++-------
 python/pyarrow/tests/test_convert_pandas.py |   6 +-
 python/pyarrow/tests/test_parquet.py        |  12 +-
 python/pyarrow/tests/test_table.py          |  27 +++++
 python/setup.py                             |   5 +-
 19 files changed, 395 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 3522e5c..87b7841 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -76,7 +76,7 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
 
   option(ARROW_JEMALLOC
     "Build the Arrow jemalloc-based allocator"
-    ON)
+    OFF)
 
   option(ARROW_BOOST_USE_SHARED
     "Rely on boost shared libraries where relevant"

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/cpp/src/arrow/column.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/column.cc b/cpp/src/arrow/column.cc
index 3e89956..9cc0f57 100644
--- a/cpp/src/arrow/column.cc
+++ b/cpp/src/arrow/column.cc
@@ -45,7 +45,9 @@ bool ChunkedArray::Equals(const ChunkedArray& other) const {
   int32_t this_start_idx = 0;
   int other_chunk_idx = 0;
   int32_t other_start_idx = 0;
-  while (this_chunk_idx < static_cast<int32_t>(chunks_.size())) {
+
+  int64_t elements_compared = 0;
+  while (elements_compared < length_) {
     const std::shared_ptr<Array> this_array = chunks_[this_chunk_idx];
     const std::shared_ptr<Array> other_array = other.chunk(other_chunk_idx);
     int32_t common_length = std::min(
@@ -55,14 +57,21 @@ bool ChunkedArray::Equals(const ChunkedArray& other) const {
       return false;
     }
 
+    elements_compared += common_length;
+
     // If we have exhausted the current chunk, proceed to the next one individually.
     if (this_start_idx + common_length == this_array->length()) {
       this_chunk_idx++;
       this_start_idx = 0;
+    } else {
+      this_start_idx += common_length;
     }
+
     if (other_start_idx + common_length == other_array->length()) {
       other_chunk_idx++;
       other_start_idx = 0;
+    } else {
+      other_start_idx += common_length;
     }
   }
   return true;
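
Note on the ChunkedArray::Equals fix above: previously, when a chunk was only
partially consumed, its start index was never advanced, so arrays whose chunk
boundaries were misaligned compared the wrong ranges. A sketch of the case
the fix covers, using the Python APIs added in this patch (and assuming this
era's RecordBatch.from_arrays(names, arrays) signature):

    import pyarrow as pa

    whole = pa.RecordBatch.from_arrays(['f0'], [pa.from_pylist([1, 2, 3, 4])])
    half1 = pa.RecordBatch.from_arrays(['f0'], [pa.from_pylist([1, 2])])
    half2 = pa.RecordBatch.from_arrays(['f0'], [pa.from_pylist([3, 4])])

    t1 = pa.Table.from_batches([whole])         # one 4-element chunk per column
    t2 = pa.Table.from_batches([half1, half2])  # two 2-element chunks

    # Equal values, different chunking: ChunkedArray::Equals must advance the
    # intra-chunk offsets in lockstep for this to hold
    assert t1.equals(t2)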

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/cpp/src/arrow/column.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/column.h b/cpp/src/arrow/column.h
index f716473..a28b266 100644
--- a/cpp/src/arrow/column.h
+++ b/cpp/src/arrow/column.h
@@ -48,6 +48,8 @@ class ARROW_EXPORT ChunkedArray {
 
   std::shared_ptr<Array> chunk(int i) const { return chunks_[i]; }
 
+  const ArrayVector& chunks() const { return chunks_; }
+
   bool Equals(const ChunkedArray& other) const;
   bool Equals(const std::shared_ptr<ChunkedArray>& other) const;
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/cpp/src/arrow/io/io-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index 378b60e..821e71d 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -301,7 +301,6 @@ class MyMemoryPool : public MemoryPool {
       return Status::OutOfMemory(ss.str());
     }
 
-
     return Status::OK();
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/cpp/src/arrow/table-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc
index 734b941..67c9f67 100644
--- a/cpp/src/arrow/table-test.cc
+++ b/cpp/src/arrow/table-test.cc
@@ -44,16 +44,20 @@ class TestTable : public TestBase {
     vector<shared_ptr<Field>> fields = {f0, f1, f2};
     schema_ = std::make_shared<Schema>(fields);
 
-    columns_ = {
-        std::make_shared<Column>(schema_->field(0), MakePrimitive<Int32Array>(length)),
-        std::make_shared<Column>(schema_->field(1), MakePrimitive<UInt8Array>(length)),
-        std::make_shared<Column>(schema_->field(2), MakePrimitive<Int16Array>(length))};
+    arrays_ = {MakePrimitive<Int32Array>(length), MakePrimitive<UInt8Array>(length),
+        MakePrimitive<Int16Array>(length)};
+
+    columns_ = {std::make_shared<Column>(schema_->field(0), arrays_[0]),
+        std::make_shared<Column>(schema_->field(1), arrays_[1]),
+        std::make_shared<Column>(schema_->field(2), arrays_[2])};
   }
 
  protected:
   std::shared_ptr<Table> table_;
   shared_ptr<Schema> schema_;
-  vector<std::shared_ptr<Column>> columns_;
+
+  std::vector<std::shared_ptr<Array>> arrays_;
+  std::vector<std::shared_ptr<Column>> columns_;
 };
 
 TEST_F(TestTable, EmptySchema) {
@@ -65,7 +69,7 @@ TEST_F(TestTable, EmptySchema) {
 }
 
 TEST_F(TestTable, Ctors) {
-  int length = 100;
+  const int length = 100;
   MakeExample1(length);
 
   std::string name = "data";
@@ -83,7 +87,7 @@ TEST_F(TestTable, Ctors) {
 }
 
 TEST_F(TestTable, Metadata) {
-  int length = 100;
+  const int length = 100;
   MakeExample1(length);
 
   std::string name = "data";
@@ -98,7 +102,7 @@ TEST_F(TestTable, Metadata) {
 
 TEST_F(TestTable, InvalidColumns) {
   // Check that columns are all the same length
-  int length = 100;
+  const int length = 100;
   MakeExample1(length);
 
   table_.reset(new Table("data", schema_, columns_, length - 1));
@@ -120,7 +124,7 @@ TEST_F(TestTable, InvalidColumns) {
 }
 
 TEST_F(TestTable, Equals) {
-  int length = 100;
+  const int length = 100;
   MakeExample1(length);
 
   std::string name = "data";
@@ -145,6 +149,72 @@ TEST_F(TestTable, Equals) {
   ASSERT_FALSE(table_->Equals(std::make_shared<Table>(name, schema_, other_columns)));
 }
 
+TEST_F(TestTable, FromRecordBatches) {
+  const int32_t length = 10;
+  MakeExample1(length);
+
+  auto batch1 = std::make_shared<RecordBatch>(schema_, length, arrays_);
+
+  std::shared_ptr<Table> result, expected;
+  ASSERT_OK(Table::FromRecordBatches("foo", {batch1}, &result));
+
+  expected = std::make_shared<Table>("foo", schema_, columns_);
+  ASSERT_TRUE(result->Equals(expected));
+
+  std::vector<std::shared_ptr<Column>> other_columns;
+  for (int i = 0; i < schema_->num_fields(); ++i) {
+    std::vector<std::shared_ptr<Array>> col_arrays = {arrays_[i], arrays_[i]};
+    other_columns.push_back(std::make_shared<Column>(schema_->field(i), col_arrays));
+  }
+
+  ASSERT_OK(Table::FromRecordBatches("foo", {batch1, batch1}, &result));
+  expected = std::make_shared<Table>("foo", schema_, other_columns);
+  ASSERT_TRUE(result->Equals(expected));
+
+  // Error states
+  std::vector<std::shared_ptr<RecordBatch>> empty_batches;
+  ASSERT_RAISES(Invalid, Table::FromRecordBatches("", empty_batches, &result));
+
+  std::vector<std::shared_ptr<Field>> fields = {schema_->field(0), schema_->field(1)};
+  auto other_schema = std::make_shared<Schema>(fields);
+
+  std::vector<std::shared_ptr<Array>> other_arrays = {arrays_[0], arrays_[1]};
+  auto batch2 = std::make_shared<RecordBatch>(other_schema, length, other_arrays);
+  ASSERT_RAISES(Invalid, Table::FromRecordBatches("", {batch1, batch2}, &result));
+}
+
+TEST_F(TestTable, ConcatenateTables) {
+  const int32_t length = 10;
+
+  MakeExample1(length);
+  auto batch1 = std::make_shared<RecordBatch>(schema_, length, arrays_);
+
+  // generate different data
+  MakeExample1(length);
+  auto batch2 = std::make_shared<RecordBatch>(schema_, length, arrays_);
+
+  std::shared_ptr<Table> t1, t2, t3, result, expected;
+  ASSERT_OK(Table::FromRecordBatches("foo", {batch1}, &t1));
+  ASSERT_OK(Table::FromRecordBatches("foo", {batch2}, &t2));
+
+  ASSERT_OK(ConcatenateTables("bar", {t1, t2}, &result));
+  ASSERT_OK(Table::FromRecordBatches("bar", {batch1, batch2}, &expected));
+  ASSERT_TRUE(result->Equals(expected));
+
+  // Error states
+  std::vector<std::shared_ptr<Table>> empty_tables;
+  ASSERT_RAISES(Invalid, ConcatenateTables("", empty_tables, &result));
+
+  std::vector<std::shared_ptr<Field>> fields = {schema_->field(0), schema_->field(1)};
+  auto other_schema = std::make_shared<Schema>(fields);
+
+  std::vector<std::shared_ptr<Array>> other_arrays = {arrays_[0], arrays_[1]};
+  auto batch3 = std::make_shared<RecordBatch>(other_schema, length, other_arrays);
+  ASSERT_OK(Table::FromRecordBatches("", {batch3}, &t3));
+
+  ASSERT_RAISES(Invalid, ConcatenateTables("foo", {t1, t3}, &result));
+}
+
 class TestRecordBatch : public TestBase {};
 
 TEST_F(TestRecordBatch, Equals) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/cpp/src/arrow/table.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index 45f672e..b3563ea 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -77,6 +77,77 @@ Table::Table(const std::string& name, const std::shared_ptr<Schema>& schema,
     const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows)
     : name_(name), schema_(schema), columns_(columns), num_rows_(num_rows) {}
 
+Status Table::FromRecordBatches(const std::string& name,
+    const std::vector<std::shared_ptr<RecordBatch>>& batches,
+    std::shared_ptr<Table>* table) {
+  if (batches.size() == 0) {
+    return Status::Invalid("Must pass at least one record batch");
+  }
+
+  std::shared_ptr<Schema> schema = batches[0]->schema();
+
+  const int nbatches = static_cast<int>(batches.size());
+  const int ncolumns = static_cast<int>(schema->num_fields());
+
+  for (int i = 1; i < nbatches; ++i) {
+    if (!batches[i]->schema()->Equals(schema)) {
+      std::stringstream ss;
+      ss << "Schema at index " << static_cast<int>(i) << " was different: \n"
+         << schema->ToString() << "\nvs\n"
+         << batches[i]->schema()->ToString();
+      return Status::Invalid(ss.str());
+    }
+  }
+
+  std::vector<std::shared_ptr<Column>> columns(ncolumns);
+  std::vector<std::shared_ptr<Array>> column_arrays(nbatches);
+
+  for (int i = 0; i < ncolumns; ++i) {
+    for (int j = 0; j < nbatches; ++j) {
+      column_arrays[j] = batches[j]->column(i);
+    }
+    columns[i] = std::make_shared<Column>(schema->field(i), column_arrays);
+  }
+
+  *table = std::make_shared<Table>(name, schema, columns);
+  return Status::OK();
+}
+
+Status ConcatenateTables(const std::string& output_name,
+    const std::vector<std::shared_ptr<Table>>& tables, std::shared_ptr<Table>* table) {
+  if (tables.size() == 0) { return Status::Invalid("Must pass at least one table"); }
+
+  std::shared_ptr<Schema> schema = tables[0]->schema();
+
+  const int ntables = static_cast<int>(tables.size());
+  const int ncolumns = static_cast<int>(schema->num_fields());
+
+  for (int i = 1; i < ntables; ++i) {
+    if (!tables[i]->schema()->Equals(schema)) {
+      std::stringstream ss;
+      ss << "Schema at index " << static_cast<int>(i) << " was different: \n"
+         << schema->ToString() << "\nvs\n"
+         << tables[i]->schema()->ToString();
+      return Status::Invalid(ss.str());
+    }
+  }
+
+  std::vector<std::shared_ptr<Column>> columns(ncolumns);
+  for (int i = 0; i < ncolumns; ++i) {
+    std::vector<std::shared_ptr<Array>> column_arrays;
+    for (int j = 0; j < ntables; ++j) {
+      const std::vector<std::shared_ptr<Array>>& chunks =
+          tables[j]->column(i)->data()->chunks();
+      for (const auto& chunk : chunks) {
+        column_arrays.push_back(chunk);
+      }
+    }
+    columns[i] = std::make_shared<Column>(schema->field(i), column_arrays);
+  }
+  *table = std::make_shared<Table>(output_name, schema, columns);
+  return Status::OK();
+}
+
 bool Table::Equals(const Table& other) const {
   if (name_ != other.name()) { return false; }
   if (!schema_->Equals(other.schema())) { return false; }
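
Both new entry points validate schema equality up front and return
Status::Invalid on mismatch; the concatenation itself only collects
shared_ptr<Array> chunk references, so no data buffers are copied. In Python
the failed Status surfaces as an exception via check_status. A hedged sketch
(the exact exception class depends on pyarrow's status-to-exception mapping):

    import pyarrow as pa

    t_ab = pa.Table.from_arrays(('a', 'b'),
                                [pa.from_pylist([1]), pa.from_pylist([2])],
                                'data')
    t_a = pa.Table.from_arrays(('a',), [pa.from_pylist([1])], 'data')

    try:
        pa.concat_tables([t_ab, t_a])
    except Exception as exc:
        print('schema mismatch rejected:', exc)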

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/cpp/src/arrow/table.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 0f2418d..583847c 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -82,7 +82,13 @@ class ARROW_EXPORT Table {
   // same length as num_rows -- you can validate this using
   // Table::ValidateColumns
   Table(const std::string& name, const std::shared_ptr<Schema>& schema,
       const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows);
+
+  // Construct a table from a vector of RecordBatches, but only if all of the
+  // batch schemas are equal. Returns Status::Invalid if they are not
+  static Status FromRecordBatches(const std::string& name,
+      const std::vector<std::shared_ptr<RecordBatch>>& batches,
+      std::shared_ptr<Table>* table);
 
   // @returns: the table's name, if any (may be length 0)
   const std::string& name() const { return name_; }
@@ -116,6 +122,11 @@ class ARROW_EXPORT Table {
   int64_t num_rows_;
 };
 
+// Construct table from multiple input tables. Return Status::Invalid if
+// schemas are not equal
+Status ARROW_EXPORT ConcatenateTables(const std::string& output_name,
+    const std::vector<std::shared_ptr<Table>>& tables, std::shared_ptr<Table>* table);
+
 }  // namespace arrow
 
 #endif  // ARROW_TABLE_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/cpp/src/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index f2da824..b59809d 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -71,23 +71,6 @@
 
 namespace arrow {
 
-class TestBase : public ::testing::Test {
- public:
-  void SetUp() { pool_ = default_memory_pool(); }
-
-  template <typename ArrayType>
-  std::shared_ptr<Array> MakePrimitive(int32_t length, int32_t null_count = 0) {
-    auto data = std::make_shared<PoolBuffer>(pool_);
-    auto null_bitmap = std::make_shared<PoolBuffer>(pool_);
-    EXPECT_OK(data->Resize(length * sizeof(typename ArrayType::value_type)));
-    EXPECT_OK(null_bitmap->Resize(BitUtil::BytesForBits(length)));
-    return std::make_shared<ArrayType>(length, data, null_count, null_bitmap);
-  }
-
- protected:
-  MemoryPool* pool_;
-};
-
 namespace test {
 
 template <typename T>
@@ -253,6 +236,32 @@ Status MakeRandomBytePoolBuffer(int32_t length, MemoryPool* pool,
 
 }  // namespace test
 
+class TestBase : public ::testing::Test {
+ public:
+  void SetUp() {
+    pool_ = default_memory_pool();
+    random_seed_ = 0;
+  }
+
+  template <typename ArrayType>
+  std::shared_ptr<Array> MakePrimitive(int32_t length, int32_t null_count = 0) {
+    auto data = std::make_shared<PoolBuffer>(pool_);
+    const int64_t data_nbytes = length * sizeof(typename ArrayType::value_type);
+    EXPECT_OK(data->Resize(data_nbytes));
+
+    // Fill with random data
+    test::random_bytes(data_nbytes, random_seed_++, data->mutable_data());
+
+    auto null_bitmap = std::make_shared<PoolBuffer>(pool_);
+    EXPECT_OK(null_bitmap->Resize(BitUtil::BytesForBits(length)));
+    return std::make_shared<ArrayType>(length, data, null_count, null_bitmap);
+  }
+
+ protected:
+  uint32_t random_seed_;
+  MemoryPool* pool_;
+};
+
 template <typename TYPE, typename C_TYPE>
 void ArrayFromVector(const std::shared_ptr<DataType>& type,
     const std::vector<bool>& is_valid, const std::vector<C_TYPE>& values,
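
Relocating TestBase below namespace test lets MakePrimitive call
test::random_bytes, so every generated array is now filled with distinct
random data (the seed increments per call). This is what makes the second
MakeExample1 call in the ConcatenateTables test "generate different data"
from the first.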

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 6c24772..e42c45d 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -70,6 +70,9 @@ include(SetupCxxFlags)
 # Add common flags
 set(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}")
 
+# Suppress Cython warnings
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-variable")
+
 # Determine compiler version
 include(CompilerInfo)
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/benchmarks/array.py
----------------------------------------------------------------------
diff --git a/python/benchmarks/array.py b/python/benchmarks/array.py
index 4268f00..e22c0f7 100644
--- a/python/benchmarks/array.py
+++ b/python/benchmarks/array.py
@@ -49,10 +49,10 @@ class PandasConversionsToArrow(PandasConversionsBase):
     params = ((1, 10 ** 5, 10 ** 6, 10 ** 7), ('int64', 'float64', 'float64_nans', 'str'))
 
     def time_from_series(self, n, dtype):
-        A.from_pandas_dataframe(self.data)
+        A.Table.from_pandas(self.data)
 
     def peakmem_from_series(self, n, dtype):
-        A.from_pandas_dataframe(self.data)
+        A.Table.from_pandas(self.data)
 
 
 class PandasConversionsFromArrow(PandasConversionsBase):
@@ -61,7 +61,7 @@ class PandasConversionsFromArrow(PandasConversionsBase):
 
     def setup(self, n, dtype):
         super(PandasConversionsFromArrow, self).setup(n, dtype)
-        self.arrow_data = A.from_pandas_dataframe(self.data)
+        self.arrow_data = A.Table.from_pandas(self.data)
 
     def time_to_series(self, n, dtype):
         self.arrow_data.to_pandas()
@@ -80,4 +80,3 @@ class ScalarAccess(object):
     def time_as_py(self, n):
         for i in range(n):
             self._array[i].as_py()
-

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/doc/pandas.rst
----------------------------------------------------------------------
diff --git a/python/doc/pandas.rst b/python/doc/pandas.rst
index 7c70074..c225d13 100644
--- a/python/doc/pandas.rst
+++ b/python/doc/pandas.rst
@@ -31,7 +31,7 @@ represent more data than a DataFrame, so a full conversion is not always possibl
 
 Conversion from a Table to a DataFrame is done by calling
 :meth:`pyarrow.table.Table.to_pandas`. The inverse is then achieved by using
-:meth:`pyarrow.from_pandas_dataframe`. This conversion routine provides the
+:meth:`pyarrow.Table.from_pandas`. This conversion routine provides the
convenience parameter ``timestamps_to_ms``. Although Arrow supports timestamps of
 different resolutions, Pandas only supports nanosecond timestamps and most
 other systems (e.g. Parquet) only work on millisecond timestamps. This parameter
@@ -45,7 +45,7 @@ conversion.
 
     df = pd.DataFrame({"a": [1, 2, 3]})
     # Convert from Pandas to Arrow
-    table = pa.from_pandas_dataframe(df)
+    table = pa.Table.from_pandas(df)
     # Convert back to Pandas
     df_new = table.to_pandas()
 
@@ -111,4 +111,3 @@ Arrow -> Pandas Conversion
 +-------------------------------------+--------------------------------------------------------+
 | ``TIMESTAMP(unit=*)``               | ``pd.Timestamp`` (``np.datetime64[ns]``)               |
 +-------------------------------------+--------------------------------------------------------+
-

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 02b2b06..d25cdd4 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -56,4 +56,4 @@ from pyarrow.schema import (null, bool_,
                             list_, struct, field,
                             DataType, Field, Schema, schema)
 
-from pyarrow.table import Column, RecordBatch, Table, from_pandas_dataframe
+from pyarrow.table import Column, RecordBatch, Table, concat_tables

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/pyarrow/array.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx
index c178d5c..266768f 100644
--- a/python/pyarrow/array.pyx
+++ b/python/pyarrow/array.pyx
@@ -91,6 +91,29 @@ cdef class Array:
         """
         return from_pandas_series(obj, mask)
 
+    @staticmethod
+    def from_list(object list_obj, DataType type=None):
+        """
+        Convert Python list to Arrow array
+
+        Parameters
+        ----------
+        list_obj : array_like
+
+        Returns
+        -------
+        pyarrow.array.Array
+        """
+        cdef:
+            shared_ptr[CArray] sp_array
+
+        if type is None:
+            check_status(pyarrow.ConvertPySequence(list_obj, &sp_array))
+        else:
+            raise NotImplementedError()
+
+        return box_arrow_array(sp_array)
+
     property null_count:
 
         def __get__(self):
@@ -348,3 +371,5 @@ cdef object series_as_ndarray(object obj):
         result = obj
 
     return result
+
+from_pylist = Array.from_list
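
With the module-level alias, users can write pa.from_pylist directly. A
minimal sketch (type inference only; passing an explicit type raises
NotImplementedError, as above):

    import pyarrow as pa

    arr = pa.from_pylist([1, 2, None, 4])
    assert arr.null_count == 1  # None becomes a null slot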

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 3cdfe49..b0f971d 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -182,6 +182,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         CColumn(const shared_ptr[CField]& field,
                 const vector[shared_ptr[CArray]]& chunks)
 
+        c_bool Equals(const CColumn& other)
+        c_bool Equals(const shared_ptr[CColumn]& other)
+
         int64_t length()
         int64_t null_count()
         const c_string& name()
@@ -207,14 +210,27 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         CTable(const c_string& name, const shared_ptr[CSchema]& schema,
                const vector[shared_ptr[CColumn]]& columns)
 
+        @staticmethod
+        CStatus FromRecordBatches(
+            const c_string& name,
+            const vector[shared_ptr[CRecordBatch]]& batches,
+            shared_ptr[CTable]* table)
+
         int num_columns()
         int num_rows()
 
+        c_bool Equals(const CTable& other)
+        c_bool Equals(const shared_ptr[CTable]& other)
+
         const c_string& name()
 
         shared_ptr[CSchema] schema()
         shared_ptr[CColumn] column(int i)
 
+    CStatus ConcatenateTables(const c_string& output_name,
+                              const vector[shared_ptr[CTable]]& tables,
+                              shared_ptr[CTable]* result)
+
 
 cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil:
     cdef cppclass SchemaMessage:

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index 9255431..3a04651 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -155,6 +155,31 @@ cdef class Column:
 
         return pd.Series(PyObject_to_object(arr), name=self.name)
 
+    def equals(self, Column other):
+        """
+        Check if contents of two columns are equal
+
+        Parameters
+        ----------
+        other : pyarrow.Column
+
+        Returns
+        -------
+        are_equal : boolean
+        """
+        cdef:
+            CColumn* my_col = self.column
+            CColumn* other_col = other.column
+            c_bool result
+
+        self._check_nullptr()
+        other._check_nullptr()
+
+        with nogil:
+            result = my_col.Equals(deref(other_col))
+
+        return result
+
     def to_pylist(self):
         """
         Convert to a list of native Python objects.
@@ -343,10 +368,18 @@ cdef class RecordBatch:
         return arr
 
     def equals(self, RecordBatch other):
+        cdef:
+            CRecordBatch* my_batch = self.batch
+            CRecordBatch* other_batch = other.batch
+            c_bool result
+
         self._check_nullptr()
         other._check_nullptr()
 
-        return self.batch.Equals(deref(other.batch))
+        with nogil:
+            result = my_batch.Equals(deref(other_batch))
+
+        return result
 
     def to_pydict(self):
         """
@@ -424,7 +457,6 @@ cdef class RecordBatch:
         """
         cdef:
             Array arr
-            RecordBatch result
             c_string c_name
             shared_ptr[CSchema] schema
             shared_ptr[CRecordBatch] batch
@@ -442,11 +474,7 @@ cdef class RecordBatch:
             c_arrays.push_back(arr.sp_array)
 
         batch.reset(new CRecordBatch(schema, num_rows, c_arrays))
-
-        result = RecordBatch()
-        result.init(batch)
-
-        return result
+        return batch_from_cbatch(batch)
 
 
 cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
@@ -498,6 +526,31 @@ cdef class Table:
             raise ReferenceError("Table object references a NULL pointer. "
                     "Not initialized.")
 
+    def equals(self, Table other):
+        """
+        Check if contents of two tables are equal
+
+        Parameters
+        ----------
+        other : pyarrow.Table
+
+        Returns
+        -------
+        are_equal : boolean
+        """
+        cdef:
+            CTable* my_table = self.table
+            CTable* other_table = other.table
+            c_bool result
+
+        self._check_nullptr()
+        other._check_nullptr()
+
+        with nogil:
+            result = my_table.Equals(deref(other_table))
+
+        return result
+
     @classmethod
     def from_pandas(cls, df, name=None, timestamps_to_ms=False):
         """
@@ -527,7 +580,7 @@ cdef class Table:
             ...     'int': [1, 2],
             ...     'str': ['a', 'b']
             ... })
-        >>> pa.table.from_pandas_dataframe(df)
+        >>> pa.Table.from_pandas(df)
         <pyarrow.table.Table object at 0x7f05d1fb1b40>
         """
         names, arrays = _dataframe_to_arrays(df, name=name,
@@ -559,7 +612,6 @@ cdef class Table:
             c_string c_name
             vector[shared_ptr[CField]] fields
             vector[shared_ptr[CColumn]] columns
-            Table result
             shared_ptr[CSchema] schema
             shared_ptr[CTable] table
 
@@ -577,14 +629,10 @@ cdef class Table:
             c_name = tobytes(name)
 
         table.reset(new CTable(c_name, schema, columns))
-
-        result = Table()
-        result.init(table)
-
-        return result
+        return table_from_ctable(table)
 
     @staticmethod
-    def from_batches(batches):
+    def from_batches(batches, name=None):
         """
         Construct a Table from a list of Arrow RecordBatches
 
@@ -594,39 +642,21 @@ cdef class Table:
         batches: list of RecordBatch
             RecordBatch list to be converted, schemas must be equal
         """
-
         cdef:
-            vector[shared_ptr[CArray]] c_array_chunks
-            vector[shared_ptr[CColumn]] c_columns
+            vector[shared_ptr[CRecordBatch]] c_batches
             shared_ptr[CTable] c_table
-            Array arr
-            Schema schema
-
-        import pandas as pd
+            RecordBatch batch
+            Table table
+            c_string c_name
 
-        schema = batches[0].schema
+        c_name = b'' if name is None else tobytes(name)
 
-        # check schemas are equal
-        for other in batches[1:]:
-            if not schema.equals(other.schema):
-                raise ArrowException("Error converting list of RecordBatches "
-                        "to DataFrame, not all schemas are equal: {%s} != {%s}"
-                        % (str(schema), str(other.schema)))
+        for batch in batches:
+            c_batches.push_back(batch.sp_batch)
 
-        cdef int K = batches[0].num_columns
+        with nogil:
+            check_status(CTable.FromRecordBatches(c_name, c_batches, &c_table))
 
-        # create chunked columns from the batches
-        c_columns.resize(K)
-        for i in range(K):
-            for batch in batches:
-                arr = batch[i]
-                c_array_chunks.push_back(arr.sp_array)
-            c_columns[i].reset(new CColumn(schema.sp_schema.get().field(i),
-                               c_array_chunks))
-            c_array_chunks.clear()
-
-        # create a Table from columns and convert to DataFrame
-        c_table.reset(new CTable('', schema.sp_schema, c_columns))
         table = Table()
         table.init(c_table)
         return table
@@ -760,9 +790,40 @@ cdef class Table:
         return (self.num_rows, self.num_columns)
 
 
+def concat_tables(tables, output_name=None):
+    """
+    Perform zero-copy concatenation of pyarrow.Table objects. Raises an
+    exception if the Table schemas are not all the same
+
+    Parameters
+    ----------
+    tables : iterable of pyarrow.Table objects
+    output_name : string, default None
+      A name for the output table, if any
+    """
+    cdef:
+        vector[shared_ptr[CTable]] c_tables
+        shared_ptr[CTable] c_result
+        Table table
+        c_string c_name
+
+    c_name = b'' if output_name is None else tobytes(output_name)
+
+    for table in tables:
+        c_tables.push_back(table.sp_table)
+
+    with nogil:
+        check_status(ConcatenateTables(c_name, c_tables, &c_result))
+
+    return table_from_ctable(c_result)
+
+
 cdef api object table_from_ctable(const shared_ptr[CTable]& ctable):
     cdef Table table = Table()
     table.init(ctable)
     return table
 
-from_pandas_dataframe = Table.from_pandas
+cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch):
+    cdef RecordBatch batch = RecordBatch()
+    batch.init(cbatch)
+    return batch

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/pyarrow/tests/test_convert_pandas.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index bb9f0b3..12e7a08 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -61,7 +61,7 @@ class TestPandasConversion(unittest.TestCase):
 
     def _check_pandas_roundtrip(self, df, expected=None, nthreads=1,
                                 timestamps_to_ms=False):
-        table = A.from_pandas_dataframe(df, timestamps_to_ms=timestamps_to_ms)
+        table = A.Table.from_pandas(df, timestamps_to_ms=timestamps_to_ms)
         result = table.to_pandas(nthreads=nthreads)
         if expected is None:
             expected = df
@@ -193,7 +193,7 @@ class TestPandasConversion(unittest.TestCase):
         values = [u('qux'), b'foo', None, 'bar', 'qux', np.nan]
         df = pd.DataFrame({'strings': values})
 
-        table = A.from_pandas_dataframe(df)
+        table = A.Table.from_pandas(df)
         assert table[0].type == A.binary()
 
         values2 = [b'qux', b'foo', None, b'bar', b'qux', np.nan]
@@ -245,7 +245,7 @@ class TestPandasConversion(unittest.TestCase):
                      None,
                      datetime.date(1970, 1, 1),
                      datetime.date(2040, 2, 26)]})
-        table = A.from_pandas_dataframe(df)
+        table = A.Table.from_pandas(df)
         result = table.to_pandas()
         expected = df.copy()
         expected['date'] = pd.to_datetime(df['date'])

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 7c45732..0fb913c 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -79,7 +79,7 @@ def test_pandas_parquet_2_0_rountrip(tmpdir):
         'empty_str': [''] * size
     })
     filename = tmpdir.join('pandas_rountrip.parquet')
-    arrow_table = A.from_pandas_dataframe(df, timestamps_to_ms=True)
+    arrow_table = A.Table.from_pandas(df, timestamps_to_ms=True)
     A.parquet.write_table(arrow_table, filename.strpath, version="2.0")
     table_read = pq.read_table(filename.strpath)
     df_read = table_read.to_pandas()
@@ -107,7 +107,7 @@ def test_pandas_parquet_1_0_rountrip(tmpdir):
         'empty_str': [''] * size
     })
     filename = tmpdir.join('pandas_rountrip.parquet')
-    arrow_table = A.from_pandas_dataframe(df)
+    arrow_table = A.Table.from_pandas(df)
     A.parquet.write_table(arrow_table, filename.strpath, version="1.0")
     table_read = pq.read_table(filename.strpath)
     df_read = table_read.to_pandas()
@@ -126,7 +126,7 @@ def test_pandas_column_selection(tmpdir):
         'uint16': np.arange(size, dtype=np.uint16)
     })
     filename = tmpdir.join('pandas_rountrip.parquet')
-    arrow_table = A.from_pandas_dataframe(df)
+    arrow_table = A.Table.from_pandas(df)
     A.parquet.write_table(arrow_table, filename.strpath)
     table_read = pq.read_table(filename.strpath, columns=['uint8'])
     df_read = table_read.to_pandas()
@@ -155,7 +155,7 @@ def _test_dataframe(size=10000):
 @parquet
 def test_pandas_parquet_native_file_roundtrip(tmpdir):
     df = _test_dataframe(10000)
-    arrow_table = A.from_pandas_dataframe(df)
+    arrow_table = A.Table.from_pandas(df)
     imos = paio.InMemoryOutputStream()
     pq.write_table(arrow_table, imos, version="2.0")
     buf = imos.get_result()
@@ -176,7 +176,7 @@ def test_pandas_parquet_pyfile_roundtrip(tmpdir):
         'strings': ['foo', 'bar', None, 'baz', 'qux']
     })
 
-    arrow_table = A.from_pandas_dataframe(df)
+    arrow_table = A.Table.from_pandas(df)
 
     with open(filename, 'wb') as f:
         A.parquet.write_table(arrow_table, f, version="1.0")
@@ -206,7 +206,7 @@ def test_pandas_parquet_configuration_options(tmpdir):
         'bool': np.random.randn(size) > 0
     })
     filename = tmpdir.join('pandas_rountrip.parquet')
-    arrow_table = A.from_pandas_dataframe(df)
+    arrow_table = A.Table.from_pandas(df)
 
     for use_dictionary in [True, False]:
         A.parquet.write_table(

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 9985b3e..6f00c73 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -111,6 +111,33 @@ def test_table_basics():
             assert chunk is not None
 
 
+def test_concat_tables():
+    data = [
+        list(range(5)),
+        [-10., -5., 0., 5., 10.]
+    ]
+    data2 = [
+        list(range(5, 10)),
+        [1., 2., 3., 4., 5.]
+    ]
+
+    t1 = pa.Table.from_arrays(('a', 'b'), [pa.from_pylist(x)
+                                           for x in data], 'table_name')
+    t2 = pa.Table.from_arrays(('a', 'b'), [pa.from_pylist(x)
+                                           for x in data2], 'table_name')
+
+    result = pa.concat_tables([t1, t2], output_name='foo')
+    assert result.name == 'foo'
+    assert len(result) == 10
+
+    expected = pa.Table.from_arrays(
+        ('a', 'b'), [pa.from_pylist(x + y)
+                     for x, y in zip(data, data2)],
+        'foo')
+
+    assert result.equals(expected)
+
+
 def test_table_pandas():
     data = [
         pa.from_pylist(range(5)),

http://git-wip-us.apache.org/repos/asf/arrow/blob/6526a522/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index 2e595e2..3829a79 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -143,7 +143,10 @@ class build_ext(_build_ext):
                              cmake_options + [source])
 
             self.spawn(cmake_command)
-            args = ['make', 'VERBOSE=1']
+            args = ['make']
+            if os.environ.get('PYARROW_BUILD_VERBOSE', '0') == '1':
+                args.append('VERBOSE=1')
+
             if 'PYARROW_PARALLEL' in os.environ:
                 args.append('-j{0}'.format(os.environ['PYARROW_PARALLEL']))
             self.spawn(args)
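
With this change the make step is quiet by default; setting
PYARROW_BUILD_VERBOSE=1 in the environment restores the previous VERBOSE=1
output, and PYARROW_PARALLEL continues to control the -j parallelism level.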