You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ks...@apache.org on 2020/05/13 01:15:29 UTC
[arrow] 13/17: ARROW-8641: [C++][Python] Sort included indices in
IpcReader - Respect column selection in FeatherReader
This is an automated email from the ASF dual-hosted git repository.
kszucs pushed a commit to branch maint-0.17.x
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 057cbe4d304f8706c8d08d955027e38abc0d7783
Author: Joris Van den Bossche <jo...@gmail.com>
AuthorDate: Thu May 7 11:23:25 2020 -0500
ARROW-8641: [C++][Python] Sort included indices in IpcReader - Respect column selection in FeatherReader
Closes #7122 from jorisvandenbossche/ARROW-8641-feather-order
Authored-by: Joris Van den Bossche <jo...@gmail.com>
Signed-off-by: Wes McKinney <we...@apache.org>
---
cpp/src/arrow/ipc/read_write_test.cc | 18 +++++++++
cpp/src/arrow/ipc/reader.cc | 6 ++-
python/pyarrow/feather.pxi | 4 ++
python/pyarrow/feather.py | 29 ++++++++++----
python/pyarrow/tests/test_dataset.py | 6 +++
python/pyarrow/tests/test_feather.py | 74 ++++++++++++++++++++++++++++++++++--
6 files changed, 125 insertions(+), 12 deletions(-)
diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 1b5073e..bc4e080 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -1043,6 +1043,24 @@ class ReaderWriterMixin {
AssertBatchesEqual(*ex_batch, *out_batches[0], /*check_metadata=*/true);
}
+ // Duplicated or unordered indices are normalized when reading
+ options.included_fields = {3, 1, 1};
+
+ {
+ WriterHelper writer_helper;
+ BatchVector out_batches;
+ std::shared_ptr<Schema> out_schema;
+ ASSERT_OK(RoundTripHelper(writer_helper, {batch}, IpcWriteOptions::Defaults(),
+ options, &out_batches, &out_schema));
+
+ auto ex_schema = schema({field("a1", utf8()), field("a3", utf8())},
+ key_value_metadata({"key1"}, {"value1"}));
+ AssertSchemaEqual(*ex_schema, *out_schema);
+
+ auto ex_batch = RecordBatch::Make(ex_schema, a0->length(), {a1, a3});
+ AssertBatchesEqual(*ex_batch, *out_batches[0], /*check_metadata=*/true);
+ }
+
// Out of bounds cases
options.included_fields = {1, 3, 5};
{
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 95b1c5a..95c5cb5 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -17,6 +17,7 @@
#include "arrow/ipc/reader.h"
+#include <algorithm>
#include <climits>
#include <cstdint>
#include <cstring>
@@ -528,8 +529,11 @@ Status GetInclusionMaskAndOutSchema(const std::shared_ptr<Schema>& full_schema,
inclusion_mask->resize(full_schema->num_fields(), false);
+ auto included_indices_sorted = included_indices;
+ std::sort(included_indices_sorted.begin(), included_indices_sorted.end());
+
FieldVector included_fields;
- for (int i : included_indices) {
+ for (int i : included_indices_sorted) {
// Ignore out of bounds indices
if (i < 0 || i >= full_schema->num_fields()) {
return Status::Invalid("Out of bounds field index: ", i);
diff --git a/python/pyarrow/feather.pxi b/python/pyarrow/feather.pxi
index 96cb741..4da5435 100644
--- a/python/pyarrow/feather.pxi
+++ b/python/pyarrow/feather.pxi
@@ -66,6 +66,10 @@ cdef class FeatherReader:
with nogil:
self.reader = GetResultValue(CFeatherReader.Open(reader))
+ @property
+ def version(self):
+ return self.reader.get().version()
+
def read(self):
cdef shared_ptr[CTable] sp_table
with nogil:
diff --git a/python/pyarrow/feather.py b/python/pyarrow/feather.py
index 6d909ef..a599e15 100644
--- a/python/pyarrow/feather.py
+++ b/python/pyarrow/feather.py
@@ -20,7 +20,7 @@ import os
from pyarrow.pandas_compat import _pandas_api # noqa
from pyarrow.lib import FeatherError # noqa
-from pyarrow.lib import Table, concat_tables
+from pyarrow.lib import Table, concat_tables, schema
import pyarrow.lib as ext
@@ -234,11 +234,24 @@ def read_table(source, columns=None, memory_map=True):
column_types = [type(column) for column in columns]
if all(map(lambda t: t == int, column_types)):
- return reader.read_indices(columns)
+ table = reader.read_indices(columns)
elif all(map(lambda t: t == str, column_types)):
- return reader.read_names(columns)
-
- column_type_names = [t.__name__ for t in column_types]
- raise TypeError("Columns must be indices or names. "
- "Got columns {} of types {}"
- .format(columns, column_type_names))
+ table = reader.read_names(columns)
+ else:
+ column_type_names = [t.__name__ for t in column_types]
+ raise TypeError("Columns must be indices or names. "
+ "Got columns {} of types {}"
+ .format(columns, column_type_names))
+
+ # Feather v1 files (reader.version < 3) already respect the column selection
+ if reader.version < 3:
+ return table
+ # Feather v2 reads with sorted / deduplicated selection
+ elif sorted(set(columns)) == columns:
+ return table
+ else:
+ # follow the exact order / selection of the requested columns (names or indices)
+ new_fields = [table.schema.field(c) for c in columns]
+ new_schema = schema(new_fields, metadata=table.schema.metadata)
+ new_columns = [table.column(c) for c in columns]
+ return Table.from_arrays(new_columns, schema=new_schema)
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index d67a6b5..efaa4b0 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -1485,6 +1485,12 @@ def test_feather_format(tempdir):
result = dataset.to_table()
assert result.equals(table)
+ # ARROW-8641 - column selection order
+ result = dataset.to_table(columns=["b", "a"])
+ assert result.column_names == ["b", "a"]
+ result = dataset.to_table(columns=["a", "a"])
+ assert result.column_names == ["a", "a"]
+
# error with Feather v1 files
write_feather(table, str(basedir / "data1.feather"), version=1)
with pytest.raises(ValueError):
diff --git a/python/pyarrow/tests/test_feather.py b/python/pyarrow/tests/test_feather.py
index 779649b..fbfcade 100644
--- a/python/pyarrow/tests/test_feather.py
+++ b/python/pyarrow/tests/test_feather.py
@@ -87,7 +87,20 @@ def _check_pandas_roundtrip(df, expected=None, path=None,
assert_frame_equal(result, expected)
-def _assert_error_on_write(df, exc, path=None):
+def _check_arrow_roundtrip(table, path=None):
+ if path is None:
+ path = random_path()
+
+ TEST_FILES.append(path)
+ write_feather(table, path)
+ if not os.path.exists(path):
+ raise Exception('file not written')
+
+ result = read_table(path)
+ assert result.equals(table)
+
+
+def _assert_error_on_write(df, exc, path=None, version=2):
# check that we are raising the exception
# on writing
@@ -97,7 +110,7 @@ def _assert_error_on_write(df, exc, path=None):
TEST_FILES.append(path)
def f():
- write_feather(df, path)
+ write_feather(df, path, version=version)
pytest.raises(exc, f)
@@ -535,7 +548,7 @@ def test_sparse_dataframe(version):
@pytest.mark.pandas
-def test_duplicate_columns():
+def test_duplicate_columns_pandas():
# https://github.com/wesm/feather/issues/53
# not currently able to handle duplicate columns
@@ -544,6 +557,13 @@ def test_duplicate_columns():
_assert_error_on_write(df, ValueError)
+def test_duplicate_columns():
+ # only works for version 2
+ table = pa.table([[1, 2, 3], [4, 5, 6], [7, 8, 9]], names=['a', 'a', 'b'])
+ _check_arrow_roundtrip(table)
+ _assert_error_on_write(table, ValueError, version=1)
+
+
@pytest.mark.pandas
def test_unsupported():
# https://github.com/wesm/feather/issues/240
@@ -665,3 +685,51 @@ def test_feather_without_pandas(tempdir, version):
write_feather(table, str(tempdir / "data.feather"), version=version)
result = read_table(str(tempdir / "data.feather"))
assert result.equals(table)
+
+
+@pytest.mark.pandas
+def test_read_column_selection(version):
+ # ARROW-8641
+ df = pd.DataFrame(np.arange(12).reshape(4, 3), columns=['a', 'b', 'c'])
+
+ # select columns as string names or integer indices
+ _check_pandas_roundtrip(
+ df, columns=['a', 'c'], expected=df[['a', 'c']], version=version)
+ _check_pandas_roundtrip(
+ df, columns=[0, 2], expected=df[['a', 'c']], version=version)
+
+ # different order is followed
+ _check_pandas_roundtrip(
+ df, columns=['b', 'a'], expected=df[['b', 'a']], version=version)
+ _check_pandas_roundtrip(
+ df, columns=[1, 0], expected=df[['b', 'a']], version=version)
+
+
+def test_read_column_duplicated_selection(tempdir, version):
+ # duplicated columns in the column selection
+ table = pa.table([[1, 2, 3], [4, 5, 6], [7, 8, 9]], names=['a', 'b', 'c'])
+ path = str(tempdir / "data.feather")
+ write_feather(table, path, version=version)
+
+ for col_selection in [['a', 'b', 'a'], [0, 1, 0]]:
+ result = read_table(path, columns=col_selection)
+ assert result.column_names == ['a', 'b', 'a']
+
+
+def test_read_column_duplicated_in_file(tempdir):
+ # duplicated columns in feather file (only works for feather v2)
+ table = pa.table([[1, 2, 3], [4, 5, 6], [7, 8, 9]], names=['a', 'b', 'a'])
+ path = str(tempdir / "data.feather")
+ write_feather(table, path, version=2)
+
+ # no selection works fine
+ result = read_table(path)
+ assert result.equals(table)
+
+ # selection with indices works
+ result = read_table(path, columns=[0, 2])
+ assert result.column_names == ['a', 'a']
+
+ # selection with column names errors
+ with pytest.raises(ValueError):
+ read_table(path, columns=['a', 'b'])