You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/21 16:54:29 UTC

[GitHub] [arrow] amol- opened a new pull request, #13409: ARROW-16616: [Python] Lazy datasets filtering

amol- opened a new pull request, #13409:
URL: https://github.com/apache/arrow/pull/13409

   Expose a `Dataset.filter` method that applies a filter to the dataset without actually loading it in memory.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r913876367


##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -95,6 +98,23 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
                 c_in_table, 1 << 20)
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
                 c_tablesourceopts)
+        elif isinstance(ipt, FilteredDataset):

Review Comment:
   @westonpace might have a better answer.
   
   Going for CScanNode I would had to manually provide the `ScanOptions` myself.
   I went for making a `RecordBatchReader` as that way I could reuse the Python logic in `FilteredDataset` that creates a scanner with the right filter, also in the future if `FilteredDataset._make_scanner` is extended to take care of more options, this code would benefit from that support for free.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on PR #13409:
URL: https://github.com/apache/arrow/pull/13409#issuecomment-1175317539

   @pitrou Addressed all comments, should be ready for re-review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #13409:
URL: https://github.com/apache/arrow/pull/13409#issuecomment-1162020152

   :warning: Ticket **has not been started in JIRA**, please click 'Start Progress'.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r913867548


##########
cpp/src/arrow/compute/exec/options.cc:
##########
@@ -49,17 +49,28 @@ std::string ToString(JoinType t) {
 }
 
 Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromTable(
-    const Table& table, arrow::internal::Executor* exc) {
+    const Table& table, arrow::internal::Executor* executor) {
   std::shared_ptr<RecordBatchReader> reader = std::make_shared<TableBatchReader>(table);
 
-  if (exc == nullptr) return Status::TypeError("No executor provided.");
+  if (executor == nullptr) return Status::TypeError("No executor provided.");
 
   // Map the RecordBatchReader to a SourceNode
-  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), exc));
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), executor));
 
   return std::shared_ptr<SourceNodeOptions>(
       new SourceNodeOptions(table.schema(), batch_gen));
 }
 
+Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromRecordBatchReader(
+    std::shared_ptr<RecordBatchReader> reader, std::shared_ptr<Schema> schema,
+    arrow::internal::Executor* executor) {
+  if (executor == nullptr) return Status::TypeError("No executor provided.");
+
+  // Map the RecordBatchReader to a SourceNode
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), executor));
+
+  return std::shared_ptr<SourceNodeOptions>(new SourceNodeOptions(schema, batch_gen));

Review Comment:
   :+1:



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on PR #13409:
URL: https://github.com/apache/arrow/pull/13409#issuecomment-1168896418

   > I have no idea whether we want to expose such lazy construction APIs on Dataset.
   
   FYI, this task has spawned from https://github.com/apache/arrow/pull/13155#discussion_r875076518 discussion


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rok commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
rok commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r913651121


##########
cpp/src/arrow/compute/exec/options.cc:
##########
@@ -61,5 +61,16 @@ Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromTable(
       new SourceNodeOptions(table.schema(), batch_gen));
 }
 
+Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromRecordBatchReader(
+    std::shared_ptr<RecordBatchReader> reader, std::shared_ptr<Schema> schema,
+    arrow::internal::Executor* exc) {

Review Comment:
   Just a nit, but `executor` is used elsewhere in the codebase.
   ```suggestion
       arrow::internal::Executor* executor) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r1038338368


##########
python/pyarrow/_dataset.pyx:
##########
@@ -385,6 +416,32 @@ cdef class Dataset(_Weakrefable):
         """The common schema of the full Dataset"""
         return pyarrow_wrap_schema(self.dataset.schema())
 
+    def filter(self, expression):
+        """
+        Apply a row filter to the dataset.
+
+        Parameters
+        ----------
+        expression : Expression
+            The filter that should be applied to the dataset.
+
+        Returns
+        -------
+        Dataset
+        """
+        cdef:
+            Dataset filtered_dataset
+
+        new_filter = expression
+        current_filter = self._scan_options.get("filter")
+        if current_filter is not None and new_filter is not None:
+            new_filter = current_filter & new_filter

Review Comment:
   Given that passing `None` for the filter seems to be confusing, I simply disabled it. We can re-evaluate in the future if there will be the need as you suggested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r1038021273


##########
python/pyarrow/_dataset.pyx:
##########
@@ -385,6 +416,32 @@ cdef class Dataset(_Weakrefable):
         """The common schema of the full Dataset"""
         return pyarrow_wrap_schema(self.dataset.schema())
 
+    def filter(self, expression):
+        """
+        Apply a row filter to the dataset.
+
+        Parameters
+        ----------
+        expression : Expression
+            The filter that should be applied to the dataset.
+
+        Returns
+        -------
+        Dataset
+        """
+        cdef:
+            Dataset filtered_dataset
+
+        new_filter = expression
+        current_filter = self._scan_options.get("filter")
+        if current_filter is not None and new_filter is not None:
+            new_filter = current_filter & new_filter

Review Comment:
   Passing `None` as the filter is meant to be a way to remove all filters.
   To purpose is to allow things like retrieving fragments of a filtered dataset by doing `dataset.filter(None).get_fragments()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on pull request #13409: ARROW-16616: [Python] Add lazy Dataset.filter() method

Posted by GitBox <gi...@apache.org>.
amol- commented on PR #13409:
URL: https://github.com/apache/arrow/pull/13409#issuecomment-1341003172

   @jorisvandenbossche I checked for `ParquetDataset` and the experience if fairly confusing from the end user point of view. If the dataset is created using `ds.parquet_dataset` it will have the filter capabilities, but if it's created using `pyarrow.parquet.ParquetDataset` it won't have filtering capabilities. But `ParquetDataset` in its V2 implementation is just a proxy to `ds.Dataset`, so it could in theory gain filtering support.
   
   It seems that `ParquetDataset` is mostly a duplicate of what `ds.parquet_dataset` can do when `use_legacy_dataset=False`, so is there a reason why we keep it around? Is there a plan to deprecate it in the future?
   
   Asking because if the plan is to deprecate it some day, then it probably doesn't make much same to invest the effort to work toward feature parity with `ds.Dataset` and we can consider this task done.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on PR #13409:
URL: https://github.com/apache/arrow/pull/13409#issuecomment-1176317887

   > 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r917795065


##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -95,6 +98,23 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
                 c_in_table, 1 << 20)
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
                 c_tablesourceopts)
