Posted to commits@arrow.apache.org by we...@apache.org on 2017/01/19 14:27:40 UTC

[1/3] arrow git commit: ARROW-461: [Python] Add Python interfaces to DictionaryArray data, pandas interop

Repository: arrow
Updated Branches:
  refs/heads/master 353772f84 -> 9b1b3979b


http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/src/pyarrow/adapters/pandas.h
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.h b/python/src/pyarrow/adapters/pandas.h
index 664365e..b548f93 100644
--- a/python/src/pyarrow/adapters/pandas.h
+++ b/python/src/pyarrow/adapters/pandas.h
@@ -63,11 +63,7 @@ arrow::Status ConvertTableToPandas(
     const std::shared_ptr<arrow::Table>& table, int nthreads, PyObject** out);
 
 PYARROW_EXPORT
-arrow::Status PandasMaskedToArrow(arrow::MemoryPool* pool, PyObject* ao, PyObject* mo,
-    const std::shared_ptr<arrow::Field>& field, std::shared_ptr<arrow::Array>* out);
-
-PYARROW_EXPORT
-arrow::Status PandasToArrow(arrow::MemoryPool* pool, PyObject* ao,
+arrow::Status PandasToArrow(arrow::MemoryPool* pool, PyObject* ao, PyObject* mo,
     const std::shared_ptr<arrow::Field>& field, std::shared_ptr<arrow::Array>* out);
 
 }  // namespace pyarrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/src/pyarrow/common.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/common.cc b/python/src/pyarrow/common.cc
index 0bdd289..b8712d7 100644
--- a/python/src/pyarrow/common.cc
+++ b/python/src/pyarrow/common.cc
@@ -93,7 +93,7 @@ PyBytesBuffer::PyBytesBuffer(PyObject* obj)
 }
 
 PyBytesBuffer::~PyBytesBuffer() {
-  PyGILGuard lock;
+  PyAcquireGIL lock;
   Py_DECREF(obj_);
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/src/pyarrow/common.h
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/common.h b/python/src/pyarrow/common.h
index 639918d..0733a3b 100644
--- a/python/src/pyarrow/common.h
+++ b/python/src/pyarrow/common.h
@@ -30,6 +30,17 @@ class MemoryPool;
 
 namespace pyarrow {
 
+class PyAcquireGIL {
+ public:
+  PyAcquireGIL() { state_ = PyGILState_Ensure(); }
+
+  ~PyAcquireGIL() { PyGILState_Release(state_); }
+
+ private:
+  PyGILState_STATE state_;
+  DISALLOW_COPY_AND_ASSIGN(PyAcquireGIL);
+};
+
 #define PYARROW_IS_PY2 PY_MAJOR_VERSION <= 2
 
 class OwnedRef {
@@ -38,7 +49,10 @@ class OwnedRef {
 
   OwnedRef(PyObject* obj) : obj_(obj) {}
 
-  ~OwnedRef() { Py_XDECREF(obj_); }
+  ~OwnedRef() {
+    PyAcquireGIL lock;
+    Py_XDECREF(obj_);
+  }
 
   void reset(PyObject* obj) {
     if (obj_ != nullptr) { Py_XDECREF(obj_); }
@@ -69,17 +83,6 @@ struct PyObjectStringify {
   }
 };
 
-class PyGILGuard {
- public:
-  PyGILGuard() { state_ = PyGILState_Ensure(); }
-
-  ~PyGILGuard() { PyGILState_Release(state_); }
-
- private:
-  PyGILState_STATE state_;
-  DISALLOW_COPY_AND_ASSIGN(PyGILGuard);
-};
-
 // TODO(wesm): We can just let errors pass through. To be explored later
 #define RETURN_IF_PYERROR()                         \
   if (PyErr_Occurred()) {                           \
@@ -88,8 +91,9 @@ class PyGILGuard {
     PyObjectStringify stringified(exc_value);       \
     std::string message(stringified.bytes);         \
     Py_DECREF(exc_type);                            \
-    Py_DECREF(exc_value);                           \
-    Py_DECREF(traceback);                           \
+    Py_XDECREF(exc_value);                          \
+    Py_XDECREF(traceback);                          \
+    PyErr_Clear();                                  \
     return Status::UnknownError(message);           \
   }
 
@@ -122,17 +126,6 @@ class PYARROW_EXPORT PyBytesBuffer : public arrow::Buffer {
   PyObject* obj_;
 };
 
-class PyAcquireGIL {
- public:
-  PyAcquireGIL() { state_ = PyGILState_Ensure(); }
-
-  ~PyAcquireGIL() { PyGILState_Release(state_); }
-
- private:
-  PyGILState_STATE state_;
-  DISALLOW_COPY_AND_ASSIGN(PyAcquireGIL);
-};
-
 }  // namespace pyarrow
 
 #endif  // PYARROW_COMMON_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/src/pyarrow/io.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/io.cc b/python/src/pyarrow/io.cc
index 01f851d..9235260 100644
--- a/python/src/pyarrow/io.cc
+++ b/python/src/pyarrow/io.cc
@@ -114,22 +114,22 @@ PyReadableFile::PyReadableFile(PyObject* file) {
 PyReadableFile::~PyReadableFile() {}
 
 Status PyReadableFile::Close() {
-  PyGILGuard lock;
+  PyAcquireGIL lock;
   return file_->Close();
 }
 
 Status PyReadableFile::Seek(int64_t position) {
-  PyGILGuard lock;
+  PyAcquireGIL lock;
   return file_->Seek(position, 0);
 }
 
 Status PyReadableFile::Tell(int64_t* position) {
-  PyGILGuard lock;
+  PyAcquireGIL lock;
   return file_->Tell(position);
 }
 
 Status PyReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
-  PyGILGuard lock;
+  PyAcquireGIL lock;
   PyObject* bytes_obj;
   ARROW_RETURN_NOT_OK(file_->Read(nbytes, &bytes_obj));
 
@@ -141,7 +141,7 @@ Status PyReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
 }
 
 Status PyReadableFile::Read(int64_t nbytes, std::shared_ptr<arrow::Buffer>* out) {
-  PyGILGuard lock;
+  PyAcquireGIL lock;
 
   PyObject* bytes_obj;
   ARROW_RETURN_NOT_OK(file_->Read(nbytes, &bytes_obj));
@@ -153,7 +153,7 @@ Status PyReadableFile::Read(int64_t nbytes, std::shared_ptr<arrow::Buffer>* out)
 }
 
 Status PyReadableFile::GetSize(int64_t* size) {
-  PyGILGuard lock;
+  PyAcquireGIL lock;
 
   int64_t current_position;
   ;
@@ -185,17 +185,17 @@ PyOutputStream::PyOutputStream(PyObject* file) {
 PyOutputStream::~PyOutputStream() {}
 
 Status PyOutputStream::Close() {
-  PyGILGuard lock;
+  PyAcquireGIL lock;
   return file_->Close();
 }
 
 Status PyOutputStream::Tell(int64_t* position) {
-  PyGILGuard lock;
+  PyAcquireGIL lock;
   return file_->Tell(position);
 }
 
 Status PyOutputStream::Write(const uint8_t* data, int64_t nbytes) {
-  PyGILGuard lock;
+  PyAcquireGIL lock;
   return file_->Write(data, nbytes);
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/src/pyarrow/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/util/CMakeLists.txt b/python/src/pyarrow/util/CMakeLists.txt
index 4afb4d0..6cd49cb 100644
--- a/python/src/pyarrow/util/CMakeLists.txt
+++ b/python/src/pyarrow/util/CMakeLists.txt
@@ -20,7 +20,7 @@
 #######################################
 
 if (PYARROW_BUILD_TESTS)
-  add_library(pyarrow_test_main
+  add_library(pyarrow_test_main STATIC
 	test_main.cc)
 
   if (APPLE)

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/src/pyarrow/util/test_main.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/util/test_main.cc b/python/src/pyarrow/util/test_main.cc
index 6fb7c05..02e9a54 100644
--- a/python/src/pyarrow/util/test_main.cc
+++ b/python/src/pyarrow/util/test_main.cc
@@ -15,12 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <Python.h>
+
 #include <gtest/gtest.h>
 
+#include "pyarrow/do_import_numpy.h"
+#include "pyarrow/numpy_interop.h"
+
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
 
+  Py_Initialize();
+  pyarrow::import_numpy();
+
   int ret = RUN_ALL_TESTS();
 
+  Py_Finalize();
+
   return ret;
 }


[3/3] arrow git commit: ARROW-461: [Python] Add Python interfaces to DictionaryArray data, pandas interop

Posted by we...@apache.org.
ARROW-461: [Python] Add Python interfaces to DictionaryArray data, pandas interop

Author: Wes McKinney <we...@twosigma.com>

Closes #291 from wesm/ARROW-461 and squashes the following commits:

b3efe96 [Wes McKinney] Fix cpp unit test, code review comments
285f863 [Wes McKinney] Accept list input in Array.from_pandas
16aa9d6 [Wes McKinney] Add Categorical conversion for single array or column. Required moving code around a little bit in pandas.cc
3d409e8 [Wes McKinney] First round of DataFrame-level dictionary to Categorical conversion
0b20c38 [Wes McKinney] Draft Python wrapper classes for DictionaryType, DictionaryArray. Avoid segfault when conversion to UTF8 fails. Starting on CategoricalBlock implementation


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/9b1b3979
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/9b1b3979
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/9b1b3979

Branch: refs/heads/master
Commit: 9b1b3979b499dc06b71a31b2696534550503d6e2
Parents: 353772f
Author: Wes McKinney <we...@twosigma.com>
Authored: Thu Jan 19 09:27:32 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Jan 19 09:27:32 2017 -0500

----------------------------------------------------------------------
 cpp/src/arrow/array-dictionary-test.cc       |    2 +-
 cpp/src/arrow/array.h                        |    2 +
 cpp/src/arrow/builder.cc                     |    9 +-
 cpp/src/arrow/builder.h                      |    7 +-
 cpp/src/arrow/type.cc                        |    4 +-
 python/CMakeLists.txt                        |   10 +
 python/cmake_modules/FindPythonLibsNew.cmake |    3 +-
 python/pyarrow/__init__.py                   |   11 +-
 python/pyarrow/array.pxd                     |   44 +-
 python/pyarrow/array.pyx                     |  230 +-
 python/pyarrow/includes/libarrow.pxd         |   38 +-
 python/pyarrow/includes/pyarrow.pxd          |    6 +-
 python/pyarrow/schema.pxd                    |    8 +-
 python/pyarrow/schema.pyx                    |   28 +
 python/pyarrow/table.pyx                     |   74 +-
 python/pyarrow/tests/test_column.py          |    1 -
 python/pyarrow/tests/test_convert_pandas.py  |   61 +-
 python/setup.py                              |    1 +
 python/src/pyarrow/CMakeLists.txt            |    2 +
 python/src/pyarrow/adapters/pandas-test.cc   |   64 +
 python/src/pyarrow/adapters/pandas.cc        | 2694 +++++++++++----------
 python/src/pyarrow/adapters/pandas.h         |    6 +-
 python/src/pyarrow/common.cc                 |    2 +-
 python/src/pyarrow/common.h                  |   43 +-
 python/src/pyarrow/io.cc                     |   18 +-
 python/src/pyarrow/util/CMakeLists.txt       |    2 +-
 python/src/pyarrow/util/test_main.cc         |   10 +
 27 files changed, 1881 insertions(+), 1499 deletions(-)
----------------------------------------------------------------------
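
For orientation, the end-to-end behavior this patch enables looks roughly
like the following (a minimal sketch against the pyarrow API as of this
commit; the DataFrame contents are illustrative):

    import pandas as pd
    import pyarrow as A

    df = pd.DataFrame({'strings': pd.Categorical(['foo', 'bar', 'foo'])})

    # Categorical columns are dictionary-encoded on the way in ...
    table = A.Table.from_pandas(df)

    # ... and rebuilt as Categorical-backed blocks on the way out
    result = table.to_pandas()
    assert result['strings'].dtype.name == 'category'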


http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/cpp/src/arrow/array-dictionary-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-dictionary-test.cc b/cpp/src/arrow/array-dictionary-test.cc
index c290153..1a0d49a 100644
--- a/cpp/src/arrow/array-dictionary-test.cc
+++ b/cpp/src/arrow/array-dictionary-test.cc
@@ -46,7 +46,7 @@ TEST(TestDictionary, Basics) {
   ASSERT_TRUE(int16()->Equals(type2.index_type()));
   ASSERT_TRUE(type2.dictionary()->Equals(dict));
 
-  ASSERT_EQ("dictionary<int32, int16>", type1->ToString());
+  ASSERT_EQ("dictionary<values=int32, indices=int16>", type1->ToString());
 }
 
 TEST(TestDictionary, Equals) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 45f8ab9..4f4b727 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -553,6 +553,8 @@ class ARROW_EXPORT DictionaryArray : public Array {
   std::shared_ptr<Array> indices() const { return indices_; }
   std::shared_ptr<Array> dictionary() const;
 
+  const DictionaryType* dict_type() { return dict_type_; }
+
   bool EqualsExact(const DictionaryArray& other) const;
   bool Equals(const std::shared_ptr<Array>& arr) const override;
   bool RangeEquals(int32_t start_idx, int32_t end_idx, int32_t other_start_idx,

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/cpp/src/arrow/builder.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index a308ea5..b0dc41b 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -421,9 +421,12 @@ Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
     BUILDER_CASE(FLOAT, FloatBuilder);
     BUILDER_CASE(DOUBLE, DoubleBuilder);
 
-    BUILDER_CASE(STRING, StringBuilder);
-    BUILDER_CASE(BINARY, BinaryBuilder);
-
+    case Type::STRING:
+      out->reset(new StringBuilder(pool));
+      return Status::OK();
+    case Type::BINARY:
+      out->reset(new BinaryBuilder(pool, type));
+      return Status::OK();
     case Type::LIST: {
       std::shared_ptr<ArrayBuilder> value_builder;
       std::shared_ptr<DataType> value_type =

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/cpp/src/arrow/builder.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 1837340..735bca1 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -24,6 +24,7 @@
 #include <vector>
 
 #include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
 #include "arrow/status.h"
 #include "arrow/type.h"
 #include "arrow/util/bit-util.h"
@@ -33,7 +34,6 @@
 namespace arrow {
 
 class Array;
-class MemoryPool;
 
 static constexpr int32_t kMinBuilderCapacity = 1 << 5;
 
@@ -378,7 +378,10 @@ class ARROW_EXPORT BinaryBuilder : public ListBuilder {
 // String builder
 class ARROW_EXPORT StringBuilder : public BinaryBuilder {
  public:
-  explicit StringBuilder(MemoryPool* pool, const TypePtr& type)
+  explicit StringBuilder(MemoryPool* pool = default_memory_pool())
+      : BinaryBuilder(pool, utf8()) {}
+
+  explicit StringBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type)
       : BinaryBuilder(pool, type) {}
 
   using BinaryBuilder::Append;

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/cpp/src/arrow/type.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 954fba7..ba77584 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -148,8 +148,8 @@ bool DictionaryType::Equals(const DataType& other) const {
 
 std::string DictionaryType::ToString() const {
   std::stringstream ss;
-  ss << "dictionary<" << dictionary_->type()->ToString() << ", "
-     << index_type_->ToString() << ">";
+  ss << "dictionary<values=" << dictionary_->type()->ToString()
+     << ", indices=" << index_type_->ToString() << ">";
   return ss.str();
 }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 45115d4..0a2d4e9 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -55,6 +55,10 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
     OFF)
 endif()
 
+if(NOT PYARROW_BUILD_TESTS)
+  set(NO_TESTS 1)
+endif()
+
 find_program(CCACHE_FOUND ccache)
 if(CCACHE_FOUND)
   set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ccache)
@@ -339,6 +343,12 @@ set(PYARROW_MIN_TEST_LIBS
   pyarrow
   ${PYARROW_BASE_LIBS})
 
+if(NOT APPLE)
+  ADD_THIRDPARTY_LIB(python
+    SHARED_LIB "${PYTHON_LIBRARIES}")
+  list(APPEND PYARROW_MIN_TEST_LIBS python)
+endif()
+
 set(PYARROW_TEST_LINK_LIBS ${PYARROW_MIN_TEST_LIBS})
 
 ############################################################

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/cmake_modules/FindPythonLibsNew.cmake
----------------------------------------------------------------------
diff --git a/python/cmake_modules/FindPythonLibsNew.cmake b/python/cmake_modules/FindPythonLibsNew.cmake
index 5cb65c9..1000a95 100644
--- a/python/cmake_modules/FindPythonLibsNew.cmake
+++ b/python/cmake_modules/FindPythonLibsNew.cmake
@@ -161,6 +161,7 @@ else()
         set(_PYTHON_LIBS_SEARCH "${PYTHON_PREFIX}/lib" "${PYTHON_LIBRARY_PATH}")
     endif()
     message(STATUS "Searching for Python libs in ${_PYTHON_LIBS_SEARCH}")
+    message(STATUS "Looking for python${PYTHON_LIBRARY_SUFFIX}")
     # Probably this needs to be more involved. It would be nice if the config
     # information the python interpreter itself gave us were more complete.
     find_library(PYTHON_LIBRARY
@@ -237,4 +238,4 @@ FUNCTION(PYTHON_ADD_MODULE _NAME )
     ENDIF()
 
   ENDIF(PYTHON_ENABLE_MODULE_${_NAME})
-ENDFUNCTION(PYTHON_ADD_MODULE)
\ No newline at end of file
+ENDFUNCTION(PYTHON_ADD_MODULE)

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index a8c3e8e..efffbf2 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -31,9 +31,14 @@ from pyarrow.config import cpu_count, set_cpu_count
 from pyarrow.array import (Array,
                            from_pandas_series, from_pylist,
                            total_allocated_bytes,
-                           BooleanArray, NumericArray,
+                           NumericArray, IntegerArray, FloatingPointArray,
+                           BooleanArray,
                            Int8Array, UInt8Array,
-                           ListArray, StringArray)
+                           Int16Array, UInt16Array,
+                           Int32Array, UInt32Array,
+                           Int64Array, UInt64Array,
+                           ListArray, StringArray,
+                           DictionaryArray)
 
 from pyarrow.error import ArrowException
 
@@ -52,7 +57,7 @@ from pyarrow.schema import (null, bool_,
                             uint8, uint16, uint32, uint64,
                             timestamp, date,
                             float_, double, binary, string,
-                            list_, struct, field,
+                            list_, struct, dictionary, field,
                             DataType, Field, Schema, schema)
 
 from pyarrow.table import Column, RecordBatch, Table, concat_tables
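
Besides the pandas hooks, the newly exported DictionaryArray can be built
directly from an index array plus an array of dictionary values (a sketch
based on the from_arrays signature added in array.pyx below; from_arrays
rejects non-integer indices):

    import numpy as np
    import pyarrow

    indices = np.array([0, 1, 0, 2], dtype='int8')
    dictionary = np.array(['foo', 'bar', 'baz'], dtype=object)

    # an optional boolean mask marks indices that are actually null
    darr = pyarrow.DictionaryArray.from_arrays(indices, dictionary)
    assert isinstance(darr, pyarrow.DictionaryArray)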

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/pyarrow/array.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pxd b/python/pyarrow/array.pxd
index 8cd15cd..af10535 100644
--- a/python/pyarrow/array.pxd
+++ b/python/pyarrow/array.pxd
@@ -22,6 +22,8 @@ from pyarrow.scalar import NA
 
 from pyarrow.schema cimport DataType
 
+from cpython cimport PyObject
+
 cdef extern from "Python.h":
     int PySlice_Check(object)
 
@@ -47,35 +49,50 @@ cdef class NumericArray(Array):
     pass
 
 
-cdef class Int8Array(NumericArray):
+cdef class IntegerArray(NumericArray):
+    pass
+
+cdef class FloatingPointArray(NumericArray):
+    pass
+
+
+cdef class Int8Array(IntegerArray):
+    pass
+
+
+cdef class UInt8Array(IntegerArray):
+    pass
+
+
+cdef class Int16Array(IntegerArray):
     pass
 
 
-cdef class UInt8Array(NumericArray):
+cdef class UInt16Array(IntegerArray):
     pass
 
 
-cdef class Int16Array(NumericArray):
+cdef class Int32Array(IntegerArray):
     pass
 
 
-cdef class UInt16Array(NumericArray):
+cdef class UInt32Array(IntegerArray):
     pass
 
 
-cdef class Int32Array(NumericArray):
+cdef class Int64Array(IntegerArray):
     pass
 
 
-cdef class UInt32Array(NumericArray):
+cdef class UInt64Array(IntegerArray):
     pass
 
 
-cdef class Int64Array(NumericArray):
+cdef class FloatArray(FloatingPointArray):
     pass
 
 
-cdef class UInt64Array(NumericArray):
+cdef class DoubleArray(FloatingPointArray):
     pass
 
 
@@ -85,3 +102,14 @@ cdef class ListArray(Array):
 
 cdef class StringArray(Array):
     pass
+
+
+cdef class BinaryArray(Array):
+    pass
+
+
+cdef class DictionaryArray(Array):
+    pass
+
+
+cdef wrap_array_output(PyObject* output)
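
The point of the new subclass layering is visible from Python (an
illustrative sketch; the concrete type inferred for a plain int list is
assumed to be Int64Array):

    import pyarrow

    arr = pyarrow.from_pylist([1, 2, 3])  # assumed to infer Int64Array

    # integer arrays now share IntegerArray (and NumericArray) as bases,
    # which is what DictionaryArray.from_arrays checks before encoding
    assert isinstance(arr, pyarrow.IntegerArray)
    assert isinstance(arr, pyarrow.NumericArray)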

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/pyarrow/array.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx
index 4299ba6..92206f2 100644
--- a/python/pyarrow/array.pyx
+++ b/python/pyarrow/array.pyx
@@ -33,12 +33,17 @@ from pyarrow.error cimport check_status
 cimport pyarrow.scalar as scalar
 from pyarrow.scalar import NA
 
-from pyarrow.schema cimport Field, Schema
+from pyarrow.schema cimport Field, Schema, DictionaryType
 import pyarrow.schema as schema
 
 cimport cpython
 
 
+cdef _pandas():
+    import pandas as pd
+    return pd
+
+
 def total_allocated_bytes():
     cdef MemoryPool* pool = pyarrow.get_memory_pool()
     return pool.bytes_allocated()
@@ -53,20 +58,22 @@ cdef class Array:
         self.type.init(self.sp_array.get().type())
 
     @staticmethod
-    def from_pandas(obj, mask=None):
+    def from_pandas(obj, mask=None, timestamps_to_ms=False, Field field=None):
         """
-        Create an array from a pandas.Series
+        Convert pandas.Series to an Arrow Array.
 
         Parameters
         ----------
-        obj : pandas.Series or numpy.ndarray
-            vector holding the data
-        mask : numpy.ndarray, optional
+        series : pandas.Series or numpy.ndarray
+
+        mask : pandas.Series or numpy.ndarray, optional
             boolean mask if the object is valid or null
 
-        Returns
-        -------
-        pyarrow.Array
+        timestamps_to_ms : bool, optional
+            Convert datetime columns to ms resolution. This is needed for
+            compatibility with other functionality like Parquet I/O which
+            only supports milliseconds.
+
 
         Examples
         --------
@@ -80,16 +87,47 @@ cdef class Array:
           2
         ]
 
-
         >>> import numpy as np
-        >>> pa.Array.from_pandas(pd.Series([1, 2]), np.array([0, 1], dtype=bool))
+        >>> pa.Array.from_pandas(pd.Series([1, 2]), np.array([0, 1],
+        ... dtype=bool))
         <pyarrow.array.Int64Array object at 0x7f9019e11208>
         [
           1,
           NA
         ]
+
+        Returns
+        -------
+        pyarrow.array.Array
         """
-        return from_pandas_series(obj, mask)
+        cdef:
+            shared_ptr[CArray] out
+            shared_ptr[CField] c_field
+
+        pd = _pandas()
+
+        if field is not None:
+            c_field = field.sp_field
+
+        if mask is not None:
+            mask = get_series_values(mask)
+
+        series_values = get_series_values(obj)
+
+        if isinstance(series_values, pd.Categorical):
+            return DictionaryArray.from_arrays(series_values.codes,
+                                               series_values.categories.values,
+                                               mask=mask)
+        else:
+            if series_values.dtype.type == np.datetime64 and timestamps_to_ms:
+                series_values = series_values.astype('datetime64[ms]')
+
+            with nogil:
+                check_status(pyarrow.PandasToArrow(
+                    pyarrow.get_memory_pool(), series_values, mask,
+                    c_field, &out))
+
+            return box_arrow_array(out)
 
     @staticmethod
     def from_list(object list_obj, DataType type=None):
@@ -183,12 +221,13 @@ cdef class Array:
         RecordBatch.to_pandas
         """
         cdef:
-            PyObject* np_arr
-
-        check_status(pyarrow.ConvertArrayToPandas(
-            self.sp_array, <PyObject*> self, &np_arr))
+            PyObject* out
 
-        return PyObject_to_object(np_arr)
+        with nogil:
+            check_status(
+                pyarrow.ConvertArrayToPandas(self.sp_array, <PyObject*> self,
+                                             &out))
+        return wrap_array_output(out)
 
     def to_pylist(self):
         """
@@ -197,6 +236,17 @@ cdef class Array:
         return [x.as_py() for x in self]
 
 
+cdef wrap_array_output(PyObject* output):
+    cdef object obj = PyObject_to_object(output)
+
+    if isinstance(obj, dict):
+        return _pandas().Categorical(obj['indices'],
+                                     categories=obj['dictionary'],
+                                     fastpath=True)
+    else:
+        return obj
+
+
 cdef class NullArray(Array):
     pass
 
@@ -209,35 +259,43 @@ cdef class NumericArray(Array):
     pass
 
 
-cdef class Int8Array(NumericArray):
+cdef class IntegerArray(NumericArray):
+    pass
+
+
+cdef class FloatingPointArray(NumericArray):
+    pass
+
+
+cdef class Int8Array(IntegerArray):
     pass
 
 
-cdef class UInt8Array(NumericArray):
+cdef class UInt8Array(IntegerArray):
     pass
 
 
-cdef class Int16Array(NumericArray):
+cdef class Int16Array(IntegerArray):
     pass
 
 
-cdef class UInt16Array(NumericArray):
+cdef class UInt16Array(IntegerArray):
     pass
 
 
-cdef class Int32Array(NumericArray):
+cdef class Int32Array(IntegerArray):
     pass
 
 
-cdef class UInt32Array(NumericArray):
+cdef class UInt32Array(IntegerArray):
     pass
 
 
-cdef class Int64Array(NumericArray):
+cdef class Int64Array(IntegerArray):
     pass
 
 
-cdef class UInt64Array(NumericArray):
+cdef class UInt64Array(IntegerArray):
     pass
 
 
@@ -245,11 +303,11 @@ cdef class DateArray(NumericArray):
     pass
 
 
-cdef class FloatArray(NumericArray):
+cdef class FloatArray(FloatingPointArray):
     pass
 
 
-cdef class DoubleArray(NumericArray):
+cdef class DoubleArray(FloatingPointArray):
     pass
 
 
@@ -265,6 +323,46 @@ cdef class BinaryArray(Array):
     pass
 
 
+cdef class DictionaryArray(Array):
+
+    @staticmethod
+    def from_arrays(indices, dictionary, mask=None):
+        """
+        Construct Arrow DictionaryArray from array of indices (must be
+        non-negative integers) and corresponding array of dictionary values
+
+        Parameters
+        ----------
+        indices : ndarray or pandas.Series, integer type
+        dictionary : ndarray or pandas.Series
+        mask : ndarray or pandas.Series, boolean type
+            True values indicate that indices are actually null
+
+        Returns
+        -------
+        dict_array : DictionaryArray
+        """
+        cdef:
+            Array arrow_indices, arrow_dictionary
+            DictionaryArray result
+            shared_ptr[CDataType] c_type
+            shared_ptr[CArray] c_result
+
+        arrow_indices = Array.from_pandas(indices, mask=mask)
+        arrow_dictionary = Array.from_pandas(dictionary)
+
+        if not isinstance(arrow_indices, IntegerArray):
+            raise ValueError('Indices must be integer type')
+
+        c_type.reset(new CDictionaryType(arrow_indices.type.sp_type,
+                                         arrow_dictionary.sp_array))
+        c_result.reset(new CDictionaryArray(c_type, arrow_indices.sp_array))
+
+        result = DictionaryArray()
+        result.init(c_result)
+        return result
+
+
 cdef dict _array_classes = {
     Type_NA: NullArray,
     Type_BOOL: BooleanArray,
@@ -283,6 +381,7 @@ cdef dict _array_classes = {
     Type_BINARY: BinaryArray,
     Type_STRING: StringArray,
     Type_TIMESTAMP: Int64Array,
+    Type_DICTIONARY: DictionaryArray
 }
 
 cdef object box_arrow_array(const shared_ptr[CArray]& sp_array):
@@ -299,83 +398,18 @@ cdef object box_arrow_array(const shared_ptr[CArray]& sp_array):
     return arr
 
 
-def from_pylist(object list_obj, DataType type=None):
-    """
-    Convert Python list to Arrow array
-
-    Parameters
-    ----------
-    list_obj : array_like
-
-    Returns
-    -------
-    pyarrow.array.Array
-    """
-    cdef:
-        shared_ptr[CArray] sp_array
-
-    if type is None:
-        check_status(pyarrow.ConvertPySequence(list_obj, &sp_array))
-    else:
-        raise NotImplementedError()
-
-    return box_arrow_array(sp_array)
-
-
-def from_pandas_series(object series, object mask=None, timestamps_to_ms=False, Field field=None):
-    """
-    Convert pandas.Series to an Arrow Array.
-
-    Parameters
-    ----------
-    series : pandas.Series or numpy.ndarray
-
-    mask : pandas.Series or numpy.ndarray, optional
-        array to mask null entries in the series
-
-    timestamps_to_ms : bool, optional
-        Convert datetime columns to ms resolution. This is needed for
-        compability with other functionality like Parquet I/O which
-        only supports milliseconds.
-
-    field: pyarrow.Field
-        Schema indicator to what type this column should render in Arrow
-
-    Returns
-    -------
-    pyarrow.array.Array
-    """
-    cdef:
-        shared_ptr[CArray] out
-        shared_ptr[CField] c_field
-
-    series_values = series_as_ndarray(series)
-    if series_values.dtype.type == np.datetime64 and timestamps_to_ms:
-        series_values = series_values.astype('datetime64[ms]')
-    if field is not None:
-        c_field = field.sp_field
-
-    if mask is None:
-        with nogil:
-            check_status(pyarrow.PandasToArrow(pyarrow.get_memory_pool(),
-                                               series_values, c_field, &out))
-    else:
-        mask = series_as_ndarray(mask)
-        with nogil:
-            check_status(pyarrow.PandasMaskedToArrow(
-                pyarrow.get_memory_pool(), series_values, mask, c_field, &out))
-
-    return box_arrow_array(out)
-
-
-cdef object series_as_ndarray(object obj):
+cdef object get_series_values(object obj):
     import pandas as pd
 
     if isinstance(obj, pd.Series):
         result = obj.values
-    else:
+    elif isinstance(obj, np.ndarray):
         result = obj
+    else:
+        result = pd.Series(obj).values
 
     return result
 
+
 from_pylist = Array.from_list
+from_pandas_series = Array.from_pandas
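
The reworked Array.from_pandas dispatches on the series' values, which is
what makes the Categorical path work (a sketch of the intended behavior):

    import pandas as pd
    import pyarrow

    cat = pd.Categorical(['a', 'b', 'a'])

    # Categorical input is routed to DictionaryArray.from_arrays instead
    # of PandasToArrow, so the dictionary encoding survives
    arr = pyarrow.Array.from_pandas(pd.Series(cat))
    assert isinstance(arr, pyarrow.DictionaryArray)

    # on the way back, wrap_array_output rebuilds the Categorical from the
    # {'indices': ..., 'dictionary': ...} dict produced by the C++ side
    assert isinstance(arr.to_pandas(), pd.Categorical)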

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 8b0e3b6..6284ad3 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -45,6 +45,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
 
         Type_LIST" arrow::Type::LIST"
         Type_STRUCT" arrow::Type::STRUCT"
+        Type_DICTIONARY" arrow::Type::DICTIONARY"
 
     enum TimeUnit" arrow::TimeUnit":
         TimeUnit_SECOND" arrow::TimeUnit::SECOND"
@@ -60,6 +61,33 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
 
         c_string ToString()
 
+    cdef cppclass CArray" arrow::Array":
+        shared_ptr[CDataType] type()
+
+        int32_t length()
+        int32_t null_count()
+        Type type_enum()
+
+        c_bool Equals(const shared_ptr[CArray]& arr)
+        c_bool IsNull(int i)
+
+    cdef cppclass CFixedWidthType" arrow::FixedWidthType"(CDataType):
+        int bit_width()
+
+    cdef cppclass CDictionaryArray" arrow::DictionaryArray"(CArray):
+        CDictionaryArray(const shared_ptr[CDataType]& type,
+                         const shared_ptr[CArray]& indices)
+
+        shared_ptr[CArray] indices()
+        shared_ptr[CArray] dictionary()
+
+    cdef cppclass CDictionaryType" arrow::DictionaryType"(CFixedWidthType):
+        CDictionaryType(const shared_ptr[CDataType]& index_type,
+                        const shared_ptr[CArray]& dictionary)
+
+        shared_ptr[CDataType] index_type()
+        shared_ptr[CArray] dictionary()
+
     shared_ptr[CDataType] timestamp(TimeUnit unit)
 
     cdef cppclass MemoryPool" arrow::MemoryPool":
@@ -111,16 +139,6 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         int num_fields()
         c_string ToString()
 
-    cdef cppclass CArray" arrow::Array":
-        shared_ptr[CDataType] type()
-
-        int32_t length()
-        int32_t null_count()
-        Type type_enum()
-
-        c_bool Equals(const shared_ptr[CArray]& arr)
-        c_bool IsNull(int i)
-
     cdef cppclass CBooleanArray" arrow::BooleanArray"(CArray):
         c_bool Value(int i)
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/pyarrow/includes/pyarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/pyarrow.pxd b/python/pyarrow/includes/pyarrow.pxd
index b7b8d7c..04ad4f3 100644
--- a/python/pyarrow/includes/pyarrow.pxd
+++ b/python/pyarrow/includes/pyarrow.pxd
@@ -30,11 +30,9 @@ cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil:
     shared_ptr[CDataType] GetTimestampType(TimeUnit unit)
     CStatus ConvertPySequence(object obj, shared_ptr[CArray]* out)
 
-    CStatus PandasToArrow(MemoryPool* pool, object ao, shared_ptr[CField] field,
+    CStatus PandasToArrow(MemoryPool* pool, object ao, object mo,
+                          shared_ptr[CField] field,
                           shared_ptr[CArray]* out)
-    CStatus PandasMaskedToArrow(MemoryPool* pool, object ao, object mo,
-                                shared_ptr[CField] field,
-                                shared_ptr[CArray]* out)
 
     CStatus ConvertArrayToPandas(const shared_ptr[CArray]& arr,
                                  PyObject* py_ref, PyObject** out)

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/pyarrow/schema.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/schema.pxd b/python/pyarrow/schema.pxd
index 42588d4..390954c 100644
--- a/python/pyarrow/schema.pxd
+++ b/python/pyarrow/schema.pxd
@@ -16,7 +16,8 @@
 # under the License.
 
 from pyarrow.includes.common cimport *
-from pyarrow.includes.libarrow cimport CDataType, CField, CSchema
+from pyarrow.includes.libarrow cimport (CDataType, CDictionaryType,
+                                        CField, CSchema)
 
 cdef class DataType:
     cdef:
@@ -25,6 +26,11 @@ cdef class DataType:
 
     cdef init(self, const shared_ptr[CDataType]& type)
 
+
+cdef class DictionaryType(DataType):
+    cdef:
+        const CDictionaryType* dict_type
+
 cdef class Field:
     cdef:
         shared_ptr[CField] sp_field

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/pyarrow/schema.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/schema.pyx b/python/pyarrow/schema.pyx
index 85b1617..2bcfec1 100644
--- a/python/pyarrow/schema.pyx
+++ b/python/pyarrow/schema.pyx
@@ -25,6 +25,7 @@
 from cython.operator cimport dereference as deref
 
 from pyarrow.compat import frombytes, tobytes
+from pyarrow.array cimport Array
 from pyarrow.includes.libarrow cimport (CDataType, CStructType, CListType,
                                         Type_NA, Type_BOOL,
                                         Type_UINT8, Type_INT8,
@@ -66,6 +67,19 @@ cdef class DataType:
             raise TypeError('Invalid comparison')
 
 
+cdef class DictionaryType(DataType):
+
+    cdef init(self, const shared_ptr[CDataType]& type):
+        DataType.init(self, type)
+        self.dict_type = <const CDictionaryType*> type.get()
+
+    def __str__(self):
+        return frombytes(self.type.ToString())
+
+    def __repr__(self):
+        return 'DictionaryType({0})'.format(str(self))
+
+
 cdef class Field:
 
     def __cinit__(self):
@@ -269,6 +283,7 @@ def binary():
     """
     return primitive_type(Type_BINARY)
 
+
 def list_(DataType value_type):
     cdef DataType out = DataType()
     cdef shared_ptr[CDataType] list_type
@@ -276,6 +291,19 @@ def list_(DataType value_type):
     out.init(list_type)
     return out
 
+
+def dictionary(DataType index_type, Array dictionary):
+    """
+    Dictionary (categorical, or simply encoded) type
+    """
+    cdef DictionaryType out = DictionaryType()
+    cdef shared_ptr[CDataType] dict_type
+    dict_type.reset(new CDictionaryType(index_type.sp_type,
+                                        dictionary.sp_array))
+    out.init(dict_type)
+    return out
+
+
 def struct(fields):
     """
 

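At the type level, the new dictionary() factory mirrors arrow::DictionaryType,
and its repr follows the values=/indices= ToString format introduced in
type.cc above (a sketch; it assumes int16() is exported from pyarrow.schema
like the unsigned factories shown in the __init__.py diff):

    import pyarrow

    values = pyarrow.from_pylist(['foo', 'bar', 'baz'])
    dict_type = pyarrow.dictionary(pyarrow.int16(), values)

    print(dict_type)
    # expected output: dictionary<values=string, indices=int16>
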
http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index b720a47..0e3b2bd 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -27,7 +27,7 @@ cimport pyarrow.includes.pyarrow as pyarrow
 
 import pyarrow.config
 
-from pyarrow.array cimport Array, box_arrow_array
+from pyarrow.array cimport Array, box_arrow_array, wrap_array_output
 from pyarrow.error import ArrowException
 from pyarrow.error cimport check_status
 from pyarrow.schema cimport box_data_type, box_schema, Field
@@ -39,6 +39,11 @@ cimport cpython
 from collections import OrderedDict
 
 
+cdef _pandas():
+    import pandas as pd
+    return pd
+
+
 cdef class ChunkedArray:
     """
     Array backed via one or more memory chunks.
@@ -146,14 +151,12 @@ cdef class Column:
         pandas.Series
         """
         cdef:
-            PyObject* arr
-
-        import pandas as pd
+            PyObject* out
 
         check_status(pyarrow.ConvertColumnToPandas(self.sp_column,
-                                                   <PyObject*> self, &arr))
+                                                   <PyObject*> self, &out))
 
-        return pd.Series(PyObject_to_object(arr), name=self.name)
+        return _pandas().Series(wrap_array_output(out), name=self.name)
 
     def equals(self, Column other):
         """
@@ -278,8 +281,6 @@ cdef _schema_from_arrays(arrays, names, shared_ptr[CSchema]* schema):
 
 
 cdef _dataframe_to_arrays(df, name, timestamps_to_ms, Schema schema):
-    from pyarrow.array import from_pandas_series
-
     cdef:
         list names = []
         list arrays = []
@@ -289,9 +290,8 @@ cdef _dataframe_to_arrays(df, name, timestamps_to_ms, Schema schema):
         col = df[name]
         if schema is not None:
             field = schema.field_by_name(name)
-        arr = from_pandas_series(col, timestamps_to_ms=timestamps_to_ms,
-                                 field=field)
-
+        arr = Array.from_pandas(col, timestamps_to_ms=timestamps_to_ms,
+                                field=field)
         names.append(name)
         arrays.append(arr)
 
@@ -304,7 +304,8 @@ cdef class RecordBatch:
 
     Warning
     -------
-    Do not call this class's constructor directly, use one of the ``from_*`` methods instead.
+    Do not call this class's constructor directly, use one of the ``from_*``
+    methods instead.
     """
 
     def __cinit__(self):
@@ -401,7 +402,7 @@ cdef class RecordBatch:
         return OrderedDict(entries)
 
 
-    def to_pandas(self):
+    def to_pandas(self, nthreads=None):
         """
         Convert the arrow::RecordBatch to a pandas DataFrame
 
@@ -409,23 +410,7 @@ cdef class RecordBatch:
         -------
         pandas.DataFrame
         """
-        cdef:
-            PyObject* np_arr
-            shared_ptr[CArray] arr
-            Column column
-
-        import pandas as pd
-
-        names = []
-        data = []
-        for i in range(self.batch.num_columns()):
-            arr = self.batch.column(i)
-            check_status(pyarrow.ConvertArrayToPandas(arr, <PyObject*> self,
-                                                      &np_arr))
-            names.append(frombytes(self.batch.column_name(i)))
-            data.append(PyObject_to_object(np_arr))
-
-        return pd.DataFrame(dict(zip(names, data)), columns=names)
+        return Table.from_batches([self]).to_pandas(nthreads=nthreads)
 
     @classmethod
     def from_pandas(cls, df, schema=None):
@@ -490,8 +475,8 @@ cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
         CColumn* col
         int i
 
-    from pandas.core.internals import BlockManager, make_block
-    from pandas import RangeIndex
+    import pandas.core.internals as _int
+    from pandas import RangeIndex, Categorical
 
     with nogil:
         check_status(pyarrow.ConvertTableToPandas(table, nthreads,
@@ -500,8 +485,19 @@ cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
     result = PyObject_to_object(result_obj)
 
     blocks = []
-    for block_arr, placement_arr in result:
-        blocks.append(make_block(block_arr, placement=placement_arr))
+    for item in result:
+        block_arr = item['block']
+        placement = item['placement']
+        if 'dictionary' in item:
+            cat = Categorical(block_arr,
+                              categories=item['dictionary'],
+                              ordered=False, fastpath=True)
+            block = _int.make_block(cat, placement=placement,
+                                    klass=_int.CategoricalBlock,
+                                    fastpath=True)
+        else:
+            block = _int.make_block(block_arr, placement=placement)
+        blocks.append(block)
 
     names = []
     for i in range(table.get().num_columns()):
@@ -509,7 +505,7 @@ cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
         names.append(frombytes(col.name()))
 
     axes = [names, RangeIndex(table.get().num_rows())]
-    return BlockManager(blocks, axes)
+    return _int.BlockManager(blocks, axes)
 
 
 cdef class Table:
@@ -518,7 +514,8 @@ cdef class Table:
 
     Warning
     -------
-    Do not call this class's constructor directly, use one of the ``from_*`` methods instead.
+    Do not call this class's constructor directly, use one of the ``from_*``
+    methods instead.
     """
 
     def __cinit__(self):
@@ -688,13 +685,11 @@ cdef class Table:
         -------
         pandas.DataFrame
         """
-        import pandas as pd
-
         if nthreads is None:
             nthreads = pyarrow.config.cpu_count()
 
         mgr = table_to_blockmanager(self.sp_table, nthreads)
-        return pd.DataFrame(mgr)
+        return _pandas().DataFrame(mgr)
 
     def to_pydict(self):
         """
@@ -835,6 +830,7 @@ cdef api object table_from_ctable(const shared_ptr[CTable]& ctable):
     table.init(ctable)
     return table
 
+
 cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch):
     cdef RecordBatch batch = RecordBatch()
     batch.init(cbatch)
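
With table_to_blockmanager doing the heavy lifting, RecordBatch.to_pandas
reduces to a single call through Table.from_batches, so category columns get
the same CategoricalBlock treatment from both entry points (sketch):

    import pandas as pd
    import pyarrow as A

    df = pd.DataFrame({'ints': [1, 2, 3],
                       'cats': pd.Categorical(['x', 'y', 'x'])})

    batch = A.RecordBatch.from_pandas(df)

    # now just sugar for: A.Table.from_batches([batch]).to_pandas()
    result = batch.to_pandas()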

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/pyarrow/tests/test_column.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_column.py b/python/pyarrow/tests/test_column.py
index 32202cb..1a507c8 100644
--- a/python/pyarrow/tests/test_column.py
+++ b/python/pyarrow/tests/test_column.py
@@ -47,4 +47,3 @@ class TestColumn(unittest.TestCase):
         assert series.name == 'a'
         assert series.shape == (5,)
         assert series.iloc[0] == -10
-

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/pyarrow/tests/test_convert_pandas.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index 3928a1f..a2f5062 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -62,8 +62,10 @@ class TestPandasConversion(unittest.TestCase):
         pass
 
     def _check_pandas_roundtrip(self, df, expected=None, nthreads=1,
-                                timestamps_to_ms=False, expected_schema=None, schema=None):
-        table = A.Table.from_pandas(df, timestamps_to_ms=timestamps_to_ms, schema=schema)
+                                timestamps_to_ms=False, expected_schema=None,
+                                schema=None):
+        table = A.Table.from_pandas(df, timestamps_to_ms=timestamps_to_ms,
+                                    schema=schema)
         result = table.to_pandas(nthreads=nthreads)
         if expected_schema:
             assert table.schema.equals(expected_schema)
@@ -71,6 +73,13 @@ class TestPandasConversion(unittest.TestCase):
             expected = df
         tm.assert_frame_equal(result, expected)
 
+    def _check_array_roundtrip(self, values, expected=None,
+                                timestamps_to_ms=False, field=None):
+        arr = A.Array.from_pandas(values, timestamps_to_ms=timestamps_to_ms,
+                                  field=field)
+        result = arr.to_pandas()
+        tm.assert_series_equal(pd.Series(result), pd.Series(values))
+
     def test_float_no_nulls(self):
         data = {}
         fields = []
@@ -235,7 +244,8 @@ class TestPandasConversion(unittest.TestCase):
             })
         field = A.Field.from_py('datetime64', A.timestamp('ms'))
         schema = A.Schema.from_fields([field])
-        self._check_pandas_roundtrip(df, timestamps_to_ms=True, expected_schema=schema)
+        self._check_pandas_roundtrip(df, timestamps_to_ms=True,
+                                     expected_schema=schema)
 
         df = pd.DataFrame({
             'datetime64': np.array([
@@ -246,7 +256,8 @@ class TestPandasConversion(unittest.TestCase):
             })
         field = A.Field.from_py('datetime64', A.timestamp('ns'))
         schema = A.Schema.from_fields([field])
-        self._check_pandas_roundtrip(df, timestamps_to_ms=False, expected_schema=schema)
+        self._check_pandas_roundtrip(df, timestamps_to_ms=False,
+                                     expected_schema=schema)
 
     def test_timestamps_notimezone_nulls(self):
         df = pd.DataFrame({
@@ -258,7 +269,8 @@ class TestPandasConversion(unittest.TestCase):
             })
         field = A.Field.from_py('datetime64', A.timestamp('ms'))
         schema = A.Schema.from_fields([field])
-        self._check_pandas_roundtrip(df, timestamps_to_ms=True, expected_schema=schema)
+        self._check_pandas_roundtrip(df, timestamps_to_ms=True,
+                                     expected_schema=schema)
 
         df = pd.DataFrame({
             'datetime64': np.array([
@@ -269,7 +281,8 @@ class TestPandasConversion(unittest.TestCase):
             })
         field = A.Field.from_py('datetime64', A.timestamp('ns'))
         schema = A.Schema.from_fields([field])
-        self._check_pandas_roundtrip(df, timestamps_to_ms=False, expected_schema=schema)
+        self._check_pandas_roundtrip(df, timestamps_to_ms=False,
+                                     expected_schema=schema)
 
     def test_date(self):
         df = pd.DataFrame({
@@ -317,13 +330,13 @@ class TestPandasConversion(unittest.TestCase):
             np.array(['2007-07-13T01:23:34.123456789',
                       None,
                       '2010-08-13T05:46:57.437699912'],
-                      dtype='datetime64[ns]'),
+                     dtype='datetime64[ns]'),
             None,
             None,
             np.array(['2007-07-13T02',
                       None,
                       '2010-08-13T05:46:57.437699912'],
-                      dtype='datetime64[ns]'),
+                     dtype='datetime64[ns]'),
         ]
 
         df = pd.DataFrame(arrays)
@@ -331,16 +344,34 @@ class TestPandasConversion(unittest.TestCase):
         self._check_pandas_roundtrip(df, schema=schema, expected_schema=schema)
         table = A.Table.from_pandas(df, schema=schema)
         assert table.schema.equals(schema)
-        df_new = table.to_pandas(nthreads=1)
+
+        # it works!
+        table.to_pandas(nthreads=1)
 
     def test_threaded_conversion(self):
         df = _alltypes_example()
         self._check_pandas_roundtrip(df, nthreads=2,
                                      timestamps_to_ms=False)
 
-    # def test_category(self):
-    #     repeats = 1000
-    #     values = [b'foo', None, u'bar', 'qux', np.nan]
-    #     df = pd.DataFrame({'strings': values * repeats})
-    #     df['strings'] = df['strings'].astype('category')
-    #     self._check_pandas_roundtrip(df)
+    def test_category(self):
+        repeats = 5
+        v1 = ['foo', None, 'bar', 'qux', np.nan]
+        v2 = [4, 5, 6, 7, 8]
+        v3 = [b'foo', None, b'bar', b'qux', np.nan]
+        df = pd.DataFrame({'cat_strings': pd.Categorical(v1 * repeats),
+                           'cat_ints': pd.Categorical(v2 * repeats),
+                           'cat_binary': pd.Categorical(v3 * repeats),
+                           'ints': v2 * repeats,
+                           'ints2': v2 * repeats,
+                           'strings': v1 * repeats,
+                           'strings2': v1 * repeats,
+                           'strings3': v3 * repeats})
+        self._check_pandas_roundtrip(df)
+
+        arrays = [
+            pd.Categorical(v1 * repeats),
+            pd.Categorical(v2 * repeats),
+            pd.Categorical(v3 * repeats)
+        ]
+        for values in arrays:
+            self._check_array_roundtrip(values)

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index 72ff584..de59a92 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -128,6 +128,7 @@ class build_ext(_build_ext):
 
         cmake_options = [
             '-DPYTHON_EXECUTABLE=%s' % sys.executable,
+            '-DPYARROW_BUILD_TESTS=off',
             static_lib_option,
             build_tests_option,
         ]

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/src/pyarrow/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/CMakeLists.txt b/python/src/pyarrow/CMakeLists.txt
index e20c323..9e69718 100644
--- a/python/src/pyarrow/CMakeLists.txt
+++ b/python/src/pyarrow/CMakeLists.txt
@@ -18,3 +18,5 @@
 #######################################
 # Unit tests
 #######################################
+
+ADD_PYARROW_TEST(adapters/pandas-test)

http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/src/pyarrow/adapters/pandas-test.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas-test.cc b/python/src/pyarrow/adapters/pandas-test.cc
new file mode 100644
index 0000000..e286ccc
--- /dev/null
+++ b/python/src/pyarrow/adapters/pandas-test.cc
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/builder.h"
+#include "arrow/schema.h"
+#include "arrow/table.h"
+#include "arrow/test-util.h"
+#include "arrow/type.h"
+#include "pyarrow/adapters/pandas.h"
+
+using namespace arrow;
+
+namespace pyarrow {
+
+TEST(PandasConversionTest, TestObjectBlockWriteFails) {
+  StringBuilder builder;
+  const char value[] = {'\xf1', '\0'};
+
+  for (int i = 0; i < 1000; ++i) {
+    builder.Append(value, strlen(value));
+  }
+
+  std::shared_ptr<Array> arr;
+  ASSERT_OK(builder.Finish(&arr));
+
+  auto f1 = field("f1", utf8());
+  auto f2 = field("f2", utf8());
+  auto f3 = field("f3", utf8());
+  std::vector<std::shared_ptr<Field>> fields = {f1, f2, f3};
+  std::vector<std::shared_ptr<Column>> cols = {std::make_shared<Column>(f1, arr),
+      std::make_shared<Column>(f2, arr), std::make_shared<Column>(f3, arr)};
+
+  auto schema = std::make_shared<Schema>(fields);
+  auto table = std::make_shared<Table>("", schema, cols);
+
+  PyObject* out;
+  Py_BEGIN_ALLOW_THREADS;
+  ASSERT_RAISES(UnknownError, ConvertTableToPandas(table, 2, &out));
+  Py_END_ALLOW_THREADS;
+}
+
+}  // namespace pyarrow


[2/3] arrow git commit: ARROW-461: [Python] Add Python interfaces to DictionaryArray data, pandas interop

Posted by we...@apache.org.
http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/src/pyarrow/adapters/pandas.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index 8c2d350..6623e23 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -49,6 +49,7 @@ namespace pyarrow {
 using arrow::Array;
 using arrow::ChunkedArray;
 using arrow::Column;
+using arrow::DictionaryType;
 using arrow::Field;
 using arrow::DataType;
 using arrow::ListType;
@@ -60,7 +61,7 @@ using arrow::Type;
 namespace BitUtil = arrow::BitUtil;
 
 // ----------------------------------------------------------------------
-// Serialization
+// Utility code
 
 template <int TYPE>
 struct npy_traits {};
@@ -242,1577 +243,1730 @@ Status AppendObjectStrings(arrow::StringBuilder& string_builder, PyObject** obje
 }
 
 template <int TYPE>
-class ArrowSerializer {
- public:
-  ArrowSerializer(arrow::MemoryPool* pool, PyArrayObject* arr, PyArrayObject* mask)
-      : pool_(pool), arr_(arr), mask_(mask) {
-    length_ = PyArray_SIZE(arr_);
-  }
+struct arrow_traits {};
 
-  void IndicateType(const std::shared_ptr<Field> field) { field_indicator_ = field; }
+template <>
+struct arrow_traits<Type::BOOL> {
+  static constexpr int npy_type = NPY_BOOL;
+  static constexpr bool supports_nulls = false;
+  static constexpr bool is_boolean = true;
+  static constexpr bool is_numeric_not_nullable = false;
+  static constexpr bool is_numeric_nullable = false;
+};
 
-  Status Convert(std::shared_ptr<Array>* out);
+#define INT_DECL(TYPE)                                      \
+  template <>                                               \
+  struct arrow_traits<Type::TYPE> {                         \
+    static constexpr int npy_type = NPY_##TYPE;             \
+    static constexpr bool supports_nulls = false;           \
+    static constexpr double na_value = NAN;                 \
+    static constexpr bool is_boolean = false;               \
+    static constexpr bool is_numeric_not_nullable = true;   \
+    static constexpr bool is_numeric_nullable = false;      \
+    typedef typename npy_traits<NPY_##TYPE>::value_type T;  \
+  };
 
-  int stride() const { return PyArray_STRIDES(arr_)[0]; }
+INT_DECL(INT8);
+INT_DECL(INT16);
+INT_DECL(INT32);
+INT_DECL(INT64);
+INT_DECL(UINT8);
+INT_DECL(UINT16);
+INT_DECL(UINT32);
+INT_DECL(UINT64);
 
-  Status InitNullBitmap() {
-    int null_bytes = BitUtil::BytesForBits(length_);
+template <>
+struct arrow_traits<Type::FLOAT> {
+  static constexpr int npy_type = NPY_FLOAT32;
+  static constexpr bool supports_nulls = true;
+  static constexpr float na_value = NAN;
+  static constexpr bool is_boolean = false;
+  static constexpr bool is_numeric_not_nullable = false;
+  static constexpr bool is_numeric_nullable = true;
+  typedef typename npy_traits<NPY_FLOAT32>::value_type T;
+};
 
-    null_bitmap_ = std::make_shared<arrow::PoolBuffer>(pool_);
-    RETURN_NOT_OK(null_bitmap_->Resize(null_bytes));
+template <>
+struct arrow_traits<Type::DOUBLE> {
+  static constexpr int npy_type = NPY_FLOAT64;
+  static constexpr bool supports_nulls = true;
+  static constexpr double na_value = NAN;
+  static constexpr bool is_boolean = false;
+  static constexpr bool is_numeric_not_nullable = false;
+  static constexpr bool is_numeric_nullable = true;
+  typedef typename npy_traits<NPY_FLOAT64>::value_type T;
+};
 
-    null_bitmap_data_ = null_bitmap_->mutable_data();
-    memset(null_bitmap_data_, 0, null_bytes);
+static constexpr int64_t kPandasTimestampNull = std::numeric_limits<int64_t>::min();
 
-    return Status::OK();
-  }
+template <>
+struct arrow_traits<Type::TIMESTAMP> {
+  static constexpr int npy_type = NPY_DATETIME;
+  static constexpr bool supports_nulls = true;
+  static constexpr int64_t na_value = kPandasTimestampNull;
+  static constexpr bool is_boolean = false;
+  static constexpr bool is_numeric_not_nullable = false;
+  static constexpr bool is_numeric_nullable = true;
+  typedef typename npy_traits<NPY_DATETIME>::value_type T;
+};
 
-  bool is_strided() const {
-    npy_intp* astrides = PyArray_STRIDES(arr_);
-    return astrides[0] != PyArray_DESCR(arr_)->elsize;
-  }
+template <>
+struct arrow_traits<Type::DATE> {
+  static constexpr int npy_type = NPY_DATETIME;
+  static constexpr bool supports_nulls = true;
+  static constexpr int64_t na_value = kPandasTimestampNull;
+  static constexpr bool is_boolean = false;
+  static constexpr bool is_numeric_not_nullable = false;
+  static constexpr bool is_numeric_nullable = true;
+  typedef typename npy_traits<NPY_DATETIME>::value_type T;
+};
 
- private:
-  Status ConvertData();
+template <>
+struct arrow_traits<Type::STRING> {
+  static constexpr int npy_type = NPY_OBJECT;
+  static constexpr bool supports_nulls = true;
+  static constexpr bool is_boolean = false;
+  static constexpr bool is_numeric_not_nullable = false;
+  static constexpr bool is_numeric_nullable = false;
+};
 
-  Status ConvertDates(std::shared_ptr<Array>* out) {
-    PyAcquireGIL lock;
+template <>
+struct arrow_traits<Type::BINARY> {
+  static constexpr int npy_type = NPY_OBJECT;
+  static constexpr bool supports_nulls = true;
+  static constexpr bool is_boolean = false;
+  static constexpr bool is_numeric_not_nullable = false;
+  static constexpr bool is_numeric_nullable = false;
+};
 
-    PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
-    arrow::TypePtr string_type(new arrow::DateType());
-    arrow::DateBuilder date_builder(pool_, string_type);
-    RETURN_NOT_OK(date_builder.Resize(length_));
+template <typename T>
+struct WrapBytes {};
 
-    Status s;
-    PyObject* obj;
-    for (int64_t i = 0; i < length_; ++i) {
-      obj = objects[i];
-      if (PyDate_CheckExact(obj)) {
-        PyDateTime_Date* pydate = reinterpret_cast<PyDateTime_Date*>(obj);
-        date_builder.Append(PyDate_to_ms(pydate));
-      } else {
-        date_builder.AppendNull();
-      }
-    }
-    return date_builder.Finish(out);
+template <>
+struct WrapBytes<arrow::StringArray> {
+  static inline PyObject* Wrap(const uint8_t* data, int64_t length) {
+    return PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(data), length);
   }
+};
 
-  Status ConvertObjectStrings(std::shared_ptr<Array>* out) {
-    PyAcquireGIL lock;
+template <>
+struct WrapBytes<arrow::BinaryArray> {
+  static inline PyObject* Wrap(const uint8_t* data, int64_t length) {
+    return PyBytes_FromStringAndSize(reinterpret_cast<const char*>(data), length);
+  }
+};
 
-    // The output type at this point is inconclusive because there may be bytes
-    // and unicode mixed in the object array
+inline void set_numpy_metadata(int type, DataType* datatype, PyArrayObject* out) {
+  if (type == NPY_DATETIME) {
+    PyArray_Descr* descr = PyArray_DESCR(out);
+    auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(descr->c_metadata);
+    if (datatype->type == Type::TIMESTAMP) {
+      auto timestamp_type = static_cast<arrow::TimestampType*>(datatype);
 
-    PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
-    arrow::TypePtr string_type(new arrow::StringType());
-    arrow::StringBuilder string_builder(pool_, string_type);
-    RETURN_NOT_OK(string_builder.Resize(length_));
+      switch (timestamp_type->unit) {
+        case arrow::TimestampType::Unit::SECOND:
+          date_dtype->meta.base = NPY_FR_s;
+          break;
+        case arrow::TimestampType::Unit::MILLI:
+          date_dtype->meta.base = NPY_FR_ms;
+          break;
+        case arrow::TimestampType::Unit::MICRO:
+          date_dtype->meta.base = NPY_FR_us;
+          break;
+        case arrow::TimestampType::Unit::NANO:
+          date_dtype->meta.base = NPY_FR_ns;
+          break;
+      }
+    } else {
+      // datatype->type == Type::DATE
+      date_dtype->meta.base = NPY_FR_D;
+    }
+  }
+}
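+
+// For example, a timestamp[ms] column allocated as NPY_DATETIME is tagged
+// here with NPY_FR_ms, so NumPy reports the array's dtype as datetime64[ms].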
 
-    Status s;
-    bool have_bytes = false;
-    RETURN_NOT_OK(AppendObjectStrings(string_builder, objects, length_, &have_bytes));
-    RETURN_NOT_OK(string_builder.Finish(out));
+template <typename T>
+inline void ConvertIntegerWithNulls(const ChunkedArray& data, double* out_values) {
+  for (int c = 0; c < data.num_chunks(); c++) {
+    const std::shared_ptr<Array> arr = data.chunk(c);
+    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+    auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+    // Upcast to double, set NaN as appropriate
 
-    if (have_bytes) {
-      const auto& arr = static_cast<const arrow::StringArray&>(*out->get());
-      *out = std::make_shared<arrow::BinaryArray>(
-          arr.length(), arr.offsets(), arr.data(), arr.null_count(), arr.null_bitmap());
+    for (int64_t i = 0; i < arr->length(); ++i) {
+      *out_values++ = prim_arr->IsNull(i) ? NAN : in_values[i];
     }
-    return Status::OK();
   }
+}
 
-  Status ConvertBooleans(std::shared_ptr<Array>* out) {
-    PyAcquireGIL lock;
+template <typename T>
+inline void ConvertIntegerNoNullsSameType(const ChunkedArray& data, T* out_values) {
+  for (int c = 0; c < data.num_chunks(); c++) {
+    const std::shared_ptr<Array> arr = data.chunk(c);
+    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+    auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+    memcpy(out_values, in_values, sizeof(T) * arr->length());
+    out_values += arr->length();
+  }
+}
 
-    PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
+template <typename InType, typename OutType>
+inline void ConvertIntegerNoNullsCast(const ChunkedArray& data, OutType* out_values) {
+  for (int c = 0; c < data.num_chunks(); c++) {
+    const std::shared_ptr<Array> arr = data.chunk(c);
+    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+    auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data());
+    for (int64_t i = 0; i < arr->length(); ++i) {
+      *out_values++ = static_cast<OutType>(in_values[i]);
+    }
+  }
+}
 
-    int nbytes = BitUtil::BytesForBits(length_);
-    auto data = std::make_shared<arrow::PoolBuffer>(pool_);
-    RETURN_NOT_OK(data->Resize(nbytes));
-    uint8_t* bitmap = data->mutable_data();
-    memset(bitmap, 0, nbytes);
+static Status ConvertBooleanWithNulls(const ChunkedArray& data, PyObject** out_values) {
+  PyAcquireGIL lock;
+  for (int c = 0; c < data.num_chunks(); c++) {
+    const std::shared_ptr<Array> arr = data.chunk(c);
+    auto bool_arr = static_cast<arrow::BooleanArray*>(arr.get());
 
-    int64_t null_count = 0;
-    for (int64_t i = 0; i < length_; ++i) {
-      if (objects[i] == Py_True) {
-        BitUtil::SetBit(bitmap, i);
-        BitUtil::SetBit(null_bitmap_data_, i);
-      } else if (objects[i] != Py_False) {
-        ++null_count;
+    for (int64_t i = 0; i < arr->length(); ++i) {
+      if (bool_arr->IsNull(i)) {
+        Py_INCREF(Py_None);
+        *out_values++ = Py_None;
+      } else if (bool_arr->Value(i)) {
+        // True
+        Py_INCREF(Py_True);
+        *out_values++ = Py_True;
       } else {
-        BitUtil::SetBit(null_bitmap_data_, i);
+        // False
+        Py_INCREF(Py_False);
+        *out_values++ = Py_False;
       }
     }
-
-    *out = std::make_shared<arrow::BooleanArray>(length_, data, null_count, null_bitmap_);
-
-    return Status::OK();
   }
+  return Status::OK();
+}
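+
+// Note: the PyAcquireGIL guard above is required because Py_INCREF may only
+// be called while holding the GIL.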
 
-  template <int ITEM_TYPE, typename ArrowType>
-  Status ConvertTypedLists(
-      const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out);
-
-#define LIST_CASE(TYPE, NUMPY_TYPE, ArrowType)                            \
-  case Type::TYPE: {                                                      \
-    return ConvertTypedLists<NUMPY_TYPE, ::arrow::ArrowType>(field, out); \
+static void ConvertBooleanNoNulls(const ChunkedArray& data, uint8_t* out_values) {
+  for (int c = 0; c < data.num_chunks(); c++) {
+    const std::shared_ptr<Array> arr = data.chunk(c);
+    auto bool_arr = static_cast<arrow::BooleanArray*>(arr.get());
+    for (int64_t i = 0; i < arr->length(); ++i) {
+      *out_values++ = static_cast<uint8_t>(bool_arr->Value(i));
+    }
   }
+}
 
-  Status ConvertLists(const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) {
-    switch (field->type->type) {
-      LIST_CASE(UINT8, NPY_UINT8, UInt8Type)
-      LIST_CASE(INT8, NPY_INT8, Int8Type)
-      LIST_CASE(UINT16, NPY_UINT16, UInt16Type)
-      LIST_CASE(INT16, NPY_INT16, Int16Type)
-      LIST_CASE(UINT32, NPY_UINT32, UInt32Type)
-      LIST_CASE(INT32, NPY_INT32, Int32Type)
-      LIST_CASE(UINT64, NPY_UINT64, UInt64Type)
-      LIST_CASE(INT64, NPY_INT64, Int64Type)
-      LIST_CASE(TIMESTAMP, NPY_DATETIME, TimestampType)
-      LIST_CASE(FLOAT, NPY_FLOAT, FloatType)
-      LIST_CASE(DOUBLE, NPY_DOUBLE, DoubleType)
-      LIST_CASE(STRING, NPY_OBJECT, StringType)
-      default:
-        return Status::TypeError("Unknown list item type");
-    }
-
-    return Status::TypeError("Unknown list type");
-  }
-
-  Status MakeDataType(std::shared_ptr<DataType>* out);
-
-  arrow::MemoryPool* pool_;
-
-  PyArrayObject* arr_;
-  PyArrayObject* mask_;
-
-  int64_t length_;
-
-  std::shared_ptr<Field> field_indicator_;
-  std::shared_ptr<arrow::Buffer> data_;
-  std::shared_ptr<arrow::ResizableBuffer> null_bitmap_;
-  uint8_t* null_bitmap_data_;
-};
+template <typename ArrayType>
+inline Status ConvertBinaryLike(const ChunkedArray& data, PyObject** out_values) {
+  PyAcquireGIL lock;
+  for (int c = 0; c < data.num_chunks(); c++) {
+    auto arr = static_cast<ArrayType*>(data.chunk(c).get());
 
-// Returns null count
-static int64_t MaskToBitmap(PyArrayObject* mask, int64_t length, uint8_t* bitmap) {
-  int64_t null_count = 0;
-  const uint8_t* mask_values = static_cast<const uint8_t*>(PyArray_DATA(mask));
-  // TODO(wesm): strided null mask
-  for (int i = 0; i < length; ++i) {
-    if (mask_values[i]) {
-      ++null_count;
-    } else {
-      BitUtil::SetBit(bitmap, i);
+    const uint8_t* data_ptr;
+    int32_t length;
+    const bool has_nulls = data.null_count() > 0;
+    for (int64_t i = 0; i < arr->length(); ++i) {
+      if (has_nulls && arr->IsNull(i)) {
+        Py_INCREF(Py_None);
+        *out_values = Py_None;
+      } else {
+        data_ptr = arr->GetValue(i, &length);
+        *out_values = WrapBytes<ArrayType>::Wrap(data_ptr, length);
+        if (*out_values == nullptr) {
+          PyErr_Clear();
+          std::stringstream ss;
+          ss << "Wrapping "
+             << std::string(reinterpret_cast<const char*>(data_ptr), length) << " failed";
+          return Status::UnknownError(ss.str());
+        }
+      }
+      ++out_values;
     }
   }
-  return null_count;
-}
-
-template <int TYPE>
-inline Status ArrowSerializer<TYPE>::MakeDataType(std::shared_ptr<DataType>* out) {
-  out->reset(new typename npy_traits<TYPE>::TypeClass());
   return Status::OK();
 }
 
-template <>
-inline Status ArrowSerializer<NPY_DATETIME>::MakeDataType(
-    std::shared_ptr<DataType>* out) {
-  PyArray_Descr* descr = PyArray_DESCR(arr_);
-  auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(descr->c_metadata);
-  arrow::TimestampType::Unit unit;
+template <typename ArrowType>
+inline Status ConvertListsLike(
+    const std::shared_ptr<Column>& col, PyObject** out_values) {
+  const ChunkedArray& data = *col->data().get();
+  auto list_type = std::static_pointer_cast<ListType>(col->type());
 
-  switch (date_dtype->meta.base) {
-    case NPY_FR_s:
-      unit = arrow::TimestampType::Unit::SECOND;
-      break;
-    case NPY_FR_ms:
-      unit = arrow::TimestampType::Unit::MILLI;
-      break;
-    case NPY_FR_us:
-      unit = arrow::TimestampType::Unit::MICRO;
-      break;
-    case NPY_FR_ns:
-      unit = arrow::TimestampType::Unit::NANO;
-      break;
-    default:
-      return Status::Invalid("Unknown NumPy datetime unit");
+  // Get column of underlying value arrays
+  std::vector<std::shared_ptr<Array>> value_arrays;
+  for (int c = 0; c < data.num_chunks(); c++) {
+    auto arr = std::static_pointer_cast<arrow::ListArray>(data.chunk(c));
+    value_arrays.emplace_back(arr->values());
   }
+  auto flat_column = std::make_shared<Column>(list_type->value_field(), value_arrays);
+  // TODO(ARROW-489): Currently we don't have a Python reference for single columns.
+  //    Storing a reference to the whole Array would be too expensive.
+  PyObject* numpy_array;
+  RETURN_NOT_OK(ConvertColumnToPandas(flat_column, nullptr, &numpy_array));
 
-  out->reset(new arrow::TimestampType(unit));
-  return Status::OK();
-}
-
-template <int TYPE>
-inline Status ArrowSerializer<TYPE>::Convert(std::shared_ptr<Array>* out) {
-  typedef npy_traits<TYPE> traits;
+  PyAcquireGIL lock;
 
-  if (mask_ != nullptr || traits::supports_nulls) { RETURN_NOT_OK(InitNullBitmap()); }
+  for (int c = 0; c < data.num_chunks(); c++) {
+    auto arr = std::static_pointer_cast<arrow::ListArray>(data.chunk(c));
 
-  int64_t null_count = 0;
-  if (mask_ != nullptr) {
-    null_count = MaskToBitmap(mask_, length_, null_bitmap_data_);
-  } else if (traits::supports_nulls) {
-    null_count = ValuesToBitmap<TYPE>(PyArray_DATA(arr_), length_, null_bitmap_data_);
+    const bool has_nulls = data.null_count() > 0;
+    for (int64_t i = 0; i < arr->length(); ++i) {
+      if (has_nulls && arr->IsNull(i)) {
+        Py_INCREF(Py_None);
+        *out_values = Py_None;
+      } else {
+        PyObject* start = PyLong_FromLong(arr->value_offset(i));
+        PyObject* end = PyLong_FromLong(arr->value_offset(i + 1));
+        PyObject* slice = PySlice_New(start, end, NULL);
+        *out_values = PyObject_GetItem(numpy_array, slice);
+        Py_DECREF(start);
+        Py_DECREF(end);
+        Py_DECREF(slice);
+      }
+      ++out_values;
+    }
   }
 
-  RETURN_NOT_OK(ConvertData());
-  std::shared_ptr<DataType> type;
-  RETURN_NOT_OK(MakeDataType(&type));
-  RETURN_NOT_OK(MakePrimitiveArray(type, length_, data_, null_count, null_bitmap_, out));
+  Py_XDECREF(numpy_array);
   return Status::OK();
 }
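+
+// The loop above materializes list elements lazily: the value arrays of all
+// chunks are first converted to one flat NumPy array, and each list slot
+// then becomes the slice numpy_array[value_offset(i):value_offset(i + 1)].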
 
-template <>
-inline Status ArrowSerializer<NPY_OBJECT>::Convert(std::shared_ptr<Array>* out) {
-  // Python object arrays are annoying, since we could have one of:
-  //
-  // * Strings
-  // * Booleans with nulls
-  // * Mixed type (not supported at the moment by arrow format)
-  //
-  // Additionally, nulls may be encoded either as np.nan or None. So we have to
-  // do some type inference and conversion
-
-  RETURN_NOT_OK(InitNullBitmap());
+template <typename T>
+inline void ConvertNumericNullable(const ChunkedArray& data, T na_value, T* out_values) {
+  for (int c = 0; c < data.num_chunks(); c++) {
+    const std::shared_ptr<Array> arr = data.chunk(c);
+    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+    auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
 
-  // TODO: mask not supported here
-  const PyObject** objects = reinterpret_cast<const PyObject**>(PyArray_DATA(arr_));
-  {
-    PyAcquireGIL lock;
-    PyDateTime_IMPORT;
-  }
+    const uint8_t* valid_bits = arr->null_bitmap_data();
 
-  if (field_indicator_) {
-    switch (field_indicator_->type->type) {
-      case Type::STRING:
-        return ConvertObjectStrings(out);
-      case Type::BOOL:
-        return ConvertBooleans(out);
-      case Type::DATE:
-        return ConvertDates(out);
-      case Type::LIST: {
-        auto list_field = static_cast<ListType*>(field_indicator_->type.get());
-        return ConvertLists(list_field->value_field(), out);
-      }
-      default:
-        return Status::TypeError("No known conversion to Arrow type");
-    }
-  } else {
-    for (int64_t i = 0; i < length_; ++i) {
-      if (PyObject_is_null(objects[i])) {
-        continue;
-      } else if (PyObject_is_string(objects[i])) {
-        return ConvertObjectStrings(out);
-      } else if (PyBool_Check(objects[i])) {
-        return ConvertBooleans(out);
-      } else if (PyDate_CheckExact(objects[i])) {
-        return ConvertDates(out);
-      } else {
-        return Status::TypeError("unhandled python type");
+    if (arr->null_count() > 0) {
+      for (int64_t i = 0; i < arr->length(); ++i) {
+        *out_values++ = BitUtil::BitNotSet(valid_bits, i) ? na_value : in_values[i];
       }
+    } else {
+      memcpy(out_values, in_values, sizeof(T) * arr->length());
+      out_values += arr->length();
     }
   }
-
-  return Status::TypeError("Unable to infer type of object array, were all null");
 }
 
-template <int TYPE>
-inline Status ArrowSerializer<TYPE>::ConvertData() {
-  // TODO(wesm): strided arrays
-  if (is_strided()) { return Status::Invalid("no support for strided data yet"); }
+template <typename InType, typename OutType>
+inline void ConvertNumericNullableCast(
+    const ChunkedArray& data, OutType na_value, OutType* out_values) {
+  for (int c = 0; c < data.num_chunks(); c++) {
+    const std::shared_ptr<Array> arr = data.chunk(c);
+    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+    auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data());
 
-  data_ = std::make_shared<NumPyBuffer>(arr_);
-  return Status::OK();
+    for (int64_t i = 0; i < arr->length(); ++i) {
+      *out_values++ = arr->IsNull(i) ? na_value : static_cast<OutType>(in_values[i]);
+    }
+  }
 }
 
-template <>
-inline Status ArrowSerializer<NPY_BOOL>::ConvertData() {
-  if (is_strided()) { return Status::Invalid("no support for strided data yet"); }
-
-  int nbytes = BitUtil::BytesForBits(length_);
-  auto buffer = std::make_shared<arrow::PoolBuffer>(pool_);
-  RETURN_NOT_OK(buffer->Resize(nbytes));
-
-  const uint8_t* values = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_));
-
-  uint8_t* bitmap = buffer->mutable_data();
+template <typename T>
+inline void ConvertDates(const ChunkedArray& data, T na_value, T* out_values) {
+  for (int c = 0; c < data.num_chunks(); c++) {
+    const std::shared_ptr<Array> arr = data.chunk(c);
+    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+    auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
 
-  memset(bitmap, 0, nbytes);
-  for (int i = 0; i < length_; ++i) {
-    if (values[i] > 0) { BitUtil::SetBit(bitmap, i); }
+    for (int64_t i = 0; i < arr->length(); ++i) {
+      // There are 1000 * 60 * 60 * 24 = 86400000 ms in a day
+      *out_values++ = arr->IsNull(i) ? na_value : in_values[i] / 86400000;
+    }
   }
+}
 
-  data_ = buffer;
+template <typename InType, int SHIFT>
+inline void ConvertDatetimeNanos(const ChunkedArray& data, int64_t* out_values) {
+  for (int c = 0; c < data.num_chunks(); c++) {
+    const std::shared_ptr<Array> arr = data.chunk(c);
+    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+    auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data());
 
-  return Status::OK();
+    for (int64_t i = 0; i < arr->length(); ++i) {
+      *out_values++ = arr->IsNull(i) ? kPandasTimestampNull
+                                     : (static_cast<int64_t>(in_values[i]) * SHIFT);
+    }
+  }
 }
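+
+// SHIFT is the multiplier into nanoseconds; for example,
+// ConvertDatetimeNanos<int64_t, 1000L>(data, out) widens microsecond
+// timestamps to the nanosecond resolution pandas expects.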
 
-template <int TYPE>
-template <int ITEM_TYPE, typename ArrowType>
-inline Status ArrowSerializer<TYPE>::ConvertTypedLists(
-    const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) {
-  typedef npy_traits<ITEM_TYPE> traits;
-  typedef typename traits::value_type T;
-  typedef typename traits::BuilderClass BuilderT;
+// ----------------------------------------------------------------------
+// pandas 0.x DataFrame conversion internals
 
-  auto value_builder = std::make_shared<BuilderT>(pool_, field->type);
-  ListBuilder list_builder(pool_, value_builder);
-  PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
-  for (int64_t i = 0; i < length_; ++i) {
-    if (PyObject_is_null(objects[i])) {
-      RETURN_NOT_OK(list_builder.AppendNull());
-    } else if (PyArray_Check(objects[i])) {
-      auto numpy_array = reinterpret_cast<PyArrayObject*>(objects[i]);
-      RETURN_NOT_OK(list_builder.Append(true));
+class PandasBlock {
+ public:
+  enum type {
+    OBJECT,
+    UINT8,
+    INT8,
+    UINT16,
+    INT16,
+    UINT32,
+    INT32,
+    UINT64,
+    INT64,
+    FLOAT,
+    DOUBLE,
+    BOOL,
+    DATETIME,
+    CATEGORICAL
+  };
 
-      // TODO(uwe): Support more complex numpy array structures
-      RETURN_NOT_OK(CheckFlatNumpyArray(numpy_array, ITEM_TYPE));
+  PandasBlock(int64_t num_rows, int num_columns)
+      : num_rows_(num_rows), num_columns_(num_columns) {}
+  virtual ~PandasBlock() {}
 
-      int32_t size = PyArray_DIM(numpy_array, 0);
-      auto data = reinterpret_cast<const T*>(PyArray_DATA(numpy_array));
-      if (traits::supports_nulls) {
-        null_bitmap_->Resize(size, false);
-        // TODO(uwe): A bitmap would be more space-efficient but the Builder API doesn't
-        // currently support this.
-        // ValuesToBitmap<ITEM_TYPE>(data, size, null_bitmap_->mutable_data());
-        ValuesToBytemap<ITEM_TYPE>(data, size, null_bitmap_->mutable_data());
-        RETURN_NOT_OK(value_builder->Append(data, size, null_bitmap_->data()));
-      } else {
-        RETURN_NOT_OK(value_builder->Append(data, size));
-      }
-    } else if (PyList_Check(objects[i])) {
-      return Status::TypeError("Python lists are not yet supported");
-    } else {
-      return Status::TypeError("Unsupported Python type for list items");
-    }
-  }
-  return list_builder.Finish(out);
-}
+  virtual Status Allocate() = 0;
+  virtual Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+      int64_t rel_placement) = 0;
 
-template <>
-template <>
-inline Status
-ArrowSerializer<NPY_OBJECT>::ConvertTypedLists<NPY_OBJECT, ::arrow::StringType>(
-    const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) {
-  // TODO: If there are bytes involed, convert to Binary representation
-  bool have_bytes = false;
+  PyObject* block_arr() const { return block_arr_.obj(); }
 
-  auto value_builder = std::make_shared<arrow::StringBuilder>(pool_, field->type);
-  ListBuilder list_builder(pool_, value_builder);
-  PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
-  for (int64_t i = 0; i < length_; ++i) {
-    if (PyObject_is_null(objects[i])) {
-      RETURN_NOT_OK(list_builder.AppendNull());
-    } else if (PyArray_Check(objects[i])) {
-      auto numpy_array = reinterpret_cast<PyArrayObject*>(objects[i]);
-      RETURN_NOT_OK(list_builder.Append(true));
+  virtual Status GetPyResult(PyObject** output) {
+    PyObject* result = PyDict_New();
+    RETURN_IF_PYERROR();
 
-      // TODO(uwe): Support more complex numpy array structures
-      RETURN_NOT_OK(CheckFlatNumpyArray(numpy_array, NPY_OBJECT));
+    PyDict_SetItemString(result, "block", block_arr_.obj());
+    PyDict_SetItemString(result, "placement", placement_arr_.obj());
 
-      int32_t size = PyArray_DIM(numpy_array, 0);
-      auto data = reinterpret_cast<PyObject**>(PyArray_DATA(numpy_array));
-      RETURN_NOT_OK(AppendObjectStrings(*value_builder.get(), data, size, &have_bytes));
-    } else if (PyList_Check(objects[i])) {
-      return Status::TypeError("Python lists are not yet supported");
+    *output = result;
+
+    return Status::OK();
+  }
+
+ protected:
+  Status AllocateNDArray(int npy_type, int ndim = 2) {
+    PyAcquireGIL lock;
+
+    PyObject* block_arr;
+    if (ndim == 2) {
+      npy_intp block_dims[2] = {num_columns_, num_rows_};
+      block_arr = PyArray_SimpleNew(2, block_dims, npy_type);
     } else {
-      return Status::TypeError("Unsupported Python type for list items");
+      npy_intp block_dims[1] = {num_rows_};
+      block_arr = PyArray_SimpleNew(1, block_dims, npy_type);
     }
-  }
-  return list_builder.Finish(out);
-}
 
-template <>
-inline Status ArrowSerializer<NPY_OBJECT>::ConvertData() {
-  return Status::TypeError("NYI");
-}
+    if (block_arr == NULL) {
+      // TODO(wesm): propagate the Python exception
+      return Status::OK();
+    }
 
-#define TO_ARROW_CASE(TYPE)                                 \
-  case NPY_##TYPE: {                                        \
-    ArrowSerializer<NPY_##TYPE> converter(pool, arr, mask); \
-    RETURN_NOT_OK(converter.Convert(out));                  \
-  } break;
+    npy_intp placement_dims[1] = {num_columns_};
+    PyObject* placement_arr = PyArray_SimpleNew(1, placement_dims, NPY_INT64);
+    if (placement_arr == NULL) {
+      // TODO(wesm): propagate the Python exception
+      return Status::OK();
+    }
 
-Status PandasMaskedToArrow(arrow::MemoryPool* pool, PyObject* ao, PyObject* mo,
-    const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) {
-  PyArrayObject* arr = reinterpret_cast<PyArrayObject*>(ao);
-  PyArrayObject* mask = nullptr;
+    block_arr_.reset(block_arr);
+    placement_arr_.reset(placement_arr);
 
-  if (mo != nullptr) { mask = reinterpret_cast<PyArrayObject*>(mo); }
+    block_data_ = reinterpret_cast<uint8_t*>(
+        PyArray_DATA(reinterpret_cast<PyArrayObject*>(block_arr)));
 
-  if (PyArray_NDIM(arr) != 1) {
-    return Status::Invalid("only handle 1-dimensional arrays");
+    placement_data_ = reinterpret_cast<int64_t*>(
+        PyArray_DATA(reinterpret_cast<PyArrayObject*>(placement_arr)));
+
+    return Status::OK();
   }
 
-  switch (PyArray_DESCR(arr)->type_num) {
-    TO_ARROW_CASE(BOOL);
-    TO_ARROW_CASE(INT8);
-    TO_ARROW_CASE(INT16);
-    TO_ARROW_CASE(INT32);
-    TO_ARROW_CASE(INT64);
-    TO_ARROW_CASE(UINT8);
-    TO_ARROW_CASE(UINT16);
-    TO_ARROW_CASE(UINT32);
-    TO_ARROW_CASE(UINT64);
-    TO_ARROW_CASE(FLOAT32);
-    TO_ARROW_CASE(FLOAT64);
-    TO_ARROW_CASE(DATETIME);
-    case NPY_OBJECT: {
-      ArrowSerializer<NPY_OBJECT> converter(pool, arr, mask);
-      converter.IndicateType(field);
-      RETURN_NOT_OK(converter.Convert(out));
-    } break;
-    default:
+  int64_t num_rows_;
+  int num_columns_;
+
+  OwnedRef block_arr_;
+  uint8_t* block_data_;
+
+  // ndarray<int64>
+  OwnedRef placement_arr_;
+  int64_t* placement_data_;
+
+  DISALLOW_COPY_AND_ASSIGN(PandasBlock);
+};
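+
+// Block memory is one C-contiguous (num_columns x num_rows) ndarray: the
+// column written at rel_placement starts rel_placement * num_rows_ elements
+// into block_data_, and placement_data_[rel_placement] records the absolute
+// DataFrame column index it came from.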
+
+#define CONVERTLISTSLIKE_CASE(ArrowType, ArrowEnum)                         \
+  case Type::ArrowEnum:                                                     \
+    RETURN_NOT_OK((ConvertListsLike<::arrow::ArrowType>(col, out_buffer))); \
+    break;
+
+class ObjectBlock : public PandasBlock {
+ public:
+  using PandasBlock::PandasBlock;
+  virtual ~ObjectBlock() {}
+
+  Status Allocate() override { return AllocateNDArray(NPY_OBJECT); }
+
+  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+      int64_t rel_placement) override {
+    Type::type type = col->type()->type;
+
+    PyObject** out_buffer =
+        reinterpret_cast<PyObject**>(block_data_) + rel_placement * num_rows_;
+
+    const ChunkedArray& data = *col->data().get();
+
+    if (type == Type::BOOL) {
+      RETURN_NOT_OK(ConvertBooleanWithNulls(data, out_buffer));
+    } else if (type == Type::BINARY) {
+      RETURN_NOT_OK(ConvertBinaryLike<arrow::BinaryArray>(data, out_buffer));
+    } else if (type == Type::STRING) {
+      RETURN_NOT_OK(ConvertBinaryLike<arrow::StringArray>(data, out_buffer));
+    } else if (type == Type::LIST) {
+      auto list_type = std::static_pointer_cast<ListType>(col->type());
+      switch (list_type->value_type()->type) {
+        CONVERTLISTSLIKE_CASE(UInt8Type, UINT8)
+        CONVERTLISTSLIKE_CASE(Int8Type, INT8)
+        CONVERTLISTSLIKE_CASE(UInt16Type, UINT16)
+        CONVERTLISTSLIKE_CASE(Int16Type, INT16)
+        CONVERTLISTSLIKE_CASE(UInt32Type, UINT32)
+        CONVERTLISTSLIKE_CASE(Int32Type, INT32)
+        CONVERTLISTSLIKE_CASE(UInt64Type, UINT64)
+        CONVERTLISTSLIKE_CASE(Int64Type, INT64)
+        CONVERTLISTSLIKE_CASE(TimestampType, TIMESTAMP)
+        CONVERTLISTSLIKE_CASE(FloatType, FLOAT)
+        CONVERTLISTSLIKE_CASE(DoubleType, DOUBLE)
+        CONVERTLISTSLIKE_CASE(StringType, STRING)
+        default: {
+          std::stringstream ss;
+          ss << "Not implemented type for lists: " << list_type->value_type()->ToString();
+          return Status::NotImplemented(ss.str());
+        }
+      }
+    } else {
       std::stringstream ss;
-      ss << "unsupported type " << PyArray_DESCR(arr)->type_num << std::endl;
+      ss << "Unsupported type for object array output: " << col->type()->ToString();
       return Status::NotImplemented(ss.str());
+    }
+
+    placement_data_[rel_placement] = abs_placement;
+    return Status::OK();
   }
-  return Status::OK();
-}
+};
 
-Status PandasToArrow(arrow::MemoryPool* pool, PyObject* ao,
-    const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) {
-  return PandasMaskedToArrow(pool, ao, nullptr, field, out);
-}
+template <int ARROW_TYPE, typename C_TYPE>
+class IntBlock : public PandasBlock {
+ public:
+  using PandasBlock::PandasBlock;
 
-// ----------------------------------------------------------------------
-// Deserialization
+  Status Allocate() override {
+    return AllocateNDArray(arrow_traits<ARROW_TYPE>::npy_type);
+  }
 
-template <int TYPE>
-struct arrow_traits {};
+  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+      int64_t rel_placement) override {
+    Type::type type = col->type()->type;
 
-template <>
-struct arrow_traits<arrow::Type::BOOL> {
-  static constexpr int npy_type = NPY_BOOL;
-  static constexpr bool supports_nulls = false;
-  static constexpr bool is_boolean = true;
-  static constexpr bool is_numeric_not_nullable = false;
-  static constexpr bool is_numeric_nullable = false;
-};
+    C_TYPE* out_buffer =
+        reinterpret_cast<C_TYPE*>(block_data_) + rel_placement * num_rows_;
 
-#define INT_DECL(TYPE)                                     \
-  template <>                                              \
-  struct arrow_traits<arrow::Type::TYPE> {                 \
-    static constexpr int npy_type = NPY_##TYPE;            \
-    static constexpr bool supports_nulls = false;          \
-    static constexpr double na_value = NAN;                \
-    static constexpr bool is_boolean = false;              \
-    static constexpr bool is_numeric_not_nullable = true;  \
-    static constexpr bool is_numeric_nullable = false;     \
-    typedef typename npy_traits<NPY_##TYPE>::value_type T; \
-  };
+    const ChunkedArray& data = *col->data().get();
 
-INT_DECL(INT8);
-INT_DECL(INT16);
-INT_DECL(INT32);
-INT_DECL(INT64);
-INT_DECL(UINT8);
-INT_DECL(UINT16);
-INT_DECL(UINT32);
-INT_DECL(UINT64);
+    if (type != ARROW_TYPE) { return Status::NotImplemented(col->type()->ToString()); }
 
-template <>
-struct arrow_traits<arrow::Type::FLOAT> {
-  static constexpr int npy_type = NPY_FLOAT32;
-  static constexpr bool supports_nulls = true;
-  static constexpr float na_value = NAN;
-  static constexpr bool is_boolean = false;
-  static constexpr bool is_numeric_not_nullable = false;
-  static constexpr bool is_numeric_nullable = true;
-  typedef typename npy_traits<NPY_FLOAT32>::value_type T;
+    ConvertIntegerNoNullsSameType<C_TYPE>(data, out_buffer);
+    placement_data_[rel_placement] = abs_placement;
+    return Status::OK();
+  }
 };
 
-template <>
-struct arrow_traits<arrow::Type::DOUBLE> {
-  static constexpr int npy_type = NPY_FLOAT64;
-  static constexpr bool supports_nulls = true;
-  static constexpr double na_value = NAN;
-  static constexpr bool is_boolean = false;
-  static constexpr bool is_numeric_not_nullable = false;
-  static constexpr bool is_numeric_nullable = true;
-  typedef typename npy_traits<NPY_FLOAT64>::value_type T;
-};
+using UInt8Block = IntBlock<Type::UINT8, uint8_t>;
+using Int8Block = IntBlock<Type::INT8, int8_t>;
+using UInt16Block = IntBlock<Type::UINT16, uint16_t>;
+using Int16Block = IntBlock<Type::INT16, int16_t>;
+using UInt32Block = IntBlock<Type::UINT32, uint32_t>;
+using Int32Block = IntBlock<Type::INT32, int32_t>;
+using UInt64Block = IntBlock<Type::UINT64, uint64_t>;
+using Int64Block = IntBlock<Type::INT64, int64_t>;
 
-static constexpr int64_t kPandasTimestampNull = std::numeric_limits<int64_t>::min();
+class Float32Block : public PandasBlock {
+ public:
+  using PandasBlock::PandasBlock;
 
-template <>
-struct arrow_traits<arrow::Type::TIMESTAMP> {
-  static constexpr int npy_type = NPY_DATETIME;
-  static constexpr bool supports_nulls = true;
-  static constexpr int64_t na_value = kPandasTimestampNull;
-  static constexpr bool is_boolean = false;
-  static constexpr bool is_numeric_not_nullable = false;
-  static constexpr bool is_numeric_nullable = true;
-  typedef typename npy_traits<NPY_DATETIME>::value_type T;
-};
+  Status Allocate() override { return AllocateNDArray(NPY_FLOAT32); }
 
-template <>
-struct arrow_traits<arrow::Type::DATE> {
-  static constexpr int npy_type = NPY_DATETIME;
-  static constexpr bool supports_nulls = true;
-  static constexpr int64_t na_value = kPandasTimestampNull;
-  static constexpr bool is_boolean = false;
-  static constexpr bool is_numeric_not_nullable = false;
-  static constexpr bool is_numeric_nullable = true;
-  typedef typename npy_traits<NPY_DATETIME>::value_type T;
+  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+      int64_t rel_placement) override {
+    Type::type type = col->type()->type;
+
+    if (type != Type::FLOAT) { return Status::NotImplemented(col->type()->ToString()); }
+
+    float* out_buffer = reinterpret_cast<float*>(block_data_) + rel_placement * num_rows_;
+
+    ConvertNumericNullable<float>(*col->data().get(), NAN, out_buffer);
+    placement_data_[rel_placement] = abs_placement;
+    return Status::OK();
+  }
 };
 
-template <>
-struct arrow_traits<arrow::Type::STRING> {
-  static constexpr int npy_type = NPY_OBJECT;
-  static constexpr bool supports_nulls = true;
-  static constexpr bool is_boolean = false;
-  static constexpr bool is_numeric_not_nullable = false;
-  static constexpr bool is_numeric_nullable = false;
-};
+class Float64Block : public PandasBlock {
+ public:
+  using PandasBlock::PandasBlock;
+
+  Status Allocate() override { return AllocateNDArray(NPY_FLOAT64); }
+
+  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+      int64_t rel_placement) override {
+    Type::type type = col->type()->type;
+
+    double* out_buffer =
+        reinterpret_cast<double*>(block_data_) + rel_placement * num_rows_;
+
+    const ChunkedArray& data = *col->data().get();
+
+#define INTEGER_CASE(IN_TYPE)                         \
+  ConvertIntegerWithNulls<IN_TYPE>(data, out_buffer); \
+  break;
+
+    switch (type) {
+      case Type::UINT8:
+        INTEGER_CASE(uint8_t);
+      case Type::INT8:
+        INTEGER_CASE(int8_t);
+      case Type::UINT16:
+        INTEGER_CASE(uint16_t);
+      case Type::INT16:
+        INTEGER_CASE(int16_t);
+      case Type::UINT32:
+        INTEGER_CASE(uint32_t);
+      case Type::INT32:
+        INTEGER_CASE(int32_t);
+      case Type::UINT64:
+        INTEGER_CASE(uint64_t);
+      case Type::INT64:
+        INTEGER_CASE(int64_t);
+      case Type::FLOAT:
+        ConvertNumericNullableCast<float, double>(data, NAN, out_buffer);
+        break;
+      case Type::DOUBLE:
+        ConvertNumericNullable<double>(data, NAN, out_buffer);
+        break;
+      default:
+        return Status::NotImplemented(col->type()->ToString());
+    }
+
+#undef INTEGER_CASE
 
-template <>
-struct arrow_traits<arrow::Type::BINARY> {
-  static constexpr int npy_type = NPY_OBJECT;
-  static constexpr bool supports_nulls = true;
-  static constexpr bool is_boolean = false;
-  static constexpr bool is_numeric_not_nullable = false;
-  static constexpr bool is_numeric_nullable = false;
+    placement_data_[rel_placement] = abs_placement;
+    return Status::OK();
+  }
 };
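+
+// Integer columns containing nulls land in this double block: values are
+// upcast to float64 and nulls become NaN, mirroring how pandas 0.x
+// represents missing data in integer columns.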
 
-template <typename T>
-struct WrapBytes {};
+class BoolBlock : public PandasBlock {
+ public:
+  using PandasBlock::PandasBlock;
 
-template <>
-struct WrapBytes<arrow::StringArray> {
-  static inline PyObject* Wrap(const uint8_t* data, int64_t length) {
-    return PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(data), length);
-  }
-};
+  Status Allocate() override { return AllocateNDArray(NPY_BOOL); }
 
-template <>
-struct WrapBytes<arrow::BinaryArray> {
-  static inline PyObject* Wrap(const uint8_t* data, int64_t length) {
-    return PyBytes_FromStringAndSize(reinterpret_cast<const char*>(data), length);
-  }
-};
+  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+      int64_t rel_placement) override {
+    Type::type type = col->type()->type;
 
-inline void set_numpy_metadata(int type, DataType* datatype, PyArrayObject* out) {
-  if (type == NPY_DATETIME) {
-    PyArray_Descr* descr = PyArray_DESCR(out);
-    auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(descr->c_metadata);
-    if (datatype->type == arrow::Type::TIMESTAMP) {
-      auto timestamp_type = static_cast<arrow::TimestampType*>(datatype);
+    if (type != Type::BOOL) { return Status::NotImplemented(col->type()->ToString()); }
 
-      switch (timestamp_type->unit) {
-        case arrow::TimestampType::Unit::SECOND:
-          date_dtype->meta.base = NPY_FR_s;
-          break;
-        case arrow::TimestampType::Unit::MILLI:
-          date_dtype->meta.base = NPY_FR_ms;
-          break;
-        case arrow::TimestampType::Unit::MICRO:
-          date_dtype->meta.base = NPY_FR_us;
-          break;
-        case arrow::TimestampType::Unit::NANO:
-          date_dtype->meta.base = NPY_FR_ns;
-          break;
-      }
-    } else {
-      // datatype->type == arrow::Type::DATE
-      date_dtype->meta.base = NPY_FR_D;
-    }
+    uint8_t* out_buffer =
+        reinterpret_cast<uint8_t*>(block_data_) + rel_placement * num_rows_;
+
+    ConvertBooleanNoNulls(*col->data().get(), out_buffer);
+    placement_data_[rel_placement] = abs_placement;
+    return Status::OK();
   }
-}
+};
 
-template <typename T>
-inline void ConvertIntegerWithNulls(const ChunkedArray& data, double* out_values) {
-  for (int c = 0; c < data.num_chunks(); c++) {
-    const std::shared_ptr<Array> arr = data.chunk(c);
-    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
-    auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
-    // Upcast to double, set NaN as appropriate
+class DatetimeBlock : public PandasBlock {
+ public:
+  using PandasBlock::PandasBlock;
 
-    for (int i = 0; i < arr->length(); ++i) {
-      *out_values++ = prim_arr->IsNull(i) ? NAN : in_values[i];
-    }
-  }
-}
+  Status Allocate() override {
+    RETURN_NOT_OK(AllocateNDArray(NPY_DATETIME));
 
-template <typename T>
-inline void ConvertIntegerNoNullsSameType(const ChunkedArray& data, T* out_values) {
-  for (int c = 0; c < data.num_chunks(); c++) {
-    const std::shared_ptr<Array> arr = data.chunk(c);
-    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
-    auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
-    memcpy(out_values, in_values, sizeof(T) * arr->length());
-    out_values += arr->length();
+    PyAcquireGIL lock;
+    auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(
+        PyArray_DESCR(reinterpret_cast<PyArrayObject*>(block_arr_.obj()))->c_metadata);
+    date_dtype->meta.base = NPY_FR_ns;
+    return Status::OK();
   }
-}
 
-template <typename InType, typename OutType>
-inline void ConvertIntegerNoNullsCast(const ChunkedArray& data, OutType* out_values) {
-  for (int c = 0; c < data.num_chunks(); c++) {
-    const std::shared_ptr<Array> arr = data.chunk(c);
-    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
-    auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data());
-    for (int32_t i = 0; i < arr->length(); ++i) {
-      *out_values = in_values[i];
-    }
-  }
-}
+  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+      int64_t rel_placement) override {
+    Type::type type = col->type()->type;
 
-static Status ConvertBooleanWithNulls(const ChunkedArray& data, PyObject** out_values) {
-  PyAcquireGIL lock;
-  for (int c = 0; c < data.num_chunks(); c++) {
-    const std::shared_ptr<Array> arr = data.chunk(c);
-    auto bool_arr = static_cast<arrow::BooleanArray*>(arr.get());
+    int64_t* out_buffer =
+        reinterpret_cast<int64_t*>(block_data_) + rel_placement * num_rows_;
 
-    for (int64_t i = 0; i < arr->length(); ++i) {
-      if (bool_arr->IsNull(i)) {
-        Py_INCREF(Py_None);
-        *out_values++ = Py_None;
-      } else if (bool_arr->Value(i)) {
-        // True
-        Py_INCREF(Py_True);
-        *out_values++ = Py_True;
+    const ChunkedArray& data = *col->data().get();
+
+    if (type == Type::DATE) {
+      // DateType is millisecond timestamp stored as int64_t
+      // TODO(wesm): Do we want to make sure to zero out the milliseconds?
+      ConvertDatetimeNanos<int64_t, 1000000L>(data, out_buffer);
+    } else if (type == Type::TIMESTAMP) {
+      auto ts_type = static_cast<arrow::TimestampType*>(col->type().get());
+
+      if (ts_type->unit == arrow::TimeUnit::NANO) {
+        ConvertNumericNullable<int64_t>(data, kPandasTimestampNull, out_buffer);
+      } else if (ts_type->unit == arrow::TimeUnit::MICRO) {
+        ConvertDatetimeNanos<int64_t, 1000L>(data, out_buffer);
+      } else if (ts_type->unit == arrow::TimeUnit::MILLI) {
+        ConvertDatetimeNanos<int64_t, 1000000L>(data, out_buffer);
+      } else if (ts_type->unit == arrow::TimeUnit::SECOND) {
+        ConvertDatetimeNanos<int64_t, 1000000000L>(data, out_buffer);
       } else {
-        // False
-        Py_INCREF(Py_False);
-        *out_values++ = Py_False;
+        return Status::NotImplemented("Unsupported time unit");
       }
+    } else {
+      return Status::NotImplemented(col->type()->ToString());
     }
+
+    placement_data_[rel_placement] = abs_placement;
+    return Status::OK();
   }
-  return Status::OK();
-}
+};
 
-static void ConvertBooleanNoNulls(const ChunkedArray& data, uint8_t* out_values) {
-  for (int c = 0; c < data.num_chunks(); c++) {
-    const std::shared_ptr<Array> arr = data.chunk(c);
-    auto bool_arr = static_cast<arrow::BooleanArray*>(arr.get());
-    for (int64_t i = 0; i < arr->length(); ++i) {
-      *out_values++ = static_cast<uint8_t>(bool_arr->Value(i));
+template <int ARROW_INDEX_TYPE>
+class CategoricalBlock : public PandasBlock {
+ public:
+  CategoricalBlock(int64_t num_rows) : PandasBlock(num_rows, 1) {}
+
+  Status Allocate() override {
+    constexpr int npy_type = arrow_traits<ARROW_INDEX_TYPE>::npy_type;
+
+    if (!(npy_type == NPY_INT8 || npy_type == NPY_INT16 || npy_type == NPY_INT32 ||
+            npy_type == NPY_INT64)) {
+      return Status::Invalid("Category indices must be signed integers");
     }
+    return AllocateNDArray(npy_type, 1);
   }
-}
 
-template <typename ArrayType>
-inline Status ConvertBinaryLike(const ChunkedArray& data, PyObject** out_values) {
-  PyAcquireGIL lock;
-  for (int c = 0; c < data.num_chunks(); c++) {
-    auto arr = static_cast<ArrayType*>(data.chunk(c).get());
+  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+      int64_t rel_placement) override {
+    using T = typename arrow_traits<ARROW_INDEX_TYPE>::T;
 
-    const uint8_t* data_ptr;
-    int32_t length;
-    const bool has_nulls = data.null_count() > 0;
-    for (int64_t i = 0; i < arr->length(); ++i) {
-      if (has_nulls && arr->IsNull(i)) {
-        Py_INCREF(Py_None);
-        *out_values = Py_None;
-      } else {
-        data_ptr = arr->GetValue(i, &length);
-        *out_values = WrapBytes<ArrayType>::Wrap(data_ptr, length);
-        if (*out_values == nullptr) {
-          return Status::UnknownError("String initialization failed");
-        }
+    T* out_values = reinterpret_cast<T*>(block_data_) + rel_placement * num_rows_;
+
+    const ChunkedArray& data = *col->data().get();
+
+    for (int c = 0; c < data.num_chunks(); c++) {
+      const std::shared_ptr<Array> arr = data.chunk(c);
+      const auto& dict_arr = static_cast<const arrow::DictionaryArray&>(*arr);
+      const auto& indices =
+          static_cast<const arrow::PrimitiveArray&>(*dict_arr.indices());
+      auto in_values = reinterpret_cast<const T*>(indices.data()->data());
+
+      // Null is -1 in CategoricalBlock
+      for (int64_t i = 0; i < arr->length(); ++i) {
+        *out_values++ = indices.IsNull(i) ? -1 : in_values[i];
       }
-      ++out_values;
     }
-  }
-  return Status::OK();
-}
 
-template <typename ArrowType>
-inline Status ConvertListsLike(
-    const std::shared_ptr<Column>& col, PyObject** out_values) {
-  typedef arrow_traits<ArrowType::type_id> traits;
-  typedef typename ::arrow::TypeTraits<ArrowType>::ArrayType ArrayType;
+    placement_data_[rel_placement] = abs_placement;
 
-  const ChunkedArray& data = *col->data().get();
-  auto list_type = std::static_pointer_cast<ListType>(col->type());
+    auto dict_type = static_cast<const DictionaryType*>(col->type().get());
 
-  // Get column of underlying value arrays
-  std::vector<std::shared_ptr<Array>> value_arrays;
-  for (int c = 0; c < data.num_chunks(); c++) {
-    auto arr = std::static_pointer_cast<arrow::ListArray>(data.chunk(c));
-    value_arrays.emplace_back(arr->values());
+    PyObject* dict;
+    RETURN_NOT_OK(ConvertArrayToPandas(dict_type->dictionary(), nullptr, &dict));
+    dictionary_.reset(dict);
+
+    return Status::OK();
   }
-  auto flat_column = std::make_shared<Column>(list_type->value_field(), value_arrays);
-  // TODO(ARROW-489): Currently we don't have a Python reference for single columns.
-  //    Storing a reference to the whole Array would be to expensive.
-  PyObject* numpy_array;
-  RETURN_NOT_OK(ConvertColumnToPandas(flat_column, nullptr, &numpy_array));
 
-  PyAcquireGIL lock;
+  Status GetPyResult(PyObject** output) override {
+    PyObject* result = PyDict_New();
+    RETURN_IF_PYERROR();
 
-  for (int c = 0; c < data.num_chunks(); c++) {
-    auto arr = std::static_pointer_cast<arrow::ListArray>(data.chunk(c));
+    PyDict_SetItemString(result, "block", block_arr_.obj());
+    PyDict_SetItemString(result, "dictionary", dictionary_.obj());
+    PyDict_SetItemString(result, "placement", placement_arr_.obj());
 
-    const uint8_t* data_ptr;
-    int32_t length;
-    const bool has_nulls = data.null_count() > 0;
-    for (int64_t i = 0; i < arr->length(); ++i) {
-      if (has_nulls && arr->IsNull(i)) {
-        Py_INCREF(Py_None);
-        *out_values = Py_None;
-      } else {
-        PyObject* start = PyLong_FromLong(arr->value_offset(i));
-        PyObject* end = PyLong_FromLong(arr->value_offset(i + 1));
-        PyObject* slice = PySlice_New(start, end, NULL);
-        *out_values = PyObject_GetItem(numpy_array, slice);
-        Py_DECREF(start);
-        Py_DECREF(end);
-        Py_DECREF(slice);
-      }
-      ++out_values;
-    }
-  }
+    *output = result;
 
-  Py_XDECREF(numpy_array);
-  return Status::OK();
-}
+    return Status::OK();
+  }
 
-template <typename T>
-inline void ConvertNumericNullable(const ChunkedArray& data, T na_value, T* out_values) {
-  for (int c = 0; c < data.num_chunks(); c++) {
-    const std::shared_ptr<Array> arr = data.chunk(c);
-    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
-    auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+ protected:
+  OwnedRef dictionary_;
+};
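+
+// The -1 sentinel written above matches pandas.Categorical, whose integer
+// codes use -1 to mark a missing value.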
 
-    const uint8_t* valid_bits = arr->null_bitmap_data();
+Status MakeBlock(PandasBlock::type type, int64_t num_rows, int num_columns,
+    std::shared_ptr<PandasBlock>* block) {
+#define BLOCK_CASE(NAME, TYPE)                              \
+  case PandasBlock::NAME:                                   \
+    *block = std::make_shared<TYPE>(num_rows, num_columns); \
+    break;
 
-    if (arr->null_count() > 0) {
-      for (int64_t i = 0; i < arr->length(); ++i) {
-        *out_values++ = BitUtil::BitNotSet(valid_bits, i) ? na_value : in_values[i];
-      }
-    } else {
-      memcpy(out_values, in_values, sizeof(T) * arr->length());
-      out_values += arr->length();
-    }
+  switch (type) {
+    BLOCK_CASE(OBJECT, ObjectBlock);
+    BLOCK_CASE(UINT8, UInt8Block);
+    BLOCK_CASE(INT8, Int8Block);
+    BLOCK_CASE(UINT16, UInt16Block);
+    BLOCK_CASE(INT16, Int16Block);
+    BLOCK_CASE(UINT32, UInt32Block);
+    BLOCK_CASE(INT32, Int32Block);
+    BLOCK_CASE(UINT64, UInt64Block);
+    BLOCK_CASE(INT64, Int64Block);
+    BLOCK_CASE(FLOAT, Float32Block);
+    BLOCK_CASE(DOUBLE, Float64Block);
+    BLOCK_CASE(BOOL, BoolBlock);
+    BLOCK_CASE(DATETIME, DatetimeBlock);
+    default:
+      return Status::NotImplemented("Unsupported block type");
   }
-}
 
-template <typename InType, typename OutType>
-inline void ConvertNumericNullableCast(
-    const ChunkedArray& data, OutType na_value, OutType* out_values) {
-  for (int c = 0; c < data.num_chunks(); c++) {
-    const std::shared_ptr<Array> arr = data.chunk(c);
-    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
-    auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data());
+#undef BLOCK_CASE
 
-    for (int64_t i = 0; i < arr->length(); ++i) {
-      *out_values++ = arr->IsNull(i) ? na_value : static_cast<OutType>(in_values[i]);
-    }
-  }
+  return (*block)->Allocate();
 }
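+
+// A minimal usage sketch: MakeBlock(PandasBlock::DOUBLE, num_rows, 3, &block)
+// constructs one Float64Block sized to hold three DataFrame columns and
+// allocates its backing ndarray before returning.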
 
-template <typename T>
-inline void ConvertDates(const ChunkedArray& data, T na_value, T* out_values) {
-  for (int c = 0; c < data.num_chunks(); c++) {
-    const std::shared_ptr<Array> arr = data.chunk(c);
-    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
-    auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
-
-    for (int64_t i = 0; i < arr->length(); ++i) {
-      // There are 1000 * 60 * 60 * 24 = 86400000ms in a day
-      *out_values++ = arr->IsNull(i) ? na_value : in_values[i] / 86400000;
-    }
+static inline bool ListTypeSupported(const Type::type type_id) {
+  switch (type_id) {
+    case Type::UINT8:
+    case Type::INT8:
+    case Type::UINT16:
+    case Type::INT16:
+    case Type::UINT32:
+    case Type::INT32:
+    case Type::INT64:
+    case Type::UINT64:
+    case Type::FLOAT:
+    case Type::DOUBLE:
+    case Type::STRING:
+    case Type::TIMESTAMP:
+      // The above types are all supported.
+      return true;
+    default:
+      break;
   }
+  return false;
 }
 
-template <typename InType, int SHIFT>
-inline void ConvertDatetimeNanos(const ChunkedArray& data, int64_t* out_values) {
-  for (int c = 0; c < data.num_chunks(); c++) {
-    const std::shared_ptr<Array> arr = data.chunk(c);
-    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
-    auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data());
-
-    for (int64_t i = 0; i < arr->length(); ++i) {
-      *out_values++ = arr->IsNull(i) ? kPandasTimestampNull
-                                     : (static_cast<int64_t>(in_values[i]) * SHIFT);
+static inline Status MakeCategoricalBlock(const std::shared_ptr<DataType>& type,
+    int64_t num_rows, std::shared_ptr<PandasBlock>* block) {
+  // All categoricals become a block with a single column
+  auto dict_type = static_cast<const DictionaryType*>(type.get());
+  switch (dict_type->index_type()->type) {
+    case Type::INT8:
+      *block = std::make_shared<CategoricalBlock<Type::INT8>>(num_rows);
+      break;
+    case Type::INT16:
+      *block = std::make_shared<CategoricalBlock<Type::INT16>>(num_rows);
+      break;
+    case Type::INT32:
+      *block = std::make_shared<CategoricalBlock<Type::INT32>>(num_rows);
+      break;
+    case Type::INT64:
+      *block = std::make_shared<CategoricalBlock<Type::INT64>>(num_rows);
+      break;
+    default: {
+      std::stringstream ss;
+      ss << "Categorical index type not implemented: "
+         << dict_type->index_type()->ToString();
+      return Status::NotImplemented(ss.str());
     }
   }
+  return (*block)->Allocate();
 }
 
-class ArrowDeserializer {
+// Construct the exact pandas 0.x "BlockManager" memory layout
+//
+// * For each column determine the correct output pandas type
+// * Allocate 2D blocks (ncols x nrows) for each distinct data type in output
+// * Allocate block placement arrays
+// * Write Arrow columns out into each slice of memory; populate block
+//   placement arrays as we go
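+//
+// Each finished block is handed back to Python as a dict (see GetPyResult),
+// e.g. {'block': <2-D ndarray>, 'placement': <int64 ndarray>}, plus a
+// 'dictionary' entry for categorical blocks; the DataFrame's BlockManager is
+// then reconstructed from these on the Python side.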
+class DataFrameBlockCreator {
  public:
-  ArrowDeserializer(const std::shared_ptr<Column>& col, PyObject* py_ref)
-      : col_(col), data_(*col->data().get()), py_ref_(py_ref) {}
-
-  Status AllocateOutput(int type) {
-    PyAcquireGIL lock;
-
-    npy_intp dims[1] = {col_->length()};
-    out_ = reinterpret_cast<PyArrayObject*>(PyArray_SimpleNew(1, dims, type));
+  DataFrameBlockCreator(const std::shared_ptr<Table>& table) : table_(table) {}
 
-    if (out_ == NULL) {
-      // Error occurred, trust that SimpleNew set the error state
-      return Status::OK();
-    }
+  Status Convert(int nthreads, PyObject** output) {
+    column_types_.resize(table_->num_columns());
+    column_block_placement_.resize(table_->num_columns());
+    type_counts_.clear();
+    blocks_.clear();
 
-    set_numpy_metadata(type, col_->type().get(), out_);
+    RETURN_NOT_OK(CreateBlocks());
+    RETURN_NOT_OK(WriteTableToBlocks(nthreads));
 
-    return Status::OK();
+    return GetResultList(output);
   }
 
-  template <int TYPE>
-  Status ConvertValuesZeroCopy(int npy_type, std::shared_ptr<Array> arr) {
-    typedef typename arrow_traits<TYPE>::T T;
-
-    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
-    auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+  Status CreateBlocks() {
+    for (int i = 0; i < table_->num_columns(); ++i) {
+      std::shared_ptr<Column> col = table_->column(i);
+      PandasBlock::type output_type;
 
-    // Zero-Copy. We can pass the data pointer directly to NumPy.
-    void* data = const_cast<T*>(in_values);
+      Type::type column_type = col->type()->type;
+      switch (column_type) {
+        case Type::BOOL:
+          output_type = col->null_count() > 0 ? PandasBlock::OBJECT : PandasBlock::BOOL;
+          break;
+        case Type::UINT8:
+          output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT8;
+          break;
+        case Type::INT8:
+          output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT8;
+          break;
+        case Type::UINT16:
+          output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT16;
+          break;
+        case Type::INT16:
+          output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT16;
+          break;
+        case Type::UINT32:
+          output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT32;
+          break;
+        case Type::INT32:
+          output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT32;
+          break;
+        case Type::INT64:
+          output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT64;
+          break;
+        case Type::UINT64:
+          output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT64;
+          break;
+        case Type::FLOAT:
+          output_type = PandasBlock::FLOAT;
+          break;
+        case Type::DOUBLE:
+          output_type = PandasBlock::DOUBLE;
+          break;
+        case Type::STRING:
+        case Type::BINARY:
+          output_type = PandasBlock::OBJECT;
+          break;
+        case Type::DATE:
+          output_type = PandasBlock::DATETIME;
+          break;
+        case Type::TIMESTAMP:
+          output_type = PandasBlock::DATETIME;
+          break;
+        case Type::LIST: {
+          auto list_type = std::static_pointer_cast<ListType>(col->type());
+          if (!ListTypeSupported(list_type->value_type()->type)) {
+            std::stringstream ss;
+            ss << "Not implemented type for lists: "
+               << list_type->value_type()->ToString();
+            return Status::NotImplemented(ss.str());
+          }
+          output_type = PandasBlock::OBJECT;
+        } break;
+        case Type::DICTIONARY:
+          output_type = PandasBlock::CATEGORICAL;
+          break;
+        default:
+          return Status::NotImplemented(col->type()->ToString());
+      }
 
-    PyAcquireGIL lock;
+      int block_placement = 0;
+      if (column_type == Type::DICTIONARY) {
+        std::shared_ptr<PandasBlock> block;
+        RETURN_NOT_OK(MakeCategoricalBlock(col->type(), table_->num_rows(), &block));
+        categorical_blocks_[i] = block;
+      } else {
+        auto it = type_counts_.find(output_type);
+        if (it != type_counts_.end()) {
+          block_placement = it->second;
+          // Increment count
+          it->second += 1;
+        } else {
+          // Add key to map
+          type_counts_[output_type] = 1;
+        }
+      }
 
-    // Zero-Copy. We can pass the data pointer directly to NumPy.
-    npy_intp dims[1] = {col_->length()};
-    out_ = reinterpret_cast<PyArrayObject*>(
-        PyArray_SimpleNewFromData(1, dims, npy_type, data));
+      column_types_[i] = output_type;
+      column_block_placement_[i] = block_placement;
+    }
 
-    if (out_ == NULL) {
-      // Error occurred, trust that SimpleNew set the error state
-      return Status::OK();
+    // Create normal non-categorical blocks
+    for (const auto& it : type_counts_) {
+      PandasBlock::type type = static_cast<PandasBlock::type>(it.first);
+      std::shared_ptr<PandasBlock> block;
+      RETURN_NOT_OK(MakeBlock(type, table_->num_rows(), it.second, &block));
+      blocks_[type] = block;
     }
+    return Status::OK();
+  }
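
A minimal standalone sketch (plain C++, hypothetical names, not the pyarrow API) of
the placement bookkeeping in CreateBlocks() above: each column receives the running
count for its block type as its slot within that block, and the final count doubles
as the block's width:

    #include <unordered_map>
    #include <vector>

    struct Placement {
      int block_type;  // which homogeneous block the column lands in
      int position;    // the column's slot within that block
    };

    std::vector<Placement> AssignPlacements(const std::vector<int>& block_types) {
      std::unordered_map<int, int> type_counts;  // block type -> columns seen so far
      std::vector<Placement> result;
      result.reserve(block_types.size());
      for (int block_type : block_types) {
        // operator[] default-initializes a new key to 0, then we increment
        int position = type_counts[block_type]++;
        result.push_back({block_type, position});
      }
      return result;
    }
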
 
-    set_numpy_metadata(npy_type, col_->type().get(), out_);
+  Status WriteTableToBlocks(int nthreads) {
+    auto WriteColumn = [this](int i) {
+      std::shared_ptr<Column> col = this->table_->column(i);
+      PandasBlock::type output_type = this->column_types_[i];
 
-    if (PyArray_SetBaseObject(out_, py_ref_) == -1) {
-      // Error occurred, trust that SetBaseObject set the error state
-      return Status::OK();
-    } else {
-      // PyArray_SetBaseObject steals our reference to py_ref_
-      Py_INCREF(py_ref_);
-    }
+      int rel_placement = this->column_block_placement_[i];
 
-    // Arrow data is immutable.
-    PyArray_CLEARFLAGS(out_, NPY_ARRAY_WRITEABLE);
+      std::shared_ptr<PandasBlock> block;
+      if (output_type == PandasBlock::CATEGORICAL) {
+        auto it = this->categorical_blocks_.find(i);
+        if (it == this->categorical_blocks_.end()) {
+          return Status::KeyError("No categorical block allocated");
+        }
+        block = it->second;
+      } else {
+        auto it = this->blocks_.find(output_type);
+        if (it == this->blocks_.end()) { return Status::KeyError("No block allocated"); }
+        block = it->second;
+      }
+      return block->Write(col, i, rel_placement);
+    };
 
-    return Status::OK();
-  }
+    nthreads = std::min<int>(nthreads, table_->num_columns());
 
-  // ----------------------------------------------------------------------
-  // Allocate new array and deserialize. Can do a zero copy conversion for some
-  // types
+    if (nthreads == 1) {
+      for (int i = 0; i < table_->num_columns(); ++i) {
+        RETURN_NOT_OK(WriteColumn(i));
+      }
+    } else {
+      std::vector<std::thread> thread_pool;
+      thread_pool.reserve(nthreads);
+      std::atomic<int> task_counter(0);
 
-  Status Convert(PyObject** out) {
-#define CONVERT_CASE(TYPE)                             \
-  case arrow::Type::TYPE: {                            \
-    RETURN_NOT_OK(ConvertValues<arrow::Type::TYPE>()); \
-  } break;
+      std::mutex error_mtx;
+      std::atomic<bool> error_occurred(false);
+      Status error;
 
-    switch (col_->type()->type) {
-      CONVERT_CASE(BOOL);
-      CONVERT_CASE(INT8);
-      CONVERT_CASE(INT16);
-      CONVERT_CASE(INT32);
-      CONVERT_CASE(INT64);
-      CONVERT_CASE(UINT8);
-      CONVERT_CASE(UINT16);
-      CONVERT_CASE(UINT32);
-      CONVERT_CASE(UINT64);
-      CONVERT_CASE(FLOAT);
-      CONVERT_CASE(DOUBLE);
-      CONVERT_CASE(BINARY);
-      CONVERT_CASE(STRING);
-      CONVERT_CASE(DATE);
-      CONVERT_CASE(TIMESTAMP);
-      default: {
-        std::stringstream ss;
-        ss << "Arrow type reading not implemented for " << col_->type()->ToString();
-        return Status::NotImplemented(ss.str());
+      for (int thread_id = 0; thread_id < nthreads; ++thread_id) {
+        thread_pool.emplace_back(
+            [this, &error, &error_occurred, &error_mtx, &task_counter, &WriteColumn]() {
+              int column_num;
+              while (!error_occurred) {
+                column_num = task_counter.fetch_add(1);
+                if (column_num >= this->table_->num_columns()) { break; }
+                Status s = WriteColumn(column_num);
+                if (!s.ok()) {
+                  std::lock_guard<std::mutex> lock(error_mtx);
+                  error_occurred = true;
+                  error = s;
+                  break;
+                }
+              }
+            });
+      }
+      for (auto&& thread : thread_pool) {
+        thread.join();
       }
-    }
-
-#undef CONVERT_CASE
 
-    *out = reinterpret_cast<PyObject*>(out_);
+      if (error_occurred) { return error; }
+    }
     return Status::OK();
   }
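
The threading scheme above is a simple atomic work queue: workers race on
fetch_add to claim column indices and bail out after the first failure. A
self-contained sketch of the same pattern, with a bool standing in for Status:

    #include <atomic>
    #include <functional>
    #include <thread>
    #include <vector>

    // Runs task(i) for i in [0, num_tasks) on nthreads workers.
    // Returns false if any task failed; remaining tasks are skipped after a failure.
    bool ParallelFor(int num_tasks, int nthreads, const std::function<bool(int)>& task) {
      std::atomic<int> task_counter(0);
      std::atomic<bool> failed(false);
      std::vector<std::thread> pool;
      pool.reserve(nthreads);
      for (int t = 0; t < nthreads; ++t) {
        pool.emplace_back([&]() {
          while (!failed.load()) {
            int i = task_counter.fetch_add(1);  // claim the next task index
            if (i >= num_tasks) { break; }      // all tasks claimed
            if (!task(i)) {
              failed.store(true);               // first failure stops everyone
              break;
            }
          }
        });
      }
      for (auto& worker : pool) { worker.join(); }
      return !failed.load();
    }
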
 
-  template <int TYPE>
-  inline typename std::enable_if<
-      (TYPE != arrow::Type::DATE) & arrow_traits<TYPE>::is_numeric_nullable, Status>::type
-  ConvertValues() {
-    typedef typename arrow_traits<TYPE>::T T;
-    int npy_type = arrow_traits<TYPE>::npy_type;
+  Status GetResultList(PyObject** out) {
+    PyAcquireGIL lock;
 
-    if (data_.num_chunks() == 1 && data_.null_count() == 0 && py_ref_ != nullptr) {
-      return ConvertValuesZeroCopy<TYPE>(npy_type, data_.chunk(0));
+    auto num_blocks =
+        static_cast<Py_ssize_t>(blocks_.size() + categorical_blocks_.size());
+    PyObject* result = PyList_New(num_blocks);
+    RETURN_IF_PYERROR();
+
+    int i = 0;
+    for (const auto& it : blocks_) {
+      const std::shared_ptr<PandasBlock> block = it.second;
+      PyObject* item;
+      RETURN_NOT_OK(block->GetPyResult(&item));
+      PyList_SET_ITEM(result, i++, item);  // SET_ITEM steals our reference to item
     }
 
-    RETURN_NOT_OK(AllocateOutput(npy_type));
-    auto out_values = reinterpret_cast<T*>(PyArray_DATA(out_));
-    ConvertNumericNullable<T>(data_, arrow_traits<TYPE>::na_value, out_values);
+    for (const auto& it : categorical_blocks_) {
+      const std::shared_ptr<PandasBlock> block = it.second;
+      PyObject* item;
+      RETURN_NOT_OK(block->GetPyResult(&item));
+      PyList_SET_ITEM(result, i++, item);  // SET_ITEM steals our reference to item
+    }
 
+    *out = result;
     return Status::OK();
   }
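
PyList_SET_ITEM is the unchecked macro form of PyList_SetItem: it assumes a
freshly created list, performs no bounds or error checking, and steals the
reference passed in, so neither a return-value check nor a DECREF of the item
is needed. A small illustrative sketch (hypothetical function, plain CPython API):

    #include <Python.h>

    // Build a list of n references to None; caller must hold the GIL.
    PyObject* MakeListOfNone(Py_ssize_t n) {
      PyObject* result = PyList_New(n);  // slots start out as NULL
      if (result == NULL) { return NULL; }
      for (Py_ssize_t i = 0; i < n; ++i) {
        Py_INCREF(Py_None);
        PyList_SET_ITEM(result, i, Py_None);  // steals the reference we just took
      }
      return result;
    }
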
 
-  template <int TYPE>
-  inline typename std::enable_if<TYPE == arrow::Type::DATE, Status>::type
-  ConvertValues() {
-    typedef typename arrow_traits<TYPE>::T T;
+ private:
+  std::shared_ptr<Table> table_;
 
-    RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type));
-    auto out_values = reinterpret_cast<T*>(PyArray_DATA(out_));
-    ConvertDates<T>(data_, arrow_traits<TYPE>::na_value, out_values);
-    return Status::OK();
-  }
+  // column num -> block type id
+  std::vector<PandasBlock::type> column_types_;
 
-  // Integer specialization
-  template <int TYPE>
-  inline
-      typename std::enable_if<arrow_traits<TYPE>::is_numeric_not_nullable, Status>::type
-      ConvertValues() {
-    typedef typename arrow_traits<TYPE>::T T;
-    int npy_type = arrow_traits<TYPE>::npy_type;
+  // column num -> relative placement within internal block
+  std::vector<int> column_block_placement_;
 
-    if (data_.num_chunks() == 1 && data_.null_count() == 0 && py_ref_ != nullptr) {
-      return ConvertValuesZeroCopy<TYPE>(npy_type, data_.chunk(0));
-    }
+  // block type -> type count
+  std::unordered_map<int, int> type_counts_;
 
-    if (data_.null_count() > 0) {
-      RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64));
-      auto out_values = reinterpret_cast<double*>(PyArray_DATA(out_));
-      ConvertIntegerWithNulls<T>(data_, out_values);
-    } else {
-      RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type));
-      auto out_values = reinterpret_cast<T*>(PyArray_DATA(out_));
-      ConvertIntegerNoNullsSameType<T>(data_, out_values);
-    }
+  // block type -> block
+  std::unordered_map<int, std::shared_ptr<PandasBlock>> blocks_;
 
-    return Status::OK();
-  }
+  // column number -> categorical block
+  std::unordered_map<int, std::shared_ptr<PandasBlock>> categorical_blocks_;
+};
 
-  // Boolean specialization
-  template <int TYPE>
-  inline typename std::enable_if<arrow_traits<TYPE>::is_boolean, Status>::type
-  ConvertValues() {
-    if (data_.null_count() > 0) {
-      RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
-      auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_));
-      RETURN_NOT_OK(ConvertBooleanWithNulls(data_, out_values));
-    } else {
-      RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type));
-      auto out_values = reinterpret_cast<uint8_t*>(PyArray_DATA(out_));
-      ConvertBooleanNoNulls(data_, out_values);
-    }
-    return Status::OK();
+Status ConvertTableToPandas(
+    const std::shared_ptr<Table>& table, int nthreads, PyObject** out) {
+  DataFrameBlockCreator helper(table);
+  return helper.Convert(nthreads, out);
+}
+
+// ----------------------------------------------------------------------
+// Serialization
+
+template <int TYPE>
+class ArrowSerializer {
+ public:
+  ArrowSerializer(arrow::MemoryPool* pool, PyArrayObject* arr, PyArrayObject* mask)
+      : pool_(pool), arr_(arr), mask_(mask) {
+    length_ = PyArray_SIZE(arr_);
   }
 
-  // UTF8 strings
-  template <int TYPE>
-  inline typename std::enable_if<TYPE == arrow::Type::STRING, Status>::type
-  ConvertValues() {
-    RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
-    auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_));
-    return ConvertBinaryLike<arrow::StringArray>(data_, out_values);
+  void IndicateType(const std::shared_ptr<Field>& field) { field_indicator_ = field; }
+
+  Status Convert(std::shared_ptr<Array>* out);
+
+  int stride() const { return PyArray_STRIDES(arr_)[0]; }
+
+  Status InitNullBitmap() {
+    int64_t null_bytes = BitUtil::BytesForBits(length_);
+
+    null_bitmap_ = std::make_shared<arrow::PoolBuffer>(pool_);
+    RETURN_NOT_OK(null_bitmap_->Resize(null_bytes));
+
+    null_bitmap_data_ = null_bitmap_->mutable_data();
+    memset(null_bitmap_data_, 0, null_bytes);
+
+    return Status::OK();
   }
 
-  template <int T2>
-  inline typename std::enable_if<T2 == arrow::Type::BINARY, Status>::type
-  ConvertValues() {
-    RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
-    auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_));
-    return ConvertBinaryLike<arrow::BinaryArray>(data_, out_values);
+  bool is_strided() const {
+    npy_intp* astrides = PyArray_STRIDES(arr_);
+    return astrides[0] != PyArray_DESCR(arr_)->elsize;
   }
 
  private:
-  std::shared_ptr<Column> col_;
-  const arrow::ChunkedArray& data_;
-  PyObject* py_ref_;
-  PyArrayObject* out_;
-};
+  Status ConvertData();
 
-Status ConvertArrayToPandas(
-    const std::shared_ptr<Array>& arr, PyObject* py_ref, PyObject** out) {
-  static std::string dummy_name = "dummy";
-  auto field = std::make_shared<Field>(dummy_name, arr->type());
-  auto col = std::make_shared<Column>(field, arr);
-  return ConvertColumnToPandas(col, py_ref, out);
-}
+  Status ConvertDates(std::shared_ptr<Array>* out) {
+    PyAcquireGIL lock;
 
-Status ConvertColumnToPandas(
-    const std::shared_ptr<Column>& col, PyObject* py_ref, PyObject** out) {
-  ArrowDeserializer converter(col, py_ref);
-  return converter.Convert(out);
-}
+    PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
+    arrow::TypePtr date_type(new arrow::DateType());
+    arrow::DateBuilder date_builder(pool_, date_type);
+    RETURN_NOT_OK(date_builder.Resize(length_));
 
-// ----------------------------------------------------------------------
-// pandas 0.x DataFrame conversion internals
+    PyObject* obj;
+    for (int64_t i = 0; i < length_; ++i) {
+      obj = objects[i];
+      if (PyDate_CheckExact(obj)) {
+        PyDateTime_Date* pydate = reinterpret_cast<PyDateTime_Date*>(obj);
+        RETURN_NOT_OK(date_builder.Append(PyDate_to_ms(pydate)));
+      } else {
+        RETURN_NOT_OK(date_builder.AppendNull());
+      }
+    }
+    return date_builder.Finish(out);
+  }
 
-class PandasBlock {
- public:
-  enum type {
-    OBJECT,
-    UINT8,
-    INT8,
-    UINT16,
-    INT16,
-    UINT32,
-    INT32,
-    UINT64,
-    INT64,
-    FLOAT,
-    DOUBLE,
-    BOOL,
-    DATETIME,
-    CATEGORICAL
-  };
+  Status ConvertObjectStrings(std::shared_ptr<Array>* out) {
+    PyAcquireGIL lock;
 
-  PandasBlock(int64_t num_rows, int num_columns)
-      : num_rows_(num_rows), num_columns_(num_columns) {}
-  virtual ~PandasBlock() {}
+    // The output type at this point is inconclusive because there may be bytes
+    // and unicode mixed in the object array
 
-  virtual Status Allocate() = 0;
-  virtual Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
-      int64_t rel_placement) = 0;
+    PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
+    arrow::TypePtr string_type(new arrow::StringType());
+    arrow::StringBuilder string_builder(pool_, string_type);
+    RETURN_NOT_OK(string_builder.Resize(length_));
 
-  PyObject* block_arr() { return block_arr_.obj(); }
+    bool have_bytes = false;
+    RETURN_NOT_OK(AppendObjectStrings(string_builder, objects, length_, &have_bytes));
+    RETURN_NOT_OK(string_builder.Finish(out));
 
-  PyObject* placement_arr() { return placement_arr_.obj(); }
+    if (have_bytes) {
+      const auto& arr = static_cast<const arrow::StringArray&>(*out->get());
+      *out = std::make_shared<arrow::BinaryArray>(
+          arr.length(), arr.offsets(), arr.data(), arr.null_count(), arr.null_bitmap());
+    }
+    return Status::OK();
+  }
 
- protected:
-  Status AllocateNDArray(int npy_type) {
+  Status ConvertBooleans(std::shared_ptr<Array>* out) {
     PyAcquireGIL lock;
 
-    npy_intp block_dims[2] = {num_columns_, num_rows_};
-    PyObject* block_arr = PyArray_SimpleNew(2, block_dims, npy_type);
-    if (block_arr == NULL) {
-      // TODO(wesm): propagating Python exception
-      return Status::OK();
+    PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
+
+    int64_t nbytes = BitUtil::BytesForBits(length_);
+    auto data = std::make_shared<arrow::PoolBuffer>(pool_);
+    RETURN_NOT_OK(data->Resize(nbytes));
+    uint8_t* bitmap = data->mutable_data();
+    memset(bitmap, 0, nbytes);
+
+    int64_t null_count = 0;
+    for (int64_t i = 0; i < length_; ++i) {
+      if (objects[i] == Py_True) {
+        BitUtil::SetBit(bitmap, i);
+        BitUtil::SetBit(null_bitmap_data_, i);
+      } else if (objects[i] != Py_False) {
+        ++null_count;
+      } else {
+        BitUtil::SetBit(null_bitmap_data_, i);
+      }
     }
 
-    npy_intp placement_dims[1] = {num_columns_};
-    PyObject* placement_arr = PyArray_SimpleNew(1, placement_dims, NPY_INT64);
-    if (placement_arr == NULL) {
-      // TODO(wesm): propagating Python exception
-      return Status::OK();
+    *out = std::make_shared<arrow::BooleanArray>(length_, data, null_count, null_bitmap_);
+
+    return Status::OK();
+  }
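
ConvertBooleans builds two packed bitmaps in parallel: one holding the boolean
values, one marking validity, with anything that is neither Py_True nor
Py_False counted as null. A standalone sketch of the same scheme, using local
bit helpers rather than arrow::BitUtil:

    #include <Python.h>
    #include <cstdint>
    #include <vector>

    static inline void SetBitAt(std::vector<uint8_t>* bits, int64_t i) {
      (*bits)[i / 8] |= static_cast<uint8_t>(1 << (i % 8));
    }

    // Returns the null count; assumes the GIL is held and the objects stay alive.
    int64_t BoolObjectsToBitmaps(PyObject** objects, int64_t length,
                                 std::vector<uint8_t>* values,
                                 std::vector<uint8_t>* validity) {
      values->assign((length + 7) / 8, 0);
      validity->assign((length + 7) / 8, 0);
      int64_t null_count = 0;
      for (int64_t i = 0; i < length; ++i) {
        if (objects[i] == Py_True) {
          SetBitAt(values, i);    // value bit set
          SetBitAt(validity, i);  // entry is valid
        } else if (objects[i] == Py_False) {
          SetBitAt(validity, i);  // valid; value bit stays 0
        } else {
          ++null_count;           // None, nan, or anything else: null
        }
      }
      return null_count;
    }
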
+
+  template <int ITEM_TYPE, typename ArrowType>
+  Status ConvertTypedLists(
+      const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out);
+
+#define LIST_CASE(TYPE, NUMPY_TYPE, ArrowType)                            \
+  case Type::TYPE: {                                                      \
+    return ConvertTypedLists<NUMPY_TYPE, ::arrow::ArrowType>(field, out); \
+  }
+
+  Status ConvertLists(const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) {
+    switch (field->type->type) {
+      LIST_CASE(UINT8, NPY_UINT8, UInt8Type)
+      LIST_CASE(INT8, NPY_INT8, Int8Type)
+      LIST_CASE(UINT16, NPY_UINT16, UInt16Type)
+      LIST_CASE(INT16, NPY_INT16, Int16Type)
+      LIST_CASE(UINT32, NPY_UINT32, UInt32Type)
+      LIST_CASE(INT32, NPY_INT32, Int32Type)
+      LIST_CASE(UINT64, NPY_UINT64, UInt64Type)
+      LIST_CASE(INT64, NPY_INT64, Int64Type)
+      LIST_CASE(TIMESTAMP, NPY_DATETIME, TimestampType)
+      LIST_CASE(FLOAT, NPY_FLOAT, FloatType)
+      LIST_CASE(DOUBLE, NPY_DOUBLE, DoubleType)
+      LIST_CASE(STRING, NPY_OBJECT, StringType)
+      default:
+        return Status::TypeError("Unknown list item type");
     }
 
-    block_arr_.reset(block_arr);
-    placement_arr_.reset(placement_arr);
-
-    block_data_ = reinterpret_cast<uint8_t*>(
-        PyArray_DATA(reinterpret_cast<PyArrayObject*>(block_arr)));
-
-    placement_data_ = reinterpret_cast<int64_t*>(
-        PyArray_DATA(reinterpret_cast<PyArrayObject*>(placement_arr)));
-
-    return Status::OK();
+    return Status::TypeError("Unknown list type");
   }
 
-  int64_t num_rows_;
-  int num_columns_;
-
-  OwnedRef block_arr_;
-  uint8_t* block_data_;
+  Status MakeDataType(std::shared_ptr<DataType>* out);
 
-  // ndarray<int32>
-  OwnedRef placement_arr_;
-  int64_t* placement_data_;
+  arrow::MemoryPool* pool_;
 
-  DISALLOW_COPY_AND_ASSIGN(PandasBlock);
-};
+  PyArrayObject* arr_;
+  PyArrayObject* mask_;
 
-#define CONVERTLISTSLIKE_CASE(ArrowType, ArrowEnum)                         \
-  case Type::ArrowEnum:                                                     \
-    RETURN_NOT_OK((ConvertListsLike<::arrow::ArrowType>(col, out_buffer))); \
-    break;
+  int64_t length_;
 
-class ObjectBlock : public PandasBlock {
- public:
-  using PandasBlock::PandasBlock;
-  virtual ~ObjectBlock() {}
+  std::shared_ptr<Field> field_indicator_;
+  std::shared_ptr<arrow::Buffer> data_;
+  std::shared_ptr<arrow::ResizableBuffer> null_bitmap_;
+  uint8_t* null_bitmap_data_;
+};
 
-  Status Allocate() override { return AllocateNDArray(NPY_OBJECT); }
+// Returns null count
+static int64_t MaskToBitmap(PyArrayObject* mask, int64_t length, uint8_t* bitmap) {
+  int64_t null_count = 0;
+  const uint8_t* mask_values = static_cast<const uint8_t*>(PyArray_DATA(mask));
+  // TODO(wesm): strided null mask
+  for (int64_t i = 0; i < length; ++i) {
+    if (mask_values[i]) {
+      ++null_count;
+    } else {
+      BitUtil::SetBit(bitmap, i);
+    }
+  }
+  return null_count;
+}
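
Note the inversion in MaskToBitmap: the NumPy mask marks nulls with true, while
the Arrow validity bitmap sets bit i when entry i is valid. The underlying bit
math, spelled out with local stand-ins for the arrow::BitUtil helpers
(assumption: same semantics):

    #include <cstdint>

    static inline int64_t BytesForBits(int64_t bits) {
      return (bits + 7) / 8;  // e.g. 10 values -> 2 bytes
    }

    static inline void SetBit(uint8_t* bitmap, int64_t i) {
      bitmap[i / 8] |= static_cast<uint8_t>(1 << (i % 8));  // LSB-first bit order
    }
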
 
-  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
-      int64_t rel_placement) override {
-    Type::type type = col->type()->type;
+template <int TYPE>
+inline Status ArrowSerializer<TYPE>::MakeDataType(std::shared_ptr<DataType>* out) {
+  out->reset(new typename npy_traits<TYPE>::TypeClass());
+  return Status::OK();
+}
 
-    PyObject** out_buffer =
-        reinterpret_cast<PyObject**>(block_data_) + rel_placement * num_rows_;
+template <>
+inline Status ArrowSerializer<NPY_DATETIME>::MakeDataType(
+    std::shared_ptr<DataType>* out) {
+  PyArray_Descr* descr = PyArray_DESCR(arr_);
+  auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(descr->c_metadata);
+  arrow::TimestampType::Unit unit;
 
-    const ChunkedArray& data = *col->data().get();
+  switch (date_dtype->meta.base) {
+    case NPY_FR_s:
+      unit = arrow::TimestampType::Unit::SECOND;
+      break;
+    case NPY_FR_ms:
+      unit = arrow::TimestampType::Unit::MILLI;
+      break;
+    case NPY_FR_us:
+      unit = arrow::TimestampType::Unit::MICRO;
+      break;
+    case NPY_FR_ns:
+      unit = arrow::TimestampType::Unit::NANO;
+      break;
+    default:
+      return Status::Invalid("Unknown NumPy datetime unit");
+  }
 
-    if (type == Type::BOOL) {
-      RETURN_NOT_OK(ConvertBooleanWithNulls(data, out_buffer));
-    } else if (type == Type::BINARY) {
-      RETURN_NOT_OK(ConvertBinaryLike<arrow::BinaryArray>(data, out_buffer));
-    } else if (type == Type::STRING) {
-      RETURN_NOT_OK(ConvertBinaryLike<arrow::StringArray>(data, out_buffer));
-    } else if (type == Type::LIST) {
-      auto list_type = std::static_pointer_cast<ListType>(col->type());
-      switch (list_type->value_type()->type) {
-        CONVERTLISTSLIKE_CASE(UInt8Type, UINT8)
-        CONVERTLISTSLIKE_CASE(Int8Type, INT8)
-        CONVERTLISTSLIKE_CASE(UInt16Type, UINT16)
-        CONVERTLISTSLIKE_CASE(Int16Type, INT16)
-        CONVERTLISTSLIKE_CASE(UInt32Type, UINT32)
-        CONVERTLISTSLIKE_CASE(Int32Type, INT32)
-        CONVERTLISTSLIKE_CASE(UInt64Type, UINT64)
-        CONVERTLISTSLIKE_CASE(Int64Type, INT64)
-        CONVERTLISTSLIKE_CASE(TimestampType, TIMESTAMP)
-        CONVERTLISTSLIKE_CASE(FloatType, FLOAT)
-        CONVERTLISTSLIKE_CASE(DoubleType, DOUBLE)
-        CONVERTLISTSLIKE_CASE(StringType, STRING)
-        default: {
-          std::stringstream ss;
-          ss << "Not implemented type for lists: " << list_type->value_type()->ToString();
-          return Status::NotImplemented(ss.str());
-        }
-      }
-    } else {
-      std::stringstream ss;
-      ss << "Unsupported type for object array output: " << col->type()->ToString();
-      return Status::NotImplemented(ss.str());
-    }
+  out->reset(new arrow::TimestampType(unit));
+  return Status::OK();
+}
 
-    placement_data_[rel_placement] = abs_placement;
-    return Status::OK();
-  }
-};
+template <int TYPE>
+inline Status ArrowSerializer<TYPE>::Convert(std::shared_ptr<Array>* out) {
+  typedef npy_traits<TYPE> traits;
 
-template <int ARROW_TYPE, typename C_TYPE>
-class IntBlock : public PandasBlock {
- public:
-  using PandasBlock::PandasBlock;
+  if (mask_ != nullptr || traits::supports_nulls) { RETURN_NOT_OK(InitNullBitmap()); }
 
-  Status Allocate() override {
-    return AllocateNDArray(arrow_traits<ARROW_TYPE>::npy_type);
+  int64_t null_count = 0;
+  if (mask_ != nullptr) {
+    null_count = MaskToBitmap(mask_, length_, null_bitmap_data_);
+  } else if (traits::supports_nulls) {
+    null_count = ValuesToBitmap<TYPE>(PyArray_DATA(arr_), length_, null_bitmap_data_);
   }
 
-  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
-      int64_t rel_placement) override {
-    Type::type type = col->type()->type;
-
-    C_TYPE* out_buffer =
-        reinterpret_cast<C_TYPE*>(block_data_) + rel_placement * num_rows_;
+  RETURN_NOT_OK(ConvertData());
+  std::shared_ptr<DataType> type;
+  RETURN_NOT_OK(MakeDataType(&type));
+  RETURN_NOT_OK(MakePrimitiveArray(type, length_, data_, null_count, null_bitmap_, out));
+  return Status::OK();
+}
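
ValuesToBitmap, used above when traits::supports_nulls holds and no explicit
mask is given, infers nulls from the values themselves; for floating point,
NaN encodes null. A hypothetical standalone sketch of that idea for doubles
(not the pyarrow helper):

    #include <cmath>
    #include <cstdint>

    // Sets validity bits for non-NaN entries and returns the null count.
    int64_t DoubleValuesToValidityBitmap(const double* values, int64_t length,
                                         uint8_t* bitmap) {
      int64_t null_count = 0;
      for (int64_t i = 0; i < length; ++i) {
        if (std::isnan(values[i])) {
          ++null_count;  // NaN stands for null in float columns
        } else {
          bitmap[i / 8] |= static_cast<uint8_t>(1 << (i % 8));
        }
      }
      return null_count;
    }
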
 
-    const ChunkedArray& data = *col->data().get();
+template <>
+inline Status ArrowSerializer<NPY_OBJECT>::Convert(std::shared_ptr<Array>* out) {
+  // Python object arrays are annoying, since we could have one of:
+  //
+  // * Strings
+  // * Booleans with nulls
+  // * Mixed type (not supported at the moment by arrow format)
+  //
+  // Additionally, nulls may be encoded either as np.nan or None. So we have to
+  // do some type inference and conversion
 
-    if (type != ARROW_TYPE) { return Status::NotImplemented(col->type()->ToString()); }
+  RETURN_NOT_OK(InitNullBitmap());
 
-    ConvertIntegerNoNullsSameType<C_TYPE>(data, out_buffer);
-    placement_data_[rel_placement] = abs_placement;
-    return Status::OK();
+  // TODO: mask not supported here
+  const PyObject** objects = reinterpret_cast<const PyObject**>(PyArray_DATA(arr_));
+  {
+    PyAcquireGIL lock;
+    PyDateTime_IMPORT;
   }
-};
-
-using UInt8Block = IntBlock<Type::UINT8, uint8_t>;
-using Int8Block = IntBlock<Type::INT8, int8_t>;
-using UInt16Block = IntBlock<Type::UINT16, uint16_t>;
-using Int16Block = IntBlock<Type::INT16, int16_t>;
-using UInt32Block = IntBlock<Type::UINT32, uint32_t>;
-using Int32Block = IntBlock<Type::INT32, int32_t>;
-using UInt64Block = IntBlock<Type::UINT64, uint64_t>;
-using Int64Block = IntBlock<Type::INT64, int64_t>;
 
-class Float32Block : public PandasBlock {
- public:
-  using PandasBlock::PandasBlock;
+  if (field_indicator_) {
+    switch (field_indicator_->type->type) {
+      case Type::STRING:
+        return ConvertObjectStrings(out);
+      case Type::BOOL:
+        return ConvertBooleans(out);
+      case Type::DATE:
+        return ConvertDates(out);
+      case Type::LIST: {
+        auto list_field = static_cast<ListType*>(field_indicator_->type.get());
+        return ConvertLists(list_field->value_field(), out);
+      }
+      default:
+        return Status::TypeError("No known conversion to Arrow type");
+    }
+  } else {
+    for (int64_t i = 0; i < length_; ++i) {
+      if (PyObject_is_null(objects[i])) {
+        continue;
+      } else if (PyObject_is_string(objects[i])) {
+        return ConvertObjectStrings(out);
+      } else if (PyBool_Check(objects[i])) {
+        return ConvertBooleans(out);
+      } else if (PyDate_CheckExact(objects[i])) {
+        return ConvertDates(out);
+      } else {
+        return Status::TypeError("unhandled python type");
+      }
+    }
+  }
 
-  Status Allocate() override { return AllocateNDArray(NPY_FLOAT32); }
+  return Status::TypeError("Unable to infer type of object array, were all null");
+}
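
The untyped branch above decides the Arrow type from the first non-null element
alone, so mixed object columns fail during the subsequent conversion rather than
up front. A condensed sketch of that rule, approximating pyarrow's
PyObject_is_null/PyObject_is_string helpers with bare CPython checks (None only
for nulls, bytes/unicode for strings); PyDateTime_IMPORT must have run before
PyDate_CheckExact is used:

    #include <Python.h>
    #include <datetime.h>
    #include <cstdint>

    enum class Inferred { STRING, BOOL, DATE, UNSUPPORTED, ALL_NULL };

    Inferred InferObjectType(PyObject** objects, int64_t length) {
      for (int64_t i = 0; i < length; ++i) {
        PyObject* obj = objects[i];
        if (obj == Py_None) { continue; }  // nulls don't decide the type
        if (PyBytes_Check(obj) || PyUnicode_Check(obj)) { return Inferred::STRING; }
        if (PyBool_Check(obj)) { return Inferred::BOOL; }
        if (PyDate_CheckExact(obj)) { return Inferred::DATE; }
        return Inferred::UNSUPPORTED;  // first non-null element is unhandled
      }
      return Inferred::ALL_NULL;
    }
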
 
-  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
-      int64_t rel_placement) override {
-    Type::type type = col->type()->type;
+template <int TYPE>
+inline Status ArrowSerializer<TYPE>::ConvertData() {
+  // TODO(wesm): strided arrays
+  if (is_strided()) { return Status::Invalid("no support for strided data yet"); }
 
-    if (type != Type::FLOAT) { return Status::NotImplemented(col->type()->ToString()); }
+  data_ = std::make_shared<NumPyBuffer>(arr_);
+  return Status::OK();
+}
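
The zero-copy path above wraps the NumPy array's memory in a buffer rather than
copying it; the essential trick is holding a strong reference so the data
outlives every Arrow view of it. A hypothetical sketch of that ownership
pattern (not pyarrow's NumPyBuffer; construction and destruction assume the GIL
is held):

    #include <Python.h>
    #include <numpy/arrayobject.h>
    #include <cstdint>

    class PyArrayView {
     public:
      explicit PyArrayView(PyArrayObject* arr) : arr_(arr) {
        Py_INCREF(reinterpret_cast<PyObject*>(arr_));  // keep the array alive
        data_ = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_));
        size_ = PyArray_NBYTES(arr_);
      }
      ~PyArrayView() { Py_DECREF(reinterpret_cast<PyObject*>(arr_)); }

      // Non-copyable: a copy would double-DECREF on destruction.
      PyArrayView(const PyArrayView&) = delete;
      PyArrayView& operator=(const PyArrayView&) = delete;

      const uint8_t* data() const { return data_; }
      int64_t size() const { return size_; }

     private:
      PyArrayObject* arr_;
      const uint8_t* data_;
      int64_t size_;
    };
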
 
-    float* out_buffer = reinterpret_cast<float*>(block_data_) + rel_placement * num_rows_;
+template <>
+inline Status ArrowSerializer<NPY_BOOL>::ConvertData() {
+  if (is_strided()) { return Status::Invalid("no support for strided data yet"); }
 
-    ConvertNumericNullable<float>(*col->data().get(), NAN, out_buffer);
-    placement_data_[rel_placement] = abs_placement;
-    return Status::OK();
-  }
-};
+  int64_t nbytes = BitUtil::BytesForBits(length_);
+  auto buffer = std::make_shared<arrow::PoolBuffer>(pool_);
+  RETURN_NOT_OK(buffer->Resize(nbytes));
 
-class Float64Block : public PandasBlock {
- public:
-  using PandasBlock::PandasBlock;
+  const uint8_t* values = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_));
 
-  Status Allocate() override { return AllocateNDArray(NPY_FLOAT64); }
+  uint8_t* bitmap = buffer->mutable_data();
 
-  Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
-      int64_t rel_placement) override {
-    Type::type type = col->type()->type;
+  memset(bitmap, 0, nbytes);
+  for (int64_t i = 0; i < length_; ++i) {
+    if (values[i] > 0) { BitUtil::SetBit(bitmap, i); }
+  }
 
-    double* out_buffer =
-        reinterpret_cast<double*>(block_data_) + rel_placement * num_rows_;
+  data_ = buffer;
 
-    const ChunkedArray& data = *col->data().get();
+  return Status::OK();
+}
 
-#define INTEGER_CASE(IN_TYPE)                         \
-  ConvertIntegerWithNulls<IN_TYPE>(data, out_buffer); \
-  break;
+template <int TYPE>
+template <int ITEM_TYPE, typename ArrowType>
+inline Status ArrowSerializer<TYPE>::ConvertTypedLists(
+    const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) {
+  typedef npy_traits<ITEM_TYPE> traits;
+  typedef typename traits::value_type T;
+  typedef typename traits::BuilderClass BuilderT;
 
-    switch (type) {
-      case Type::UINT8:
-        INTEGER_CASE(uint8_t);
-      case Type::INT8:
-        INTEGER_CASE(int8_t);
-      case Type::UINT16:
-        INTEGER_CASE(uint16_t);
-      case Type::INT16:
-        INTEGER_CASE(int16_t);
-      case Type::UINT32:
-        INTEGER_CASE(uint32_t);
-      case Type::INT32:
-        INTEGER_CASE(int32_t);
-      case Type::UINT64:
-        INTEGER_CASE(uint64_t);
-      case Type::INT64:
-        INTEGER_CASE(int64_t);
-      case Type::FLOAT:
-        ConvertNumericNullableCast<float, double>(data, NAN, out_buffer);
-        break;
-      case Type::DOUBLE:
-        ConvertNumericNullable<double>(data, NAN, out_buffer);
-        break;
-      default:
-        return Status::NotImplemented(col->type()->ToString());
-    }
+  auto value_builder = std::make_shared<BuilderT>(pool_, field->type);
+  ListBuilder list_builder(pool_, value_builder);
+  PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
+  for (int64_t i = 0; i < length_; ++i) {
+    if (PyObject_is_null(objects[i])) {
+      RETURN_NOT_OK(list_builder.AppendNull());
+    } else if (PyArray_Check(objects[i])) {
+      auto numpy_array = reinterpret_cast<PyArrayObject*>(objects[i]);
+      RETURN_NOT_OK(list_builder.Append(true));
 
-#undef INTEGER_CASE
+      // TODO(uwe): Support more complex numpy array structures
+      RETURN_NOT_OK(CheckFlatNumpyArray(numpy_array, ITEM_TYPE));
 
-    placement_data_[rel_placement] = abs_placement;
-    return Status::OK();
+      int32_t size = PyArray_DIM(numpy_array, 0);
+      auto data = reinterpret_cast<const T*>(PyArray_DATA(numpy_arra

<TRUNCATED>