Posted to issues@arrow.apache.org by "YoungRX (via GitHub)" <gi...@apache.org> on 2023/03/08 03:37:48 UTC

[GitHub] [arrow] YoungRX opened a new issue, #34494: [C++] How to handle the limit clause when scanning Parquet files using Scanner?

YoungRX opened a new issue, #34494:
URL: https://github.com/apache/arrow/issues/34494

   ### Describe the usage question you have. Please include as many useful details as possible.
   
   
   I use `AsyncScanner::ToRecordBatchReader()` and `ScannerRecordBatchReader::ReadNext(std::shared_ptr<RecordBatch>* batch)` to read data from a Parquet file, with the batch size set to 1000.
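   
   For reference, the read pattern looks roughly like this (a minimal sketch; it assumes a `scanner` already built with a batch size of 1000):
   
   ```
   #include <arrow/dataset/scanner.h>
   #include <arrow/record_batch.h>
   #include <arrow/result.h>
   #include <arrow/status.h>
   
   arrow::Status ReadAllBatches(std::shared_ptr<arrow::dataset::Scanner> scanner) {
     // ToRecordBatchReader() exposes the scan as a RecordBatchReader.
     ARROW_ASSIGN_OR_RAISE(auto reader, scanner->ToRecordBatchReader());
     std::shared_ptr<arrow::RecordBatch> batch;
     do {
       // ReadNext() yields batches of up to batch_size rows and
       // sets batch to nullptr at the end of the stream.
       ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
     } while (batch != nullptr);
     return arrow::Status::OK();
   }
   ```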
   
   When I use the following SQL statement:
   
   > `select * from parquet_1 limit 10;`
   
   My code reads a single batch, which is enough to complete the task, and then resets the shared pointers such as the scanner and the recordBatchReader.
   In this case, the read operation inside `ReadNext` does not stop.
   
   An abort then occurs when calling `PoolBuffer::Reserve` in `arrow/cpp/src/arrow/memory_pool.cc`, specifically at `RETURN_NOT_OK(pool_->Allocate(new_capacity, &ptr));`.
   
   So could you help me solve this problem with the limit clause?
   Perhaps I could add some filter expression, or use a function to stop `ReadNext`.
   
   ### Component(s)
   
   C++


[GitHub] [arrow] YoungRX closed issue #34494: [C++] How to handle the limit clause when scanning Parquet files using Scanner?

Posted by "YoungRX (via GitHub)" <gi...@apache.org>.
YoungRX closed issue #34494: [C++] How to handle the limit clause when scanning Parquet files using Scanner?
URL: https://github.com/apache/arrow/issues/34494


[GitHub] [arrow] westonpace commented on issue #34494: [C++] How to handle the limit clause when scanning Parquet files using Scanner?

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on issue #34494:
URL: https://github.com/apache/arrow/issues/34494#issuecomment-1460614234

   The scanner API doesn't have any sort of proper cancellation.  A scan must be fully consumed to stop all background work.
   
   However, you can move past the scanner and start using [Declaration](https://arrow.apache.org/docs/cpp/streaming_execution.html) directly (at this point the scanner is basically a front-end for the streaming execution engine).
   
   The plan [created by the scanner](https://github.com/apache/arrow/blob/apache-arrow-11.0.0/cpp/src/arrow/dataset/scanner.cc#L441-L447) is:
   
   ```
   compute::Declaration::Sequence({
       {"scan", ScanNodeOptions{dataset_, scan_options_, sequence_fragments}},
       {"filter", compute::FilterNodeOptions{scan_options_->filter}},
       {"augmented_project",
        // exprs comes from the scan options also
        compute::ProjectNodeOptions{std::move(exprs), std::move(names)}},
   })
   ```
   
   Starting with 12.0.0 (or the latest main) you can simply do:
   
   ```
   compute::Declaration::Sequence({
       {"scan", ScanNodeOptions{dataset_, scan_options_, sequence_fragments}},
       {"filter", compute::FilterNodeOptions{scan_options_->filter}},
       {"project",
        compute::ProjectNodeOptions{std::move(exprs), std::move(names)}},
       {"fetch", compute::FetchNodeOptions(read_offset, read_limit)},
   })
   ```
   
   You can use `compute::DeclarationToTable` or `compute::DeclarationToReader` to process the declaration (a sketch follows the list below).  The "fetch" node is used to implement paging (after a filter).  There may also be some paging options getting added to the scan node directly (for paging before filtering, which is more efficient), but it's not clear yet whether that will make it into 12.0.0.  If you need to work with 11.0.0 or earlier then your options are:
   
 1. Use ExecPlan directly (Declarations use ExecPlan under the hood, but using it directly adds a bunch of complexity) with a custom sink node (`select_k_sink`).  This does pretty much the same thing, but in a more complex way.
 2. Use ExecPlan and call StopProducing on the plan once you have gotten enough data.
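   
   For 12.0.0, a minimal end-to-end sketch (it assumes a `dataset` and bound `scan_options` prepared the way the scanner prepares them in the snippets above; the exact namespaces may differ by version, e.g. later releases move these helpers under `arrow::acero`):
   
   ```
   #include <arrow/compute/exec/exec_plan.h>
   #include <arrow/compute/exec/options.h>
   #include <arrow/dataset/plan.h>
   #include <arrow/dataset/scanner.h>
   #include <arrow/record_batch.h>
   
   namespace cp = arrow::compute;
   namespace ds = arrow::dataset;
   
   arrow::Status ReadWithLimit(std::shared_ptr<ds::Dataset> dataset,
                               std::shared_ptr<ds::ScanOptions> scan_options,
                               int64_t read_offset, int64_t read_limit) {
     ds::internal::Initialize();  // registers the "scan" node factory
     cp::Declaration plan = cp::Declaration::Sequence({
         {"scan", ds::ScanNodeOptions{dataset, scan_options}},
         {"filter", cp::FilterNodeOptions{scan_options->filter}},
         {"fetch", cp::FetchNodeOptions(read_offset, read_limit)},
     });
     // The fetch node bounds the plan, so it finishes after read_limit rows
     // instead of draining the whole file.
     ARROW_ASSIGN_OR_RAISE(auto reader, cp::DeclarationToReader(std::move(plan)));
     std::shared_ptr<arrow::RecordBatch> batch;
     do {
       ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
     } while (batch != nullptr);
     return arrow::Status::OK();
   }
   ```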
   


[GitHub] [arrow] YoungRX commented on issue #34494: [C++] How to handle the limit clause when scanning Parquet files using Scanner?

Posted by "YoungRX (via GitHub)" <gi...@apache.org>.
YoungRX commented on issue #34494:
URL: https://github.com/apache/arrow/issues/34494#issuecomment-1469238004

   Hello? If I choose the second option, how can I use ExecPlan and call StopProducing on the plan of `AsyncScanner::ScanBatchesUnorderedAsync` once I have gotten enough data?
   
   Or should I build my own ExecPlan and ScanBatches functions? Then I can call StopProducing?


[GitHub] [arrow] westonpace commented on issue #34494: [C++] How to handle the limit clause when scanning Parquet files using Scanner?

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on issue #34494:
URL: https://github.com/apache/arrow/issues/34494#issuecomment-1472652228

   > How do I completely destroy the generator mentioned in the above code so that `stop_producing` is called?
   
   You cannot, unfortunately, through the scanner APIs.
   
   > Or should I build my own ExecPlan and ScanBatches functions? Then I can call StopProducing?
   
   Yes, this is what I was thinking, and it is probably also more future-proof.  This destroys the generator, but I think some work still happens in the background while it wraps up files.  There is a new scan node in development that will not do background work and will stop more quickly.  A rough sketch of the ExecPlan approach follows.
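   
   A sketch of option 2 against the 8.0.0-era exec plan API (exact signatures vary between releases, so treat this as an outline rather than the definitive API):
   
   ```
   #include <arrow/compute/exec.h>
   #include <arrow/compute/exec/exec_plan.h>
   #include <arrow/compute/exec/options.h>
   #include <arrow/dataset/plan.h>
   #include <arrow/dataset/scanner.h>
   #include <arrow/memory_pool.h>
   #include <arrow/util/async_generator.h>
   #include <arrow/util/optional.h>
   #include <arrow/util/thread_pool.h>
   
   namespace cp = arrow::compute;
   namespace ds = arrow::dataset;
   
   arrow::Status ScanWithEarlyStop(std::shared_ptr<ds::Dataset> dataset,
                                   std::shared_ptr<ds::ScanOptions> scan_options,
                                   int64_t row_limit) {
     ds::internal::Initialize();  // registers the "scan" node factory
     cp::ExecContext exec_context(arrow::default_memory_pool(),
                                  arrow::internal::GetCpuThreadPool());
     ARROW_ASSIGN_OR_RAISE(auto plan, cp::ExecPlan::Make(&exec_context));
     arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
     ARROW_ASSIGN_OR_RAISE(
         cp::ExecNode* scan,
         cp::MakeExecNode("scan", plan.get(), {},
                          ds::ScanNodeOptions{dataset, scan_options}));
     ARROW_RETURN_NOT_OK(cp::MakeExecNode("sink", plan.get(), {scan},
                                          cp::SinkNodeOptions{&sink_gen})
                             .status());
     ARROW_RETURN_NOT_OK(plan->StartProducing());
     int64_t rows_seen = 0;
     while (rows_seen < row_limit) {
       // Block on the next batch from the sink generator.
       ARROW_ASSIGN_OR_RAISE(auto maybe_batch, sink_gen().result());
       if (!maybe_batch) break;  // end of stream
       rows_seen += maybe_batch->length;
     }
     plan->StopProducing();             // cancel the remaining scan work
     return plan->finished().status();  // wait for background tasks to wrap up
   }
   ```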


[GitHub] [arrow] YoungRX commented on issue #34494: [C++] How to handle the limit clause when scanning Parquet files using Scanner?

Posted by "YoungRX (via GitHub)" <gi...@apache.org>.
YoungRX commented on issue #34494:
URL: https://github.com/apache/arrow/issues/34494#issuecomment-1461289041

   Thanks for your help. 
   
   I'm using 8.0.0, and I found the following code in `AsyncScanner::ScanBatchesUnorderedAsync`:
   ```
     // If the generator is destroyed before being completely drained, inform plan
     std::shared_ptr<void> stop_producing{
         nullptr, [plan, exec_context](...) {
           bool not_finished_yet = plan->finished().TryAddCallback(
               [&plan, &exec_context] { return [plan, exec_context](const Status&) {}; });
   
           if (not_finished_yet) {
             plan->StopProducing();
           }
         }};
   ```
   Now I hold shared pointers such as the scanner and the recordBatchReader. A possible cause is that a shared pointer's `use_count()` is greater than 1, so the memory cannot be released through `reset()`.
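   
   To illustrate the `use_count()` point in plain C++ (nothing Arrow-specific here):
   
   ```
   #include <cassert>
   #include <memory>
   
   int main() {
     auto a = std::make_shared<int>(42);
     auto b = a;  // use_count() == 2
     a.reset();   // drops only a's reference
     assert(b.use_count() == 1);
     // The object is still alive through b, so its custom deleter (like the
     // stop_producing deleter above) has not run yet.
     return 0;
   }
   ```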
   
   How do I completely destroy the generator mentioned in the above code so that `stop_producing` is called?
   


[GitHub] [arrow] YoungRX commented on issue #34494: [C++] How to handle the limit clause when scanning Parquet files using Scanner?

Posted by "YoungRX (via GitHub)" <gi...@apache.org>.
YoungRX commented on issue #34494:
URL: https://github.com/apache/arrow/issues/34494#issuecomment-1487859394

   That's really unfortunate.
   I'm looking forward to the completion of the new scan node in development.
   And are there any plans or links about this node? I'd like to learn more about it.