+        elif isinstance(ipt, FilteredDataset):

Review Comment:
   There shouldn't be any API on Dataset at the moment that should allow you to restrict the columns before executing a join through the exec plan. We don't yet provide any "projection" capability for `FilteredDataset` itself, only for scanner.
   
   Making ScanOptions seems like a good idea btw, would make easy to expand the capabilities in the future. Adding projections for example would be just one more passed option.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] westonpace commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r914281708


##########
python/pyarrow/_dataset.pyx:
##########
@@ -432,6 +447,73 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+cdef class FilteredDataset(Dataset):
+    """
+    A Dataset with an applied filter.
+
+    Parameters
+    ----------
+    dataset : Dataset
+        The dataset to which the filter should be applied.
+    expression : Expression
+        The filter that should be applied to the dataset.
+    """
+
+    def __init__(self, dataset, expression):
+        self.init(<shared_ptr[CDataset]>(<Dataset>dataset).wrapped)
+        self._filter = expression
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self._filter = None
+
+    def filter(self, expression):
+        """Apply an additional row filter to the filtered dataset.
+
+        Parameters
+        ----------
+        expression : Expression
+            The filter that should be applied to the dataset.
+
+        Returns
+        -------
+        FilteredDataset
+        """
+        cdef:
+            FilteredDataset filtered_dataset
+
+        if self._filter is not None:

Review Comment:
   If `self._filter` can be `None` then what is the advantage of creating a separate `FilteredDataset` instead of just adding `_filter` to the existing `Dataset`?



##########
python/pyarrow/_dataset.pyx:
##########
@@ -432,6 +447,73 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+cdef class FilteredDataset(Dataset):
+    """
+    A Dataset with an applied filter.
+
+    Parameters
+    ----------
+    dataset : Dataset
+        The dataset to which the filter should be applied.
+    expression : Expression
+        The filter that should be applied to the dataset.
+    """
+
+    def __init__(self, dataset, expression):
+        self.init(<shared_ptr[CDataset]>(<Dataset>dataset).wrapped)
+        self._filter = expression
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self._filter = None
+
+    def filter(self, expression):
+        """Apply an additional row filter to the filtered dataset.
+
+        Parameters
+        ----------
+        expression : Expression
+            The filter that should be applied to the dataset.
+
+        Returns
+        -------
+        FilteredDataset
+        """
+        cdef:
+            FilteredDataset filtered_dataset
+
+        if self._filter is not None:
+            new_filter = self._filter & expression
+        else:
+            new_filter = expression
+        filtered_dataset = self.__class__.__new__(self.__class__)
+        filtered_dataset.init(self.wrapped)
+        filtered_dataset._filter = new_filter
+        return filtered_dataset
+
+    cdef Scanner _make_scanner(self, options):
+        if "filter" in options:
+            raise ValueError(
+                "Passing filter in scanner option is not valid for FilteredDataset."

Review Comment:
   Why wouldn't it just AND the incoming filter with the existing filter?



##########
cpp/src/arrow/compute/exec/options.cc:
##########
@@ -49,17 +49,29 @@ std::string ToString(JoinType t) {
 }
 
 Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromTable(
-    const Table& table, arrow::internal::Executor* exc) {
+    const Table& table, arrow::internal::Executor* executor) {
   std::shared_ptr<RecordBatchReader> reader = std::make_shared<TableBatchReader>(table);
 
-  if (exc == nullptr) return Status::TypeError("No executor provided.");
+  if (executor == nullptr) return Status::TypeError("No executor provided.");
 
   // Map the RecordBatchReader to a SourceNode
-  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), exc));
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), executor));
 
   return std::shared_ptr<SourceNodeOptions>(
       new SourceNodeOptions(table.schema(), batch_gen));
 }
 
+Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromRecordBatchReader(
+    std::shared_ptr<RecordBatchReader> reader, std::shared_ptr<Schema> schema,
+    arrow::internal::Executor* executor) {
+  if (executor == nullptr) return Status::TypeError("No executor provided.");
+
+  // Map the RecordBatchReader to a SourceNode
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), executor));
+
+  return std::shared_ptr<SourceNodeOptions>(
+      new SourceNodeOptions(std::move(schema), batch_gen));

Review Comment:
   ```suggestion
     return std::make_shared<SourceNodeOptions>(std::move(schema), std::move(batch_gen));
   ```



