You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/12/28 12:49:15 UTC
arrow git commit: ARROW-428: [Python] Multithreaded conversion from
Arrow table to pandas.DataFrame
Repository: arrow
Updated Branches:
refs/heads/master 1079a3206 -> ab5f66a2e
ARROW-428: [Python] Multithreaded conversion from Arrow table to pandas.DataFrame
This yields a substantial speedup on my laptop. On a 1GB numeric dataset, with 1 thread (the default prior to this patch):
```
>>> %timeit df2 = table.to_pandas(nthreads=1)
1 loop, best of 3: 498 ms per loop
```
With 4 threads (this is a true quad-core machine):
```
>>> %timeit df2 = table.to_pandas(nthreads=4)
1 loop, best of 3: 151 ms per loop
```
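The default number of threads used is `multiprocessing.cpu_count()` divided by 2, with a minimum of 1 (since hyperthreading doesn't help with this largely memory-bound operation). The default can be overridden with the `OMP_NUM_THREADS` environment variable or with `pyarrow.set_cpu_count`.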
Author: Wes McKinney <we...@twosigma.com>
Closes #252 from wesm/ARROW-428 and squashes the following commits:
da929bf [Wes McKinney] Factor out common compiler flag code between Arrow C++ and Python CMake files. Add pyarrow.cpu_count/set_cpu_count functions per feedback
cad89e9 [Wes McKinney] Tweak pyarrow cmake flags
e70f16d [Wes McKinney] Add missing GIL acquisition. Do not spawn too many threads if few columns
bc4dff7 [Wes McKinney] Return errors from threaded conversion. Add doc about number of cpus used
79f5fd9 [Wes McKinney] Implement multithreaded conversion from Arrow table to pandas.DataFrame. Default to multiprocessing.cpu_count for now
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/ab5f66a2
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/ab5f66a2
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/ab5f66a2
Branch: refs/heads/master
Commit: ab5f66a2e9a2b6af312ffdfa2f95c65b1d6f5739
Parents: 1079a32
Author: Wes McKinney <we...@twosigma.com>
Authored: Wed Dec 28 07:49:06 2016 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Wed Dec 28 07:49:06 2016 -0500
----------------------------------------------------------------------
cpp/CMakeLists.txt | 71 +------------
cpp/cmake_modules/SetupCxxFlags.cmake | 86 ++++++++++++++++
python/CMakeLists.txt | 36 +------
python/pyarrow/__init__.py | 1 +
python/pyarrow/config.pyx | 23 +++++
python/pyarrow/table.pyx | 38 +++----
python/pyarrow/tests/test_convert_pandas.py | 42 ++++++--
python/src/pyarrow/adapters/pandas.cc | 121 ++++++++++++++++-------
8 files changed, 250 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 93e9853..4507e67 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -105,76 +105,7 @@ endif()
# Compiler flags
############################################################
-# Check if the target architecture and compiler supports some special
-# instruction sets that would boost performance.
-include(CheckCXXCompilerFlag)
-# x86/amd64 compiler flags
-CHECK_CXX_COMPILER_FLAG("-msse3" CXX_SUPPORTS_SSE3)
-# power compiler flags
-CHECK_CXX_COMPILER_FLAG("-maltivec" CXX_SUPPORTS_ALTIVEC)
-
-# compiler flags that are common across debug/release builds
-# - Wall: Enable all warnings.
-set(CXX_COMMON_FLAGS "-std=c++11 -Wall")
-
-# Only enable additional instruction sets if they are supported
-if (CXX_SUPPORTS_SSE3 AND ARROW_SSE3)
- set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -msse3")
-endif()
-if (CXX_SUPPORTS_ALTIVEC AND ARROW_ALTIVEC)
- set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -maltivec")
-endif()
-
-if (APPLE)
- # Depending on the default OSX_DEPLOYMENT_TARGET (< 10.9), libstdc++ may be
- # the default standard library which does not support C++11. libc++ is the
- # default from 10.9 onward.
- set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -stdlib=libc++")
-endif()
-
-# compiler flags for different build types (run 'cmake -DCMAKE_BUILD_TYPE=<type> .')
-# For all builds:
-# For CMAKE_BUILD_TYPE=Debug
-# -ggdb: Enable gdb debugging
-# For CMAKE_BUILD_TYPE=FastDebug
-# Same as DEBUG, except with some optimizations on.
-# For CMAKE_BUILD_TYPE=Release
-# -O3: Enable all compiler optimizations
-# -g: Enable symbols for profiler tools (TODO: remove for shipping)
-if (NOT MSVC)
- set(CXX_FLAGS_DEBUG "-ggdb -O0")
- set(CXX_FLAGS_FASTDEBUG "-ggdb -O1")
- set(CXX_FLAGS_RELEASE "-O3 -g -DNDEBUG")
-endif()
-
-set(CXX_FLAGS_PROFILE_GEN "${CXX_FLAGS_RELEASE} -fprofile-generate")
-set(CXX_FLAGS_PROFILE_BUILD "${CXX_FLAGS_RELEASE} -fprofile-use")
-
-# if no build build type is specified, default to debug builds
-if (NOT CMAKE_BUILD_TYPE)
- set(CMAKE_BUILD_TYPE Debug)
-endif(NOT CMAKE_BUILD_TYPE)
-
-string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE)
-
-
-# Set compile flags based on the build type.
-message("Configured for ${CMAKE_BUILD_TYPE} build (set with cmake -DCMAKE_BUILD_TYPE={release,debug,...})")
-if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG")
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_DEBUG}")
-elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "FASTDEBUG")
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_FASTDEBUG}")
-elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE")
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_RELEASE}")
-elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "PROFILE_GEN")
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_PROFILE_GEN}")
-elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "PROFILE_BUILD")
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_PROFILE_BUILD}")
-else()
- message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
-endif ()
-
-message(STATUS "Build Type: ${CMAKE_BUILD_TYPE}")
+include(SetupCxxFlags)
# Add common flags
set(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}")
http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/cpp/cmake_modules/SetupCxxFlags.cmake
----------------------------------------------------------------------
diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake
new file mode 100644
index 0000000..ee672bd
--- /dev/null
+++ b/cpp/cmake_modules/SetupCxxFlags.cmake
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Check if the target architecture and compiler supports some special
+# instruction sets that would boost performance.
+include(CheckCXXCompilerFlag)
+# x86/amd64 compiler flags
+CHECK_CXX_COMPILER_FLAG("-msse3" CXX_SUPPORTS_SSE3)
+# power compiler flags
+CHECK_CXX_COMPILER_FLAG("-maltivec" CXX_SUPPORTS_ALTIVEC)
+
+# compiler flags that are common across debug/release builds
+# - Wall: Enable all warnings.
+set(CXX_COMMON_FLAGS "-std=c++11 -Wall")
+
+# Only enable additional instruction sets if they are supported
+if (CXX_SUPPORTS_SSE3 AND ARROW_SSE3)
+ set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -msse3")
+endif()
+if (CXX_SUPPORTS_ALTIVEC AND ARROW_ALTIVEC)
+ set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -maltivec")
+endif()
+
+if (APPLE)
+ # Depending on the default OSX_DEPLOYMENT_TARGET (< 10.9), libstdc++ may be
+ # the default standard library which does not support C++11. libc++ is the
+ # default from 10.9 onward.
+ set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -stdlib=libc++")
+endif()
+
+# compiler flags for different build types (run 'cmake -DCMAKE_BUILD_TYPE=<type> .')
+# For all builds:
+# For CMAKE_BUILD_TYPE=Debug
+# -ggdb: Enable gdb debugging
+# For CMAKE_BUILD_TYPE=FastDebug
+# Same as DEBUG, except with some optimizations on.
+# For CMAKE_BUILD_TYPE=Release
+# -O3: Enable all compiler optimizations
+# -g: Enable symbols for profiler tools (TODO: remove for shipping)
+if (NOT MSVC)
+ set(CXX_FLAGS_DEBUG "-ggdb -O0")
+ set(CXX_FLAGS_FASTDEBUG "-ggdb -O1")
+ set(CXX_FLAGS_RELEASE "-O3 -g -DNDEBUG")
+endif()
+
+set(CXX_FLAGS_PROFILE_GEN "${CXX_FLAGS_RELEASE} -fprofile-generate")
+set(CXX_FLAGS_PROFILE_BUILD "${CXX_FLAGS_RELEASE} -fprofile-use")
+
+# if no build build type is specified, default to debug builds
+if (NOT CMAKE_BUILD_TYPE)
+ set(CMAKE_BUILD_TYPE Debug)
+endif(NOT CMAKE_BUILD_TYPE)
+
+string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE)
+
+# Set compile flags based on the build type.
+message("Configured for ${CMAKE_BUILD_TYPE} build (set with cmake -DCMAKE_BUILD_TYPE={release,debug,...})")
+if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_DEBUG}")
+elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "FASTDEBUG")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_FASTDEBUG}")
+elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_RELEASE}")
+elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "PROFILE_GEN")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_PROFILE_GEN}")
+elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "PROFILE_BUILD")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_PROFILE_BUILD}")
+else()
+ message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
+endif ()
+
+message(STATUS "Build Type: ${CMAKE_BUILD_TYPE}")
http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 6ad55f8..6c24772 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -65,41 +65,7 @@ endif(CCACHE_FOUND)
# Compiler flags
############################################################
-# compiler flags that are common across debug/release builds
-set(CXX_COMMON_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall")
-
-# compiler flags for different build types (run 'cmake -DCMAKE_BUILD_TYPE=<type> .')
-# For all builds:
-# For CMAKE_BUILD_TYPE=Debug
-# -ggdb: Enable gdb debugging
-# For CMAKE_BUILD_TYPE=FastDebug
-# Same as DEBUG, except with some optimizations on.
-# For CMAKE_BUILD_TYPE=Release
-# -O3: Enable all compiler optimizations
-# -g: Enable symbols for profiler tools (TODO: remove for shipping)
-# -DNDEBUG: Turn off dchecks/asserts/debug only code.
-set(CXX_FLAGS_DEBUG "-ggdb -O0")
-set(CXX_FLAGS_FASTDEBUG "-ggdb -O1")
-set(CXX_FLAGS_RELEASE "-O3 -g -DNDEBUG")
-
-# if no build build type is specified, default to debug builds
-if (NOT CMAKE_BUILD_TYPE)
- set(CMAKE_BUILD_TYPE Debug)
-endif(NOT CMAKE_BUILD_TYPE)
-
-string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE)
-
-# Set compile flags based on the build type.
-message("Configured for ${CMAKE_BUILD_TYPE} build (set with cmake -DCMAKE_BUILD_TYPE={release,debug,...})")
-if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG")
- set(CMAKE_CXX_FLAGS ${CXX_FLAGS_DEBUG})
-elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "FASTDEBUG")
- set(CMAKE_CXX_FLAGS ${CXX_FLAGS_FASTDEBUG})
-elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE")
- set(CMAKE_CXX_FLAGS ${CXX_FLAGS_RELEASE})
-else()
- message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
-endif ()
+include(SetupCxxFlags)
# Add common flags
set(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}")
http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 9ede934..6f81ef4 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -26,6 +26,7 @@ except DistributionNotFound:
import pyarrow.config
+from pyarrow.config import cpu_count, set_cpu_count
from pyarrow.array import (Array,
from_pandas_series, from_pylist,
http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/pyarrow/config.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/config.pyx b/python/pyarrow/config.pyx
index 778c15a..aa30f09 100644
--- a/python/pyarrow/config.pyx
+++ b/python/pyarrow/config.pyx
@@ -29,3 +29,26 @@ pyarrow_init()
import numpy as np
pyarrow_set_numpy_nan(np.nan)
+
+import multiprocessing
+import os
+cdef int CPU_COUNT = int(
+ os.environ.get('OMP_NUM_THREADS',
+ max(multiprocessing.cpu_count() // 2, 1)))
+
+def cpu_count():
+ """
+ Returns
+ -------
+ count : Number of CPUs to use by default in parallel operations. Default is
+ max(1, multiprocessing.cpu_count() / 2), but can be overridden by the
+ OMP_NUM_THREADS environment variable. For the default, we divide the CPU
+ count by 2 because most modern computers have hyperthreading turned on,
+ so doubling the CPU count beyond the number of physical cores does not
+ help.
+ """
+ return CPU_COUNT
+
+def set_cpu_count(count):
+ global CPU_COUNT
+ CPU_COUNT = max(int(count), 1)
http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index 9375557..20137e3 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -439,7 +439,9 @@ cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads):
from pandas.core.internals import BlockManager, make_block
from pandas import RangeIndex
- check_status(pyarrow.ConvertTableToPandas(table, nthreads, &result_obj))
+ with nogil:
+ check_status(pyarrow.ConvertTableToPandas(table, nthreads,
+ &result_obj))
result = PyObject_to_object(result_obj)
@@ -610,36 +612,28 @@ cdef class Table:
table.init(c_table)
return table
- def to_pandas(self, nthreads=1, block_based=True):
+ def to_pandas(self, nthreads=None):
"""
Convert the arrow::Table to a pandas DataFrame
+ Parameters
+ ----------
+ nthreads : int, default max(1, multiprocessing.cpu_count() / 2)
+ For the default, we divide the CPU count by 2 because most modern
+ computers have hyperthreading turned on, so doubling the CPU count
+ beyond the number of physical cores does not help
+
Returns
-------
pandas.DataFrame
"""
- cdef:
- PyObject* arr
- shared_ptr[CColumn] col
- Column column
-
import pandas as pd
- if block_based:
- mgr = table_to_blockmanager(self.sp_table, nthreads)
- return pd.DataFrame(mgr)
- else:
- names = []
- data = []
- for i in range(self.table.num_columns()):
- col = self.table.column(i)
- column = self.column(i)
- check_status(pyarrow.ConvertColumnToPandas(
- col, <PyObject*> column, &arr))
- names.append(frombytes(col.get().name()))
- data.append(PyObject_to_object(arr))
-
- return pd.DataFrame(dict(zip(names, data)), columns=names)
+ if nthreads is None:
+ nthreads = pyarrow.config.cpu_count()
+
+ mgr = table_to_blockmanager(self.sp_table, nthreads)
+ return pd.DataFrame(mgr)
@property
def name(self):
http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/pyarrow/tests/test_convert_pandas.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index da34f85..863aa30 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -27,6 +27,29 @@ from pyarrow.compat import u
import pyarrow as A
+def _alltypes_example(size=100):
+ return pd.DataFrame({
+ 'uint8': np.arange(size, dtype=np.uint8),
+ 'uint16': np.arange(size, dtype=np.uint16),
+ 'uint32': np.arange(size, dtype=np.uint32),
+ 'uint64': np.arange(size, dtype=np.uint64),
+ 'int8': np.arange(size, dtype=np.int16),
+ 'int16': np.arange(size, dtype=np.int16),
+ 'int32': np.arange(size, dtype=np.int32),
+ 'int64': np.arange(size, dtype=np.int64),
+ 'float32': np.arange(size, dtype=np.float32),
+ 'float64': np.arange(size, dtype=np.float64),
+ 'bool': np.random.randn(size) > 0,
+ # TODO(wesm): Pandas only support ns resolution, Arrow supports s, ms,
+ # us, ns
+ 'datetime': np.arange("2016-01-01T00:00:00.001", size,
+ dtype='datetime64[ms]'),
+ 'str': [str(x) for x in range(size)],
+ 'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None],
+ 'empty_str': [''] * size
+ })
+
+
class TestPandasConversion(unittest.TestCase):
def setUp(self):
@@ -35,10 +58,10 @@ class TestPandasConversion(unittest.TestCase):
def tearDown(self):
pass
- def _check_pandas_roundtrip(self, df, expected=None,
+ def _check_pandas_roundtrip(self, df, expected=None, nthreads=1,
timestamps_to_ms=False):
table = A.from_pandas_dataframe(df, timestamps_to_ms=timestamps_to_ms)
- result = table.to_pandas()
+ result = table.to_pandas(nthreads=nthreads)
if expected is None:
expected = df
tm.assert_frame_equal(result, expected)
@@ -217,18 +240,21 @@ class TestPandasConversion(unittest.TestCase):
def test_date(self):
df = pd.DataFrame({
- 'date': [
- datetime.date(2000, 1, 1),
- None,
- datetime.date(1970, 1, 1),
- datetime.date(2040, 2, 26)
- ]})
+ 'date': [datetime.date(2000, 1, 1),
+ None,
+ datetime.date(1970, 1, 1),
+ datetime.date(2040, 2, 26)]})
table = A.from_pandas_dataframe(df)
result = table.to_pandas()
expected = df.copy()
expected['date'] = pd.to_datetime(df['date'])
tm.assert_frame_equal(result, expected)
+ def test_threaded_conversion(self):
+ df = _alltypes_example()
+ self._check_pandas_roundtrip(df, nthreads=2,
+ timestamps_to_ms=False)
+
# def test_category(self):
# repeats = 1000
# values = [b'foo', None, u'bar', 'qux', np.nan]
http://git-wip-us.apache.org/repos/asf/arrow/blob/ab5f66a2/python/src/pyarrow/adapters/pandas.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index 899eb55..5e5826b 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -19,15 +19,18 @@
#include <Python.h>
-#include "pyarrow/numpy_interop.h"
-
#include "pyarrow/adapters/pandas.h"
+#include "pyarrow/numpy_interop.h"
+#include <algorithm>
+#include <atomic>
#include <cmath>
#include <cstdint>
#include <memory>
+#include <mutex>
#include <sstream>
#include <string>
+#include <thread>
#include <unordered_map>
#include "arrow/api.h"
@@ -1031,7 +1034,8 @@ class PandasBlock {
: num_rows_(num_rows), num_columns_(num_columns) {}
virtual Status Allocate() = 0;
- virtual Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) = 0;
+ virtual Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+ int64_t rel_placement) = 0;
PyObject* block_arr() { return block_arr_.obj(); }
@@ -1057,7 +1061,6 @@ class PandasBlock {
block_arr_.reset(block_arr);
placement_arr_.reset(placement_arr);
- current_placement_index_ = 0;
block_data_ = reinterpret_cast<uint8_t*>(
PyArray_DATA(reinterpret_cast<PyArrayObject*>(block_arr)));
@@ -1070,7 +1073,6 @@ class PandasBlock {
int64_t num_rows_;
int num_columns_;
- int current_placement_index_;
OwnedRef block_arr_;
uint8_t* block_data_;
@@ -1088,11 +1090,12 @@ class ObjectBlock : public PandasBlock {
Status Allocate() override { return AllocateNDArray(NPY_OBJECT); }
- Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override {
+ Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+ int64_t rel_placement) override {
Type::type type = col->type()->type;
PyObject** out_buffer =
- reinterpret_cast<PyObject**>(block_data_) + current_placement_index_ * num_rows_;
+ reinterpret_cast<PyObject**>(block_data_) + rel_placement * num_rows_;
const ChunkedArray& data = *col->data().get();
@@ -1108,7 +1111,7 @@ class ObjectBlock : public PandasBlock {
return Status::NotImplemented(ss.str());
}
- placement_data_[current_placement_index_++] = placement;
+ placement_data_[rel_placement] = abs_placement;
return Status::OK();
}
};
@@ -1122,18 +1125,19 @@ class IntBlock : public PandasBlock {
return AllocateNDArray(arrow_traits<ARROW_TYPE>::npy_type);
}
- Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override {
+ Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+ int64_t rel_placement) override {
Type::type type = col->type()->type;
C_TYPE* out_buffer =
- reinterpret_cast<C_TYPE*>(block_data_) + current_placement_index_ * num_rows_;
+ reinterpret_cast<C_TYPE*>(block_data_) + rel_placement * num_rows_;
const ChunkedArray& data = *col->data().get();
if (type != ARROW_TYPE) { return Status::NotImplemented(col->type()->ToString()); }
ConvertIntegerNoNullsSameType<C_TYPE>(data, out_buffer);
- placement_data_[current_placement_index_++] = placement;
+ placement_data_[rel_placement] = abs_placement;
return Status::OK();
}
};
@@ -1153,16 +1157,16 @@ class Float32Block : public PandasBlock {
Status Allocate() override { return AllocateNDArray(NPY_FLOAT32); }
- Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override {
+ Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+ int64_t rel_placement) override {
Type::type type = col->type()->type;
if (type != Type::FLOAT) { return Status::NotImplemented(col->type()->ToString()); }
- float* out_buffer =
- reinterpret_cast<float*>(block_data_) + current_placement_index_ * num_rows_;
+ float* out_buffer = reinterpret_cast<float*>(block_data_) + rel_placement * num_rows_;
ConvertNumericNullable<float>(*col->data().get(), NAN, out_buffer);
- placement_data_[current_placement_index_++] = placement;
+ placement_data_[rel_placement] = abs_placement;
return Status::OK();
}
};
@@ -1173,11 +1177,12 @@ class Float64Block : public PandasBlock {
Status Allocate() override { return AllocateNDArray(NPY_FLOAT64); }
- Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override {
+ Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+ int64_t rel_placement) override {
Type::type type = col->type()->type;
double* out_buffer =
- reinterpret_cast<double*>(block_data_) + current_placement_index_ * num_rows_;
+ reinterpret_cast<double*>(block_data_) + rel_placement * num_rows_;
const ChunkedArray& data = *col->data().get();
@@ -1214,7 +1219,7 @@ class Float64Block : public PandasBlock {
#undef INTEGER_CASE
- placement_data_[current_placement_index_++] = placement;
+ placement_data_[rel_placement] = abs_placement;
return Status::OK();
}
};
@@ -1225,16 +1230,17 @@ class BoolBlock : public PandasBlock {
Status Allocate() override { return AllocateNDArray(NPY_BOOL); }
- Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override {
+ Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+ int64_t rel_placement) override {
Type::type type = col->type()->type;
if (type != Type::BOOL) { return Status::NotImplemented(col->type()->ToString()); }
uint8_t* out_buffer =
- reinterpret_cast<uint8_t*>(block_data_) + current_placement_index_ * num_rows_;
+ reinterpret_cast<uint8_t*>(block_data_) + rel_placement * num_rows_;
ConvertBooleanNoNulls(*col->data().get(), out_buffer);
- placement_data_[current_placement_index_++] = placement;
+ placement_data_[rel_placement] = abs_placement;
return Status::OK();
}
};
@@ -1253,11 +1259,12 @@ class DatetimeBlock : public PandasBlock {
return Status::OK();
}
- Status WriteNext(const std::shared_ptr<Column>& col, int64_t placement) override {
+ Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement,
+ int64_t rel_placement) override {
Type::type type = col->type()->type;
int64_t* out_buffer =
- reinterpret_cast<int64_t*>(block_data_) + current_placement_index_ * num_rows_;
+ reinterpret_cast<int64_t*>(block_data_) + rel_placement * num_rows_;
const ChunkedArray& data = *col.get()->data();
@@ -1283,7 +1290,7 @@ class DatetimeBlock : public PandasBlock {
return Status::NotImplemented(col->type()->ToString());
}
- placement_data_[current_placement_index_++] = placement;
+ placement_data_[rel_placement] = abs_placement;
return Status::OK();
}
};
@@ -1333,6 +1340,7 @@ class DataFrameBlockCreator {
Status Convert(int nthreads, PyObject** output) {
column_types_.resize(table_->num_columns());
+ column_block_placement_.resize(table_->num_columns());
type_counts_.clear();
blocks_.clear();
@@ -1397,7 +1405,9 @@ class DataFrameBlockCreator {
}
auto it = type_counts_.find(output_type);
+ int block_placement = 0;
if (it != type_counts_.end()) {
+ block_placement = it->second;
// Increment count
it->second += 1;
} else {
@@ -1406,6 +1416,7 @@ class DataFrameBlockCreator {
}
column_types_[i] = output_type;
+ column_block_placement_[i] = block_placement;
}
return Status::OK();
}
@@ -1421,22 +1432,61 @@ class DataFrameBlockCreator {
}
Status WriteTableToBlocks(int nthreads) {
- if (nthreads > 1) {
- return Status::NotImplemented("multithreading not yet implemented");
- }
+ auto WriteColumn = [this](int i) {
+ std::shared_ptr<Column> col = this->table_->column(i);
+ PandasBlock::type output_type = this->column_types_[i];
- for (int i = 0; i < table_->num_columns(); ++i) {
- std::shared_ptr<Column> col = table_->column(i);
- PandasBlock::type output_type = column_types_[i];
+ int rel_placement = this->column_block_placement_[i];
+
+ auto it = this->blocks_.find(output_type);
+ if (it == this->blocks_.end()) { return Status::KeyError("No block allocated"); }
+ return it->second->Write(col, i, rel_placement);
+ };
- auto it = blocks_.find(output_type);
- if (it == blocks_.end()) { return Status::KeyError("No block allocated"); }
- RETURN_NOT_OK(it->second->WriteNext(col, i));
+ nthreads = std::min<int>(nthreads, table_->num_columns());
+
+ if (nthreads == 1) {
+ for (int i = 0; i < table_->num_columns(); ++i) {
+ RETURN_NOT_OK(WriteColumn(i));
+ }
+ } else {
+ std::vector<std::thread> thread_pool;
+ thread_pool.reserve(nthreads);
+ std::atomic<int> task_counter(0);
+
+ std::mutex error_mtx;
+ bool error_occurred = false;
+ Status error;
+
+ for (int thread_id = 0; thread_id < nthreads; ++thread_id) {
+ thread_pool.emplace_back(
+ [this, &error, &error_occurred, &error_mtx, &task_counter, &WriteColumn]() {
+ int column_num;
+ while (!error_occurred) {
+ column_num = task_counter.fetch_add(1);
+ if (column_num >= this->table_->num_columns()) { break; }
+ Status s = WriteColumn(column_num);
+ if (!s.ok()) {
+ std::lock_guard<std::mutex> lock(error_mtx);
+ error_occurred = true;
+ error = s;
+ break;
+ }
+ }
+ });
+ }
+ for (auto&& thread : thread_pool) {
+ thread.join();
+ }
+
+ if (error_occurred) { return error; }
}
return Status::OK();
}
Status GetResultList(PyObject** out) {
+ PyAcquireGIL lock;
+
auto num_blocks = static_cast<Py_ssize_t>(blocks_.size());
PyObject* result = PyList_New(num_blocks);
RETURN_IF_PYERROR();
@@ -1463,8 +1513,13 @@ class DataFrameBlockCreator {
private:
std::shared_ptr<Table> table_;
+
+ // column num -> block type id
std::vector<PandasBlock::type> column_types_;
+ // column num -> relative placement within internal block
+ std::vector<int> column_block_placement_;
+
// block type -> type count
std::unordered_map<int, int> type_counts_;