You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by ks...@apache.org on 2020/05/13 01:15:29 UTC

[arrow] 13/17: ARROW-8641: [C++][Python] Sort included indices in IpcReader - Respect column selection in FeatherReader

This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch maint-0.17.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 057cbe4d304f8706c8d08d955027e38abc0d7783
Author: Joris Van den Bossche <jo...@gmail.com>
AuthorDate: Thu May 7 11:23:25 2020 -0500

    ARROW-8641: [C++][Python] Sort included indices in IpcReader - Respect column selection in FeatherReader
    
    Closes #7122 from jorisvandenbossche/ARROW-8641-feather-order
    
    Authored-by: Joris Van den Bossche <jo...@gmail.com>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/src/arrow/ipc/read_write_test.cc | 18 +++++++++
 cpp/src/arrow/ipc/reader.cc          |  6 ++-
 python/pyarrow/feather.pxi           |  4 ++
 python/pyarrow/feather.py            | 29 ++++++++++----
 python/pyarrow/tests/test_dataset.py |  6 +++
 python/pyarrow/tests/test_feather.py | 74 ++++++++++++++++++++++++++++++++++--
 6 files changed, 125 insertions(+), 12 deletions(-)

diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc
index 1b5073e..bc4e080 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -1043,6 +1043,24 @@ class ReaderWriterMixin {
       AssertBatchesEqual(*ex_batch, *out_batches[0], /*check_metadata=*/true);
     }
 