##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -95,6 +98,23 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
                 c_in_table, 1 << 20)
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
                 c_tablesourceopts)
+        elif isinstance(ipt, FilteredDataset):

Review Comment:
   While there shouldn't be a significant performance difference this does seem a little less than ideal.  Could the `_make_scanner` method be changed to a method that creates scan options instead?
   
   Also, if the user only asks for some columns does the projection get passed down into the `ToRecordBatchReader` call?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r996996652


##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -95,6 +98,23 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
                 c_in_table, 1 << 20)
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
                 c_tablesourceopts)
+        elif isinstance(ipt, FilteredDataset):

Review Comment:
   The code has been updated to now use ScanOptions and rely on those for the compute engine plan.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] pitrou commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r913843039


##########
docs/source/python/compute.rst:
##########
@@ -368,5 +368,19 @@ our ``even_filter`` with a ``pc.field("nums") > 5`` filter:
    nums: [[6,8,10]]
    chars: [["f","h","l"]]
 
-:class:`.Dataset` currently can be filtered using :meth:`.Dataset.to_table` method
-passing a ``filter`` argument. See :ref:`py-filter-dataset` in Dataset documentation.
+:class:`.Dataset` can be filtered with :meth:`.Dataset.filter` method too.

Review Comment:
   Wording suggestion
   ```suggestion
   :class:`.Dataset` can similarly be filtered with the :meth:`.Dataset.filter` method.
   ```



##########
cpp/src/arrow/compute/exec/options.cc:
##########
@@ -49,17 +49,28 @@ std::string ToString(JoinType t) {
 }
 
 Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromTable(
-    const Table& table, arrow::internal::Executor* exc) {
+    const Table& table, arrow::internal::Executor* executor) {
   std::shared_ptr<RecordBatchReader> reader = std::make_shared<TableBatchReader>(table);
 
-  if (exc == nullptr) return Status::TypeError("No executor provided.");
+  if (executor == nullptr) return Status::TypeError("No executor provided.");
 
   // Map the RecordBatchReader to a SourceNode
-  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), exc));
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), executor));
 
   return std::shared_ptr<SourceNodeOptions>(
       new SourceNodeOptions(table.schema(), batch_gen));
 }
 
+Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromRecordBatchReader(
+    std::shared_ptr<RecordBatchReader> reader, std::shared_ptr<Schema> schema,
+    arrow::internal::Executor* executor) {
+  if (executor == nullptr) return Status::TypeError("No executor provided.");
+
+  // Map the RecordBatchReader to a SourceNode
+  ARROW_ASSIGN_OR_RAISE(auto batch_gen, MakeReaderGenerator(std::move(reader), executor));
+
+  return std::shared_ptr<SourceNodeOptions>(new SourceNodeOptions(schema, batch_gen));

Review Comment:
   Nit: move parameter to avoid copying (you may need to reformat after that :-))
   ```suggestion
     return std::shared_ptr<SourceNodeOptions>(new SourceNodeOptions(std::move(schema), batch_gen));
   ```



##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -95,6 +98,23 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
                 c_in_table, 1 << 20)
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
                 c_tablesourceopts)
+        elif isinstance(ipt, FilteredDataset):

Review Comment:
   I wonder why the existing code for Dataset below wouldn't work here. Especially, one creates a Source node, the other a Scan node. Is there a functional or performance difference? cc @westonpace for insight.



##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -95,6 +98,23 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
                 c_in_table, 1 << 20)
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
                 c_tablesourceopts)
+        elif isinstance(ipt, FilteredDataset):
+            node_factory = "source"
+            c_in_dataset = (<Dataset>ipt).unwrap()
+            c_dataset_scanner = <shared_ptr[CScanner]>(
+                (<Scanner>(<FilteredDataset>ipt)._make_scanner({})).unwrap()
+            )
+            c_recordbatchreader_in = <shared_ptr[CRecordBatchReader]>(

Review Comment:
   Same question here: why the explicit cast to `<shared_ptr[CRecordBatchReader]>`?



##########
python/pyarrow/_dataset.pyx:
##########
@@ -432,6 +443,46 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+cdef class FilteredDataset(Dataset):
+    """
+    A Dataset with an applied filter.
+
+    Parameters
+    ----------
+    dataset : Dataset
+        The dataset to which the filter should be applied.
+    expression : Expression
+        The filter that should be applied to the dataset.
+    """
+
+    def __init__(self, dataset, expression):
+        self.init(<shared_ptr[CDataset]>(<Dataset>dataset).wrapped)
+        self._filter = expression
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self._filter = None
+
+    def filter(self, expression):

Review Comment:
   Can you add a docstring here? It won't be inherited automatically from the parent, unfortunately.



##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -95,6 +98,23 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
                 c_in_table, 1 << 20)
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
                 c_tablesourceopts)
