Posted to github@arrow.apache.org by "westonpace (via GitHub)" <gi...@apache.org> on 2023/06/23 20:13:27 UTC

[GitHub] [arrow] westonpace commented on a diff in pull request #35568: GH-33986: [Python] Add a minimal protocol for datasets

westonpace commented on code in PR #35568:
URL: https://github.com/apache/arrow/pull/35568#discussion_r1240320002


##########
python/pyarrow/dataset/protocol.py:
##########
@@ -0,0 +1,180 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Protocol definitions for pyarrow.dataset
+
+These provide the abstract interface for a dataset. Other libraries may implement
+this interface to expose their data, without having to extend PyArrow's classes.
+
+Applications and libraries that want to consume datasets should accept datasets
+that implement these protocols, rather than requiring the specific
+PyArrow classes.
+
+See Extending PyArrow Datasets for more information:
+
+https://arrow.apache.org/docs/python/integration/dataset.html
+"""
+import sys
+from abc import abstractmethod, abstractproperty
+from typing import Iterator, List, Optional
+
+# TODO: remove once we drop support for Python 3.7
+if sys.version_info >= (3, 8):
+    from typing import Protocol, runtime_checkable
+else:
+    from typing_extensions import Protocol, runtime_checkable
+
+from pyarrow.dataset import Expression
+from pyarrow import Table, RecordBatchReader, Schema
+
+
+@runtime_checkable
+class Scanner(Protocol):
+    """
+    A scanner implementation for a dataset.
+
+    This may be a scan of a whole dataset, or a scan of a single fragment.
+    """
+    @abstractmethod
+    def count_rows(self) -> int:
+        """
+        Count the number of rows in this dataset.
+
+        Implementors may provide optimized code paths that compute this from metadata.
+
+        Returns
+        -------
+        int
+            The number of rows in the dataset.
+        """
+        ...
+
+    @abstractmethod
+    def head(self, num_rows: int) -> Table:
+        """
+        Get the first ``num_rows`` rows of the dataset.
+
+        Parameters
+        ----------
+        num_rows : int
+            The number of rows to return.
+
+        Returns
+        -------
+        Table
+            A table containing the first ``num_rows`` rows of the dataset.
+        """
+        ...
+
+    @abstractmethod
+    def to_reader(self) -> RecordBatchReader:
+        """
+        Create a Record Batch Reader for this scan.
+
+        This is used to read the data in chunks.
+
+        Returns
+        -------
+        RecordBatchReader
+        """
+        ...
+
+
+@runtime_checkable
+class Scannable(Protocol):
+    @abstractmethod
+    def scanner(self, columns: Optional[List[str]] = None,
+                filter: Optional[Expression] = None, batch_size: Optional[int] = None,
+                use_threads: bool = True,
+                **kwargs) -> Scanner:
+        """Create a scanner for this dataset.
+
+        Parameters
+        ----------
+        columns : List[str], optional
+            Names of columns to include in the scan. If None, all columns are
+            included.
+        filter : Expression, optional
+            Filter expression to apply to the scan. If None, no filter is applied.
+        batch_size : int, optional
+            The number of rows to include in each batch. If None, the default
+            value is used. The default value is implementation specific.
+        use_threads : bool, default True
+            Whether to use multiple threads to read the rows. It is expected
+            that consumers reading a whole dataset in one scanner will keep this
+            as True, while consumers reading a single fragment per worker will
+            typically set this to False.

Review Comment:
   Maybe instead of `It is expected` we could use softer wording like "Often" or "Typically"?  I just think there are corner cases for each.  For example, you might not want to use threads because you expect to run multiple queries concurrently, or you just want to cut down on resource usage.
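   
   A minimal sketch of the two patterns (using an in-memory dataset purely to have something that satisfies the protocol; the distribution mechanism itself is out of scope here):
   
   ```python
   import pyarrow as pa
   import pyarrow.dataset as ds
   
   table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})
   dataset = ds.dataset(table)
   
   # Whole-dataset consumer: one scanner, threads on (the common case).
   reader = dataset.scanner(columns=["a"], use_threads=True).to_reader()
   
   # Per-fragment consumer (e.g. one worker per fragment): threads off, either
   # because parallelism already comes from the workers or simply to cap
   # per-worker resource usage -- the corner cases mentioned above.
   for fragment in dataset.get_fragments():
       worker_reader = fragment.scanner(use_threads=False).to_reader()
   ```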



##########
python/pyarrow/dataset/protocol.py:
##########
@@ -0,0 +1,180 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Protocol definitions for pyarrow.dataset
+
+These provide the abstract interface for a dataset. Other libraries may implement
+this interface to expose their data, without having to extend PyArrow's classes.
+
+Applications and libraries that want to consume datasets should accept datasets
+that implement these protocols, rather than requiring the specific
+PyArrow classes.
+
+See Extending PyArrow Datasets for more information:
+
+https://arrow.apache.org/docs/python/integration/dataset.html
+"""
+import sys
+from abc import abstractmethod, abstractproperty
+from typing import Iterator, List, Optional
+
+# TODO: remove once we drop support for Python 3.7
+if sys.version_info >= (3, 8):
+    from typing import Protocol, runtime_checkable
+else:
+    from typing_extensions import Protocol, runtime_checkable
+
+from pyarrow.dataset import Expression
+from pyarrow import Table, RecordBatchReader, Schema
+
+
+@runtime_checkable
+class Scanner(Protocol):
+    """
+    A scanner implementation for a dataset.
+
+    This may be a scan of a whole dataset, or a scan of a single fragment.
+    """
+    @abstractmethod
+    def count_rows(self) -> int:
+        """
+        Count the number of rows in this dataset.
+
+        Implementors may provide optimized code paths that compute this from metadata.
+
+        Returns
+        -------
+        int
+            The number of rows in the dataset.
+        """
+        ...
+
+    @abstractmethod
+    def head(self, num_rows: int) -> Table:
+        """
+        Get the first ``num_rows`` rows of the dataset.
+
+        Parameters
+        ----------
+        num_rows : int
+            The number of rows to return.
+
+        Returns
+        -------
+        Table
+            A table containing the first ``num_rows`` rows of the dataset.
+        """
+        ...
+
+    @abstractmethod
+    def to_reader(self) -> RecordBatchReader:
+        """
+        Create a Record Batch Reader for this scan.
+
+        This is used to read the data in chunks.
+
+        Returns
+        -------
+        RecordBatchReader
+        """
+        ...
+
+
+@runtime_checkable
+class Scannable(Protocol):
+    @abstractmethod
+    def scanner(self, columns: Optional[List[str]] = None,
+                filter: Optional[Expression] = None, batch_size: Optional[int] = None,
+                use_threads: bool = True,
+                **kwargs) -> Scanner:
+        """Create a scanner for this dataset.
+
+        Parameters
+        ----------
+        columns : List[str], optional
+            Names of columns to include in the scan. If None, all columns are
+            included.
+        filter : Expression, optional
+            Filter expression to apply to the scan. If None, no filter is applied.
+        batch_size : int, optional
+            The number of rows to include in each batch. If None, the default
+            value is used. The default value is implementation specific.
+        use_threads : bool, default True
+            Whether to use multiple threads to read the rows. It is expected
+            that consumers reading a whole dataset in one scanner will keep this
+            as True, while consumers reading a single fragment per worker will
+            typically set this to False.
+
+        Notes
+        -----
+        The filters must be fully satisfied. If the dataset cannot satisfy the
+        filter, it should raise an error.
+
+        Only the following expressions are allowed in the filter:
+        - Equality / inequalities (==, !=, <, >, <=, >=)
+        - Conjunctions (and, or)
+        - Field references (e.g. "a" or "a.b.c")
+        - Literals (e.g. 1, 1.0, "a", True)
+        - cast
+        - is_null / not_null
+        - isin
+        - between
+        - negation (not)
+
+        """
+        ...
+
+
+@runtime_checkable
+class Fragment(Scannable, Protocol):

Review Comment:
   There are some things I would like to have here, as a user, but I understand we are just getting started and trying to be minimal.  So take these as suggestions:
   
   `__repr__` <-- converting a fragment to string is very useful for debugging
   
   `estimated_cost` <-- I get why this one isn't there, but a fragment might be 5 rows or it might be 5 million rows, and knowing that could be valuable for figuring out how to distribute a dataset workload.  Still, there is no universal way of estimating cost, so perhaps we can leave this for an extension.
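   
   As a rough, hypothetical sketch of both suggestions (neither method is part of the proposed minimal protocol, and the names here are made up):
   
   ```python
   from typing import Protocol, runtime_checkable
   
   
   @runtime_checkable
   class DebuggableFragment(Protocol):
       def __repr__(self) -> str:
           """A concise, human-readable description (path, partition values, ...)."""
           ...
   
   
   @runtime_checkable
   class SizedFragment(Protocol):
       def estimated_cost(self) -> int:
           """A rough row-count (or byte-size) estimate, used only as a scheduling
           hint when distributing fragments across workers."""
           ...
   ```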



##########
python/pyarrow/dataset/protocol.py:
##########
@@ -0,0 +1,180 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Protocol definitions for pyarrow.dataset
+
+These provide the abstract interface for a dataset. Other libraries may implement
+this interface to expose their data, without having to extend PyArrow's classes.
+
+Applications and libraries that want to consume datasets should accept datasets
+that implement these protocols, rather than requiring the specific
+PyArrow classes.
+
+See Extending PyArrow Datasets for more information:
+
+https://arrow.apache.org/docs/python/integration/dataset.html
+"""
+import sys
+from abc import abstractmethod, abstractproperty
+from typing import Iterator, List, Optional
+
+# TODO: remove once we drop support for Python 3.7
+if sys.version_info >= (3, 8):
+    from typing import Protocol, runtime_checkable
+else:
+    from typing_extensions import Protocol, runtime_checkable
+
+from pyarrow.dataset import Expression
+from pyarrow import Table, RecordBatchReader, Schema
+
+
+@runtime_checkable
+class Scanner(Protocol):
+    """
+    A scanner implementation for a dataset.
+
+    This may be a scan of a whole dataset, or a scan of a single fragment.
+    """
+    @abstractmethod
+    def count_rows(self) -> int:
+        """
+        Count the number of rows in this dataset.
+
+        Implementors may provide optimized code paths that compute this from metadata.
+
+        Returns
+        -------
+        int
+            The number of rows in the dataset.
+        """
+        ...
+
+    @abstractmethod
+    def head(self, num_rows: int) -> Table:
+        """
+        Get the first ``num_rows`` rows of the dataset.
+
+        Parameters
+        ----------
+        num_rows : int
+            The number of rows to return.
+
+        Returns
+        -------
+        Table
+            A table containing the first ``num_rows`` rows of the dataset.
+        """
+        ...
+
+    @abstractmethod
+    def to_reader(self) -> RecordBatchReader:
+        """
+        Create a Record Batch Reader for this scan.
+
+        This is used to read the data in chunks.
+
+        Returns
+        -------
+        RecordBatchReader
+        """
+        ...
+
+
+@runtime_checkable
+class Scannable(Protocol):
+    @abstractmethod
+    def scanner(self, columns: Optional[List[str]] = None,
+                filter: Optional[Expression] = None, batch_size: Optional[int] = None,
+                use_threads: bool = True,
+                **kwargs) -> Scanner:
+        """Create a scanner for this dataset.
+
+        Parameters
+        ----------
+        columns : List[str], optional
+            Names of columns to include in the scan. If None, all columns are
+            included.
+        filter : Expression, optional
+            Filter expression to apply to the scan. If None, no filter is applied.
+        batch_size : int, optional
+            The number of rows to include in each batch. If None, the default
+            value is used. The default value is implementation specific.

Review Comment:
   Maybe expand that the default value is not only implementation-specific but might not even be consistent between batches?
   
   Also, is this a min/max, or a maximum only?  In other words, if `batch_size` is `1_000_000` and the source is a parquet file with 10 row groups of `100_000` rows, does the scanner need to accumulate the rows or is it acceptable to return smaller batches?
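   
   For illustration, a sketch of the "maximum only" reading of `batch_size`: oversized batches are split, but short batches (e.g. small parquet row groups) are passed through as-is rather than accumulated. Whether this is the intended contract is exactly the question above.
   
   ```python
   from typing import Iterator
   
   import pyarrow as pa
   
   
   def cap_batch_size(batches: Iterator[pa.RecordBatch],
                      batch_size: int) -> Iterator[pa.RecordBatch]:
       for batch in batches:
           # Split batches that exceed the limit; smaller ones pass through unchanged.
           for offset in range(0, batch.num_rows, batch_size):
               yield batch.slice(offset, batch_size)
   ```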



##########
python/pyarrow/dataset/protocol.py:
##########
@@ -0,0 +1,180 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Protocol definitions for pyarrow.dataset
+
+These provide the abstract interface for a dataset. Other libraries may implement
+this interface to expose their data, without having to extend PyArrow's classes.
+
+Applications and libraries that want to consume datasets should accept datasets
+that implement these protocols, rather than requiring the specific
+PyArrow classes.
+
+See Extending PyArrow Datasets for more information:
+
+https://arrow.apache.org/docs/python/integration/dataset.html
+"""
+import sys
+from abc import abstractmethod, abstractproperty
+from typing import Iterator, List, Optional
+
+# TODO: remove once we drop support for Python 3.7
+if sys.version_info >= (3, 8):
+    from typing import Protocol, runtime_checkable
+else:
+    from typing_extensions import Protocol, runtime_checkable
+
+from pyarrow.dataset import Expression
+from pyarrow import Table, RecordBatchReader, Schema
+
+
+@runtime_checkable
+class Scanner(Protocol):
+    """
+    A scanner implementation for a dataset.
+
+    This may be a scan of a whole dataset, or a scan of a single fragment.
+    """
+    @abstractmethod
+    def count_rows(self) -> int:
+        """
+        Count the number of rows in this dataset.
+
+        Implementors may provide optimized code paths that compute this from metadata.
+
+        Returns
+        -------
+        int
+            The number of rows in the dataset.
+        """
+        ...
+
+    @abstractmethod
+    def head(self, num_rows: int) -> Table:
+        """
+        Get the first ``num_rows`` rows of the dataset.
+
+        Parameters
+        ----------
+        num_rows : int
+            The number of rows to return.
+
+        Returns
+        -------
+        Table
+            A table containing the first ``num_rows`` rows of the dataset.
+        """
+        ...
+
+    @abstractmethod
+    def to_reader(self) -> RecordBatchReader:
+        """
+        Create a Record Batch Reader for this scan.
+
+        This is used to read the data in chunks.
+
+        Returns
+        -------
+        RecordBatchReader
+        """
+        ...
+
+
+@runtime_checkable
+class Scannable(Protocol):
+    @abstractmethod
+    def scanner(self, columns: Optional[List[str]] = None,
+                filter: Optional[Expression] = None, batch_size: Optional[int] = None,
+                use_threads: bool = True,
+                **kwargs) -> Scanner:
+        """Create a scanner for this dataset.
+
+        Parameters
+        ----------
+        columns : List[str], optional
+            Names of columns to include in the scan. If None, all columns are
+            included.
+        filter : Expression, optional
+            Filter expression to apply to the scan. If None, no filter is applied.
+        batch_size : int, optional

Review Comment:
   Should `batch_size` be part of `to_reader` instead of `scanner`?
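   
   For comparison, the alternative being asked about might look like this (hypothetical signature, not what the PR proposes):
   
   ```python
   from typing import Optional, Protocol
   
   from pyarrow import RecordBatchReader
   
   
   class Scanner(Protocol):
       def to_reader(self, batch_size: Optional[int] = None) -> RecordBatchReader:
           """batch_size is chosen at read time rather than when the scan is built."""
           ...
   ```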



##########
docs/source/python/integration/dataset.rst:
##########
@@ -0,0 +1,134 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+Extending PyArrow Datasets
+==========================
+
+PyArrow provides a core protocol for datasets, so third-party libraries can both
+produce and consume classes that conform to a useful subset of the PyArrow dataset
+API. This subset provides enough functionality to support predicate and filter
+pushdown. The subset of the API is contained in ``pyarrow.dataset.protocol``.
+
+.. image:: pyarrow_dataset_protocol.svg
+   :alt: A diagram showing the workflow for using the PyArrow Dataset protocol.
+         There are two flows shown, one for stream and one for tasks. The stream
+         case shows a linear flow from a producer class, to a dataset, to a 
+         scanner, and finally to a RecordBatchReader. The tasks case shows a
+         similar diagram, except the dataset is split into fragments, which are
+         then distributed to tasks, which each create their own scanner and
+         RecordBatchReader.
+
+Producers are responsible for outputting a class that conforms to the protocol.
+
+Consumers are responsible for calling methods on the protocol to get the data
+out of the dataset. The protocol supports getting data as a single stream or
+as a series of tasks which may be distributed.

Review Comment:
   This is "ok", but the definitions of producer and consumer here are reversed from what they are in Substrait, which confused me for a while.  Maybe we can go with "Data producer" and "Data consumer"?
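   
   Under that terminology, a data consumer would only call the protocol methods, so any data producer's dataset class works, not just `pyarrow.dataset.Dataset`. A small, hypothetical example (the column name and aggregation are made up):
   
   ```python
   import pyarrow.compute as pc
   
   
   def total_amount(dataset) -> float:
       """`dataset` may be any object implementing the Scannable protocol."""
       # Only protocol methods are used: scanner() -> to_reader() -> iterate batches.
       reader = dataset.scanner(columns=["amount"]).to_reader()
       total = 0.0
       for batch in reader:
           # Only one column was requested, so it is at index 0.
           value = pc.sum(batch.column(0)).as_py()
           total += value or 0.0
       return total
   ```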



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org