+    // Duplicated or unordered indices are normalized when reading
+    options.included_fields = {3, 1, 1};
+
+    {
+      WriterHelper writer_helper;
+      BatchVector out_batches;
+      std::shared_ptr<Schema> out_schema;
+      ASSERT_OK(RoundTripHelper(writer_helper, {batch}, IpcWriteOptions::Defaults(),
+                                options, &out_batches, &out_schema));
+
+      auto ex_schema = schema({field("a1", utf8()), field("a3", utf8())},
+                              key_value_metadata({"key1"}, {"value1"}));
+      AssertSchemaEqual(*ex_schema, *out_schema);
+
+      auto ex_batch = RecordBatch::Make(ex_schema, a0->length(), {a1, a3});
+      AssertBatchesEqual(*ex_batch, *out_batches[0], /*check_metadata=*/true);
+    }
+
     // Out of bounds cases
     options.included_fields = {1, 3, 5};
     {
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 95b1c5a..95c5cb5 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -17,6 +17,7 @@
 
 #include "arrow/ipc/reader.h"
 
+#include <algorithm>
 #include <climits>
 #include <cstdint>
 #include <cstring>
@@ -528,8 +529,11 @@ Status GetInclusionMaskAndOutSchema(const std::shared_ptr<Schema>& full_schema,
 
   inclusion_mask->resize(full_schema->num_fields(), false);
 
+  auto included_indices_sorted = included_indices;
+  std::sort(included_indices_sorted.begin(), included_indices_sorted.end());
+
   FieldVector included_fields;
-  for (int i : included_indices) {
+  for (int i : included_indices_sorted) {
     // Ignore out of bounds indices
     if (i < 0 || i >= full_schema->num_fields()) {
       return Status::Invalid("Out of bounds field index: ", i);
diff --git a/python/pyarrow/feather.pxi b/python/pyarrow/feather.pxi
index 96cb741..4da5435 100644
--- a/python/pyarrow/feather.pxi
+++ b/python/pyarrow/feather.pxi
@@ -66,6 +66,10 @@ cdef class FeatherReader:
         with nogil:
             self.reader = GetResultValue(CFeatherReader.Open(reader))
 
+    @property
+    def version(self):
+        return self.reader.get().version()
+
     def read(self):
         cdef shared_ptr[CTable] sp_table
         with nogil:
diff --git a/python/pyarrow/feather.py b/python/pyarrow/feather.py
index 6d909ef..a599e15 100644
--- a/python/pyarrow/feather.py
+++ b/python/pyarrow/feather.py
@@ -20,7 +20,7 @@ import os
 
 from pyarrow.pandas_compat import _pandas_api  # noqa
 from pyarrow.lib import FeatherError  # noqa
-from pyarrow.lib import Table, concat_tables
+from pyarrow.lib import Table, concat_tables, schema
 import pyarrow.lib as ext
 
 
@@ -234,11 +234,24 @@ def read_table(source, columns=None, memory_map=True):
 
     column_types = [type(column) for column in columns]
     if all(map(lambda t: t == int, column_types)):
-        return reader.read_indices(columns)
+        table = reader.read_indices(columns)
     elif all(map(lambda t: t == str, column_types)):
-        return reader.read_names(columns)
-
-    column_type_names = [t.__name__ for t in column_types]
-    raise TypeError("Columns must be indices or names. "
-                    "Got columns {} of types {}"
-                    .format(columns, column_type_names))
+        table = reader.read_names(columns)
+    else:
+        column_type_names = [t.__name__ for t in column_types]
+        raise TypeError("Columns must be indices or names. "
+                        "Got columns {} of types {}"
+                        .format(columns, column_type_names))
+
+    # Feather v1 already respects the column selection
+    if reader.version < 3:
+        return table
+    # Feather v2 reads with sorted / deduplicated selection
+    elif sorted(set(columns)) == columns:
+        return table
+    else:
+        # follow exact order / selection of names
+        new_fields = [table.schema.field(c) for c in columns]
+        new_schema = schema(new_fields, metadata=table.schema.metadata)
+        new_columns = [table.column(c) for c in columns]
+        return Table.from_arrays(new_columns, schema=new_schema)
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index d67a6b5..efaa4b0 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -1485,6 +1485,12 @@ def test_feather_format(tempdir):
     result = dataset.to_table()
     assert result.equals(table)
 
+    # ARROW-8641 - column selection order
+    result = dataset.to_table(columns=["b", "a"])
+    assert result.column_names == ["b", "a"]
+    result = dataset.to_table(columns=["a", "a"])
+    assert result.column_names == ["a", "a"]
+
     # error with Feather v1 files
     write_feather(table, str(basedir / "data1.feather"), version=1)
     with pytest.raises(ValueError):
diff --git a/python/pyarrow/tests/test_feather.py b/python/pyarrow/tests/test_feather.py
index 779649b..fbfcade 100644
--- a/python/pyarrow/tests/test_feather.py
+++ b/python/pyarrow/tests/test_feather.py
@@ -87,7 +87,20 @@ def _check_pandas_roundtrip(df, expected=None, path=None,
     assert_frame_equal(result, expected)
 
 
-def _assert_error_on_write(df, exc, path=None):
+def _check_arrow_roundtrip(table, path=None):
+    if path is None:
+        path = random_path()
+
+    TEST_FILES.append(path)
+    write_feather(table, path)
+    if not os.path.exists(path):
+        raise Exception('file not written')
+
+    result = read_table(path)
+    assert result.equals(table)
+
+
+def _assert_error_on_write(df, exc, path=None, version=2):
     # check that we are raising the exception
     # on writing
 
@@ -97,7 +110,7 @@ def _assert_error_on_write(df, exc, path=None):
     TEST_FILES.append(path)
 
     def f():
-        write_feather(df, path)
+        write_feather(df, path, version=version)
 
     pytest.raises(exc, f)
 
@@ -535,7 +548,7 @@ def test_sparse_dataframe(version):
 
 
 @pytest.mark.pandas
-def test_duplicate_columns():
+def test_duplicate_columns_pandas():
 
     # https://github.com/wesm/feather/issues/53
     # not currently able to handle duplicate columns
@@ -544,6 +557,13 @@ def test_duplicate_columns():
     _assert_error_on_write(df, ValueError)
 
 
+def test_duplicate_columns():
+    # only works for version 2
+    table = pa.table([[1, 2, 3], [4, 5, 6], [7, 8, 9]], names=['a', 'a', 'b'])
+    _check_arrow_roundtrip(table)
+    _assert_error_on_write(table, ValueError, version=1)
+
+
 @pytest.mark.pandas
 def test_unsupported():
     # https://github.com/wesm/feather/issues/240
@@ -665,3 +685,51 @@ def test_feather_without_pandas(tempdir, version):
     write_feather(table, str(tempdir / "data.feather"), version=version)
     result = read_table(str(tempdir / "data.feather"))
     assert result.equals(table)
+
+
+@pytest.mark.pandas
+def test_read_column_selection(version):
+    # ARROW-8641
+    df = pd.DataFrame(np.arange(12).reshape(4, 3), columns=['a', 'b', 'c'])
+
+    # select columns as string names or integer indices
+    _check_pandas_roundtrip(
+        df, columns=['a', 'c'], expected=df[['a', 'c']], version=version)
+    _check_pandas_roundtrip(
+        df, columns=[0, 2], expected=df[['a', 'c']], version=version)
+
+    # different order is followed
+    _check_pandas_roundtrip(
+        df, columns=['b', 'a'], expected=df[['b', 'a']], version=version)
+    _check_pandas_roundtrip(
+        df, columns=[1, 0], expected=df[['b', 'a']], version=version)
+
+
+def test_read_column_duplicated_selection(tempdir, version):
+    # duplicated columns in the column selection
+    table = pa.table([[1, 2, 3], [4, 5, 6], [7, 8, 9]], names=['a', 'b', 'c'])
+    path = str(tempdir / "data.feather")
+    write_feather(table, path, version=version)
+
+    for col_selection in [['a', 'b', 'a'], [0, 1, 0]]:
+        result = read_table(path, columns=col_selection)
+        assert result.column_names == ['a', 'b', 'a']
+
+
+def test_read_column_duplicated_in_file(tempdir):
+    # duplicated columns in feather file (only works for feather v2)
+    table = pa.table([[1, 2, 3], [4, 5, 6], [7, 8, 9]], names=['a', 'b', 'a'])
+    path = str(tempdir / "data.feather")
+    write_feather(table, path, version=2)
+
+    # no selection works fine
+    result = read_table(path)
+    assert result.equals(table)
+
+    # selection with indices works
+    result = read_table(path, columns=[0, 2])
+    assert result.column_names == ['a', 'a']
+
+    # selection with column names errors
+    with pytest.raises(ValueError):
+        read_table(path, columns=['a', 'b'])