+        elif isinstance(ipt, FilteredDataset):
+            node_factory = "source"
+            c_in_dataset = (<Dataset>ipt).unwrap()
+            c_dataset_scanner = <shared_ptr[CScanner]>(

Review Comment:
   I'm curious: why do you need the explicit cast to `shared_ptr[CScanner]` here?



##########
python/pyarrow/_dataset.pyx:
##########
@@ -432,6 +443,46 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+cdef class FilteredDataset(Dataset):
+    """
+    A Dataset with an applied filter.
+
+    Parameters
+    ----------
+    dataset : Dataset
+        The dataset to which the filter should be applied.
+    expression : Expression
+        The filter that should be applied to the dataset.
+    """
+
+    def __init__(self, dataset, expression):
+        self.init(<shared_ptr[CDataset]>(<Dataset>dataset).wrapped)
+        self._filter = expression
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self._filter = None
+
+    def filter(self, expression):
+        cdef:
+            FilteredDataset filtered_dataset
+
+        if self._filter is not None:
+            new_filter = self._filter & expression
+        else:
+            new_filter = expression
+        filtered_dataset = self.__class__.__new__(self.__class__)
+        filtered_dataset.init(self.wrapped)
+        filtered_dataset._filter = new_filter
+        return filtered_dataset
+
+    cdef Scanner _make_scanner(self, options):
+        scanner_options = dict(options, filter=self._filter)
+        return Scanner.from_dataset(self, **scanner_options)
+
+    def scanner(self, **kwargs):

Review Comment:
   Here as well, we should probably add a docstring.
   (or try to arrange the code such that overriding isn't possible? perhaps by delegating to `_make_scanner` both in the base class and here?)



##########
python/pyarrow/_dataset.pyx:
##########
@@ -432,6 +443,46 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+cdef class FilteredDataset(Dataset):
+    """
+    A Dataset with an applied filter.
+
+    Parameters
+    ----------
+    dataset : Dataset
+        The dataset to which the filter should be applied.
+    expression : Expression
+        The filter that should be applied to the dataset.
+    """
+
+    def __init__(self, dataset, expression):
+        self.init(<shared_ptr[CDataset]>(<Dataset>dataset).wrapped)
+        self._filter = expression
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self._filter = None
+
+    def filter(self, expression):
+        cdef:
+            FilteredDataset filtered_dataset
+
+        if self._filter is not None:
+            new_filter = self._filter & expression
+        else:
+            new_filter = expression
+        filtered_dataset = self.__class__.__new__(self.__class__)
+        filtered_dataset.init(self.wrapped)
+        filtered_dataset._filter = new_filter
+        return filtered_dataset
+
+    cdef Scanner _make_scanner(self, options):
+        scanner_options = dict(options, filter=self._filter)
+        return Scanner.from_dataset(self, **scanner_options)

Review Comment:
   Simpler wording:
   ```suggestion
           return Scanner.from_dataset(self, filter=self._filter, **options)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r913871729


##########
python/pyarrow/_dataset.pyx:
##########
@@ -432,6 +443,46 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+cdef class FilteredDataset(Dataset):
+    """
+    A Dataset with an applied filter.
+
+    Parameters
+    ----------
+    dataset : Dataset
+        The dataset to which the filter should be applied.
+    expression : Expression
+        The filter that should be applied to the dataset.
+    """
+
+    def __init__(self, dataset, expression):
+        self.init(<shared_ptr[CDataset]>(<Dataset>dataset).wrapped)
+        self._filter = expression
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self._filter = None
+
+    def filter(self, expression):
+        cdef:
+            FilteredDataset filtered_dataset
+
+        if self._filter is not None:
+            new_filter = self._filter & expression
+        else:
+            new_filter = expression
+        filtered_dataset = self.__class__.__new__(self.__class__)
+        filtered_dataset.init(self.wrapped)
+        filtered_dataset._filter = new_filter
+        return filtered_dataset
+
+    cdef Scanner _make_scanner(self, options):
+        scanner_options = dict(options, filter=self._filter)
+        return Scanner.from_dataset(self, **scanner_options)

Review Comment:
   My original code was overwriting any filter provided in `options` with the one in `self._filter` (as providing a filter to `_make_scanner` shouldn't be done. With the code you suggested it's going to error, which might even be better, but it's probably going to error with an unrelated error like `got multiple values for keyword argument 'x'` which might be confusing for the user.
   
   I guess I can just trap if `filter` is provided in options.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r1034591358


##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -2545,6 +2545,12 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil
     cdef cppclass CExecNodeOptions "arrow::compute::ExecNodeOptions":
         pass
 
+    cdef cppclass CSourceNodeOptions "arrow::compute::SourceNodeOptions"(CExecNodeOptions):
+        @staticmethod
+        CResult[shared_ptr[CSourceNodeOptions]] FromRecordBatchReader(

Review Comment:
   No anymore, but it still had a value on its own, so I didn't remove it. If it's a concern I can easily get rid of it. (Even though I would leave the C++ implementation around).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r1038141857


##########
python/pyarrow/_dataset.pyx:
##########
@@ -385,6 +416,32 @@ cdef class Dataset(_Weakrefable):
         """The common schema of the full Dataset"""
         return pyarrow_wrap_schema(self.dataset.schema())
 
+    def filter(self, expression):
+        """
+        Apply a row filter to the dataset.
+
+        Parameters
+        ----------
+        expression : Expression
+            The filter that should be applied to the dataset.
+
+        Returns
+        -------
+        Dataset
+        """
+        cdef:
+            Dataset filtered_dataset
+
+        new_filter = expression
+        current_filter = self._scan_options.get("filter")
+        if current_filter is not None and new_filter is not None:
+            new_filter = current_filter & new_filter

Review Comment:
   But I don't see what `.filter(None)` could mean if not to unset the current filter. Would you prefer a dedicated `Dataset.reset_filter()` method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] pitrou commented on pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #13409:
URL: https://github.com/apache/arrow/pull/13409#issuecomment-1168840012

   I have no idea whether we want to expose such lazy construction APIs on Dataset.
   
   cc @jorisvandenbossche 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on PR #13409:
URL: https://github.com/apache/arrow/pull/13409#issuecomment-1180241621

   > I'm not completely against this but having `FilteredDataset` instead of something like `Query` might be a bit short-sighted. What happens if a user wants to add a dynamic column (project)?
   > 
   > If you had both a projection expression and a filter expression that might be more close to what scanner / datasets provides today.
   
   @westonpace I'm not particularly attached to the `FilteredDataset` name, I just want to avoid using the `Query` name explicitly to ensure we avoid hinting users that it's ever supposed to become a fully fledged query system at the moment. They can use IBIS if they are looking for that.
   
   I also dislike the idea of reusing the `Scanner` class as it smells like hijacking its data read responsibility. I wanted a name that conveys correctly the idea of something that represents a dataset with an applied transformation and to which additional transformations can be applied. Maybe `QueriedDataset` could do? Smells a lot like the query already happened, so not exactly what I was looking for. I'm open to suggestions. Naming things correctly seems hard, I could try invalidating some caches instead ;)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- closed pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- closed pull request #13409: ARROW-16616: [Python] Lazy datasets filtering
URL: https://github.com/apache/arrow/pull/13409


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] jorisvandenbossche commented on pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on PR #13409:
URL: https://github.com/apache/arrow/pull/13409#issuecomment-1291807667

   From https://github.com/apache/arrow/pull/13409#discussion_r914281708
   
   >> If `self._filter` can be `None` then what is the advantage of creating a separate `FilteredDataset` instead of just adding `_filter` to the existing `Dataset`?
   >
   > That was the original implementation, and I was asked to explicitly move it in a dedicated class. Which I think in the end makes sense, better have a single responsibility per class.
   
   Could you expand on this a bit? (I don't know where or why it was asked to move to a dedicated class, the only reference I find in the other PR is the question if this shouldn't live on the Scanner) 
   
   It seems to me that if we want to expose a helper `filter()` method (although it doesn't give that much of value compared to passing the filter to the method that actually will do the scanning, i.e. `to_table(..)`, `to_batches(..)`, etc), adding it just to the main Dataset class will expose the least amount of new API that we "lock in" (it avoids deciding now if we want some "Query" like class)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r913868111


##########
python/pyarrow/_dataset.pyx:
##########
@@ -432,6 +443,46 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+cdef class FilteredDataset(Dataset):
+    """
+    A Dataset with an applied filter.
+
+    Parameters
+    ----------
+    dataset : Dataset
+        The dataset to which the filter should be applied.
+    expression : Expression
+        The filter that should be applied to the dataset.
+    """
+
+    def __init__(self, dataset, expression):
+        self.init(<shared_ptr[CDataset]>(<Dataset>dataset).wrapped)
+        self._filter = expression
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self._filter = None
+
+    def filter(self, expression):

Review Comment:
   oh, nice catch, I added it to the parent but forgot here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] jorisvandenbossche commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r1038154782


##########
python/pyarrow/_dataset.pyx:
##########
@@ -385,6 +416,32 @@ cdef class Dataset(_Weakrefable):
         """The common schema of the full Dataset"""
         return pyarrow_wrap_schema(self.dataset.schema())
 
+    def filter(self, expression):
+        """
+        Apply a row filter to the dataset.
+
+        Parameters
+        ----------
+        expression : Expression
+            The filter that should be applied to the dataset.
+
+        Returns
+        -------
+        Dataset
+        """
+        cdef:
+            Dataset filtered_dataset
+
+        new_filter = expression
+        current_filter = self._scan_options.get("filter")
+        if current_filter is not None and new_filter is not None:
+            new_filter = current_filter & new_filter

Review Comment:
   > But I don't see what .filter(None) could mean if not to unset the current filter.
   
   For me the simplest interpretation would be to "not add any new filter". 
   Users don't necessarily have the concept of "current filter" in their mind. The dataset is already filtered at that point, and then `filter(None)` would just not filter it any further.
   
   
   
   > Would you prefer a dedicated `Dataset.reset_filter()` method?
   
   We could also defer that until there are some actual use cases / requests for this? I am not sure if there is a good use case to first filter and then reset it to be able to call get_fragments (except while interactively exploring, but in such a case you can also easily recreate the object) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] jorisvandenbossche commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r1038133124


##########
python/pyarrow/_dataset.pyx:
##########
@@ -385,6 +416,32 @@ cdef class Dataset(_Weakrefable):
         """The common schema of the full Dataset"""
         return pyarrow_wrap_schema(self.dataset.schema())
 
+    def filter(self, expression):
+        """
+        Apply a row filter to the dataset.
+
+        Parameters
+        ----------
+        expression : Expression
+            The filter that should be applied to the dataset.
+
+        Returns
+        -------
+        Dataset
+        """
+        cdef:
+            Dataset filtered_dataset
+
+        new_filter = expression
+        current_filter = self._scan_options.get("filter")
+        if current_filter is not None and new_filter is not None:
+            new_filter = current_filter & new_filter

Review Comment:
   Ah, that was not clear to me (and should also be documented then). 
   
   Personally I would also not expect such behaviour, since that seems to go against my expectation for composable method chains (based on my experience with pandas, and from a quick check also not what dplyr filter does when passing no filter), where each method does something in addition to the previous one, and not undo it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- merged pull request #13409: ARROW-16616: [Python] Add lazy Dataset.filter() method

Posted by GitBox <gi...@apache.org>.
amol- merged PR #13409:
URL: https://github.com/apache/arrow/pull/13409


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r1034589611


##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -89,35 +93,42 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
     # Create source nodes for each input
     for ipt in inputs:
         if isinstance(ipt, Table):
-            node_factory = "table_source"
             c_in_table = pyarrow_unwrap_table(ipt)
             c_tablesourceopts = make_shared[CTableSourceNodeOptions](
                 c_in_table)
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
                 c_tablesourceopts)
+
+            current_decl = CDeclaration(
+                tobytes("table_source"), no_c_inputs, c_input_node_opts)
         elif isinstance(ipt, Dataset):
-            node_factory = "scan"
             c_in_dataset = (<Dataset>ipt).unwrap()
             c_scanopts = make_shared[CScanNodeOptions](
-                c_in_dataset, make_shared[CScanOptions]())
-            deref(deref(c_scanopts).scan_options).use_threads = use_threads
+                c_in_dataset, Scanner._make_scan_options(ipt, {"use_threads": use_threads}))
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CScanNodeOptions](
                 c_scanopts)
+
+            # Filters applied in CScanNodeOptions are "best effort" for the scan node itself,
+            # so we always need to inject an additional Filter node to apply them for real.
+            current_decl = CDeclaration(tobytes("filter"), no_c_inputs,
+                                        static_pointer_cast[CExecNodeOptions, CFilterNodeOptions](
+                make_shared[CFilterNodeOptions](
+                    deref(deref(c_scanopts).scan_options).filter)
+            )
+            )
+            current_decl.inputs.push_back(
+                CDeclaration.Input(CDeclaration(
+                    tobytes("scan"), no_c_inputs, c_input_node_opts))

Review Comment:
   It does go first. In the sense that the Scan node is set as an `input` to the Filter node. That's why the Filter node is create first, so that it is then possible to append to its inputs the Scan node.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r914032940


##########
python/pyarrow/_dataset.pyx:
##########
@@ -432,6 +443,46 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+cdef class FilteredDataset(Dataset):
+    """
+    A Dataset with an applied filter.
+
+    Parameters
+    ----------
+    dataset : Dataset
+        The dataset to which the filter should be applied.
+    expression : Expression
+        The filter that should be applied to the dataset.
+    """
+
+    def __init__(self, dataset, expression):
+        self.init(<shared_ptr[CDataset]>(<Dataset>dataset).wrapped)
+        self._filter = expression
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self._filter = None
+
+    def filter(self, expression):

Review Comment:
   Added docstrings. To avoid having them going out of sync I added a reference to the original method documentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] jorisvandenbossche commented on pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on PR #13409:
URL: https://github.com/apache/arrow/pull/13409#issuecomment-1177788975

   I am personally also a bit wary of adding a new public class like `FilteredDataset` (at least until we have had the broader discussion about how we want to provide a dataframe-like / query object interface, as similar discussions will keep coming up for other methods). 
   If we want to provide this `filter()` method on the short term, I would also prefer doing it just on `Dataset`, as Weston suggested (that was also my original idea for this issue). Although that also creates its backwards compatibility issues of course, if we later let this method return an object backed by a query plan, as that then might not keep all methods/attributes that are currently available on Dataset. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r913679727


##########
cpp/src/arrow/compute/exec/options.cc:
##########
@@ -61,5 +61,16 @@ Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromTable(
       new SourceNodeOptions(table.schema(), batch_gen));
 }
 
+Result<std::shared_ptr<SourceNodeOptions>> SourceNodeOptions::FromRecordBatchReader(
+    std::shared_ptr<RecordBatchReader> reader, std::shared_ptr<Schema> schema,
+    arrow::internal::Executor* exc) {

Review Comment:
   I copied the original signature from the previous `SourceNodeOptions::FromTable` which had `exec`. I renamed the parameter in both places.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r913884529


##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -95,6 +98,23 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
                 c_in_table, 1 << 20)
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
                 c_tablesourceopts)
+        elif isinstance(ipt, FilteredDataset):
+            node_factory = "source"
+            c_in_dataset = (<Dataset>ipt).unwrap()
+            c_dataset_scanner = <shared_ptr[CScanner]>(
+                (<Scanner>(<FilteredDataset>ipt)._make_scanner({})).unwrap()
+            )
+            c_recordbatchreader_in = <shared_ptr[CRecordBatchReader]>(

Review Comment:
   Going to remove a bunch of explicit casts, they are a leftover from before I had enough type information in Cython code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #13409:
URL: https://github.com/apache/arrow/pull/13409#issuecomment-1162020126

   https://issues.apache.org/jira/browse/ARROW-16616


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r1038021273


##########
python/pyarrow/_dataset.pyx:
##########
@@ -385,6 +416,32 @@ cdef class Dataset(_Weakrefable):
         """The common schema of the full Dataset"""
         return pyarrow_wrap_schema(self.dataset.schema())
 
+    def filter(self, expression):
+        """
+        Apply a row filter to the dataset.
+
+        Parameters
+        ----------
+        expression : Expression
+            The filter that should be applied to the dataset.
+
+        Returns
+        -------
+        Dataset
+        """
+        cdef:
+            Dataset filtered_dataset
+
+        new_filter = expression
+        current_filter = self._scan_options.get("filter")
+        if current_filter is not None and new_filter is not None:
+            new_filter = current_filter & new_filter

Review Comment:
   Passing `None` as the filter is meant to be a way to remove all filters.
   To purpose is to allow retrieving fragments of a filtered dataset by doing `result.filter(None).get_fragments()`



##########
python/pyarrow/_dataset.pyx:
##########
@@ -385,6 +416,32 @@ cdef class Dataset(_Weakrefable):
         """The common schema of the full Dataset"""
         return pyarrow_wrap_schema(self.dataset.schema())
 
+    def filter(self, expression):
+        """
+        Apply a row filter to the dataset.
+
+        Parameters
+        ----------
+        expression : Expression
+            The filter that should be applied to the dataset.
+
+        Returns
+        -------
+        Dataset
+        """
+        cdef:
+            Dataset filtered_dataset
+
+        new_filter = expression
+        current_filter = self._scan_options.get("filter")
+        if current_filter is not None and new_filter is not None:
+            new_filter = current_filter & new_filter

Review Comment:
   Passing `None` as the filter is meant to be a way to remove all filters.
   To purpose is to allow things like retrieving fragments of a filtered dataset by doing `result.filter(None).get_fragments()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] jorisvandenbossche commented on pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on PR #13409:
URL: https://github.com/apache/arrow/pull/13409#issuecomment-1177800553

   `get_fragments` is also another method where the user _could_ expect this filter to be applied (giving the same result as specifying the `filter` keyword)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r917804146


##########
python/pyarrow/_dataset.pyx:
##########
@@ -432,6 +447,73 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+cdef class FilteredDataset(Dataset):
+    """
+    A Dataset with an applied filter.
+
+    Parameters
+    ----------
+    dataset : Dataset
+        The dataset to which the filter should be applied.
+    expression : Expression
+        The filter that should be applied to the dataset.
+    """
+
+    def __init__(self, dataset, expression):
+        self.init(<shared_ptr[CDataset]>(<Dataset>dataset).wrapped)
+        self._filter = expression
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self._filter = None
+
+    def filter(self, expression):
+        """Apply an additional row filter to the filtered dataset.
+
+        Parameters
+        ----------
+        expression : Expression
+            The filter that should be applied to the dataset.
+
+        Returns
+        -------
+        FilteredDataset
+        """
+        cdef:
+            FilteredDataset filtered_dataset
+
+        if self._filter is not None:
+            new_filter = self._filter & expression
+        else:
+            new_filter = expression
+        filtered_dataset = self.__class__.__new__(self.__class__)
+        filtered_dataset.init(self.wrapped)
+        filtered_dataset._filter = new_filter
+        return filtered_dataset
+
+    cdef Scanner _make_scanner(self, options):
+        if "filter" in options:
+            raise ValueError(
+                "Passing filter in scanner option is not valid for FilteredDataset."

Review Comment:
   Good point, I made it error as I tweaked the original behaviour in the change request from Antoine, but I guess we can just and it. I guess it's just a matter of documenti it properly as some people might expect that like for `Dataset` passing a `filter` overrides the existing one, instead of extending it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] amol- commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
amol- commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r917803202


##########
python/pyarrow/_dataset.pyx:
##########
@@ -432,6 +447,73 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+cdef class FilteredDataset(Dataset):
+    """
+    A Dataset with an applied filter.
+
+    Parameters
+    ----------
+    dataset : Dataset
+        The dataset to which the filter should be applied.
+    expression : Expression
+        The filter that should be applied to the dataset.
+    """
+
+    def __init__(self, dataset, expression):
+        self.init(<shared_ptr[CDataset]>(<Dataset>dataset).wrapped)
+        self._filter = expression
+
+    cdef void init(self, const shared_ptr[CDataset]& sp):
+        Dataset.init(self, sp)
+        self._filter = None
+
+    def filter(self, expression):
+        """Apply an additional row filter to the filtered dataset.
+
+        Parameters
+        ----------
+        expression : Expression
+            The filter that should be applied to the dataset.
+
+        Returns
+        -------
+        FilteredDataset
+        """
+        cdef:
+            FilteredDataset filtered_dataset
+
+        if self._filter is not None:

Review Comment:
   That was the original implementation, and I was asked to explicitly move it in a dedicated class. Which I think in the end makes sense, better have a single responsibility per class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] jorisvandenbossche commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r1034448378


##########
docs/source/python/compute.rst:
##########
@@ -368,8 +368,19 @@ our ``even_filter`` with a ``pc.field("nums") > 5`` filter:
    nums: [[6,8,10]]
    chars: [["f","h","l"]]
 
-:class:`.Dataset` currently can be filtered using :meth:`.Dataset.to_table` method
-passing a ``filter`` argument. See :ref:`py-filter-dataset` in Dataset documentation.
+:class:`.Dataset` can similarly be filtered with the :meth:`.Dataset.filter` method.
+The method will return an instance of :class:`.FilteredDataset` which will lazily

Review Comment:
   ```suggestion
   The method will return an instance of :class:`.Dataset` which will lazily
   ```



##########
python/pyarrow/_dataset.pyx:
##########
@@ -433,7 +477,6 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
-

Review Comment:
   Small nit, but you can keep this blank line (PEP8 recommends 2 blank lines between class definitions)



##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -2545,6 +2545,12 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil
     cdef cppclass CExecNodeOptions "arrow::compute::ExecNodeOptions":
         pass
 
+    cdef cppclass CSourceNodeOptions "arrow::compute::SourceNodeOptions"(CExecNodeOptions):
+        @staticmethod
+        CResult[shared_ptr[CSourceNodeOptions]] FromRecordBatchReader(

Review Comment:
   Is this still used in the current version of the PR?



##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -89,35 +93,42 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
     # Create source nodes for each input
     for ipt in inputs:
         if isinstance(ipt, Table):
-            node_factory = "table_source"
             c_in_table = pyarrow_unwrap_table(ipt)
             c_tablesourceopts = make_shared[CTableSourceNodeOptions](
                 c_in_table)
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
                 c_tablesourceopts)
+
+            current_decl = CDeclaration(
+                tobytes("table_source"), no_c_inputs, c_input_node_opts)
         elif isinstance(ipt, Dataset):
-            node_factory = "scan"
             c_in_dataset = (<Dataset>ipt).unwrap()
             c_scanopts = make_shared[CScanNodeOptions](
-                c_in_dataset, make_shared[CScanOptions]())
-            deref(deref(c_scanopts).scan_options).use_threads = use_threads
+                c_in_dataset, Scanner._make_scan_options(ipt, {"use_threads": use_threads}))
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CScanNodeOptions](
                 c_scanopts)
+
+            # Filters applied in CScanNodeOptions are "best effort" for the scan node itself,
+            # so we always need to inject an additional Filter node to apply them for real.
+            current_decl = CDeclaration(tobytes("filter"), no_c_inputs,
+                                        static_pointer_cast[CExecNodeOptions, CFilterNodeOptions](
+                make_shared[CFilterNodeOptions](
+                    deref(deref(c_scanopts).scan_options).filter)
+            )
+            )
+            current_decl.inputs.push_back(
+                CDeclaration.Input(CDeclaration(
+                    tobytes("scan"), no_c_inputs, c_input_node_opts))

Review Comment:
   I would naively expect the scan node to go first, followed by the filter node. Or the order doesn't matter?



##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -89,35 +93,42 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
     # Create source nodes for each input
     for ipt in inputs:
         if isinstance(ipt, Table):
-            node_factory = "table_source"
             c_in_table = pyarrow_unwrap_table(ipt)
             c_tablesourceopts = make_shared[CTableSourceNodeOptions](
                 c_in_table)
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
                 c_tablesourceopts)
+
+            current_decl = CDeclaration(
+                tobytes("table_source"), no_c_inputs, c_input_node_opts)
         elif isinstance(ipt, Dataset):
-            node_factory = "scan"
             c_in_dataset = (<Dataset>ipt).unwrap()
             c_scanopts = make_shared[CScanNodeOptions](
-                c_in_dataset, make_shared[CScanOptions]())
-            deref(deref(c_scanopts).scan_options).use_threads = use_threads
+                c_in_dataset, Scanner._make_scan_options(ipt, {"use_threads": use_threads}))
             c_input_node_opts = static_pointer_cast[CExecNodeOptions, CScanNodeOptions](
                 c_scanopts)
+
+            # Filters applied in CScanNodeOptions are "best effort" for the scan node itself,
+            # so we always need to inject an additional Filter node to apply them for real.
+            current_decl = CDeclaration(tobytes("filter"), no_c_inputs,
+                                        static_pointer_cast[CExecNodeOptions, CFilterNodeOptions](
+                make_shared[CFilterNodeOptions](
+                    deref(deref(c_scanopts).scan_options).filter)
+            )
+            )

Review Comment:
   This is very hard to read in the current formatting. Could you see if you can reformat this a bit (with more logical indentation), or otherwise maybe assign the FilterNodeOptions first to a temp variable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] jorisvandenbossche commented on a diff in pull request #13409: ARROW-16616: [Python] Lazy datasets filtering

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on code in PR #13409:
URL: https://github.com/apache/arrow/pull/13409#discussion_r1037912092


##########
python/pyarrow/_dataset.pyx:
##########
@@ -385,6 +416,32 @@ cdef class Dataset(_Weakrefable):
         """The common schema of the full Dataset"""
         return pyarrow_wrap_schema(self.dataset.schema())
 
+    def filter(self, expression):
+        """
+        Apply a row filter to the dataset.
+
+        Parameters
+        ----------
+        expression : Expression
+            The filter that should be applied to the dataset.
+
+        Returns
+        -------
+        Dataset
+        """
+        cdef:
+            Dataset filtered_dataset
+
+        new_filter = expression
+        current_filter = self._scan_options.get("filter")
+        if current_filter is not None and new_filter is not None:
+            new_filter = current_filter & new_filter

Review Comment:
   If you check that `new_filter is not None`, that means we are covering the case that `new_filter` would be None, but in that case we have to set `new_filter = current_filter` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] jorisvandenbossche commented on pull request #13409: ARROW-16616: [Python] Add lazy Dataset.filter() method

Posted by GitBox <gi...@apache.org>.
jorisvandenbossche commented on PR #13409:
URL: https://github.com/apache/arrow/pull/13409#issuecomment-1341088096

   I think you certainly don't have to care about ParquetDataset in this PR (we generally didn't add any of the additional methods from pyarrow.dataset.Dataset to it, so this PR just follows that). And let's have the discussion about what to do with ParquetDataset in a dedicated place (eg we already have https://issues.apache.org/jira/browse/ARROW-9720)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org