Posted to jira@arrow.apache.org by "Remi Dettai (Jira)" <ji...@apache.org> on 2020/10/23 09:03:00 UTC

[jira] [Commented] (ARROW-10368) [Rust][DataFusion] Refactor scan nodes to allow extensions

    [ https://issues.apache.org/jira/browse/ARROW-10368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17219558#comment-17219558 ] 

Remi Dettai commented on ARROW-10368:
-------------------------------------

[~jorgecarleitao] thanks for looking into this. It's so nice not to be talking alone :D

I have renamed the issue and changed the description to better reflect the new direction this issue is taking.

I am going to open a PR with the new abstraction, and we can move this very technical question over there. I think I can handle the change, but I will definitely need your opinions on various aspects.
{quote}Note that the code now expects a stream, not an iterator, of RecordBatch, which may help at reading s3 sources.
{quote}
Not sure that helps a lot, as the parquet reader still forces you back into the sync world (ARROW-10307 is really, really hard).

> [Rust][DataFusion] Refactor scan nodes to allow extensions
> ----------------------------------------------------------
>
>                 Key: ARROW-10368
>                 URL: https://issues.apache.org/jira/browse/ARROW-10368
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: Rust, Rust - DataFusion
>            Reporter: Remi Dettai
>            Priority: Major
>
> The first intention of this issue was to refactor InMemoryScan to use an iterator to make it more flexible:
> {quote}Currently, InMemoryScan takes a Vec<Vec<RecordBatch>> as data.
> - the outer Vec separates the partitions
> - the inner Vec contains all the RecordBatch for one partition
> The inner Vec is then converted into an iterator when the LogicalPlan is turned into a PhysicalPlan.
> I suggest that InMemoryScan should take Vec<Iter<RecordBatch>>. This would make it possible to plug custom Scan implementations into datafusion without the need to read them entirely into memory. It would still work pretty seamlessly with Vec<Vec<RecordBatch>>, which would just need to be converted with data.map(|x| x.iter()) first.{quote}
> After further inspection (see discussion below), it seems more appropriate to completely refactor the way scan nodes are organized. The idea is to replace all specific XxxScan nodes with a generic SourceScan node:
> {code:java}
> /// A node that generates source data
> LogicalPlan::SourceScan {
>     /// A shared reference to the source implementation
>     scanner: Arc<dyn SourceScanner>,
> },
> {code}
> with:
> {code:java}
> #[async_trait]
> /// A scanner implementation that can be used by datafusion
> pub trait SourceScanner: Send + Sync + fmt::Debug {
>   /// reference to the schema of the data as it will be read by this scanner
>   fn projected_schema(&self) -> &SchemaRef;
>   /// string display of this scanner
>   fn format(&self) -> &str;
>   /// apply projection on this scanner
>   fn project(
>     &self,
>     required_columns: &HashSet<String>,
>     has_projection: bool,
>   ) -> Result<Arc<dyn SourceScanner>>;
>   /// get scanner partitioning
>   fn output_partitioning(&self) -> Partitioning;
>   /// get iterator for a given partition
>   async fn execute(&self, partition: usize) -> Result<Box<dyn RecordBatchReader + Send>>;
> }
> {code}
> The existing specific scan nodes will then be provided as implementations of SourceScanner.
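
To make the idea more concrete, here is a minimal sketch of how the current in-memory scan could become one such implementation. It is only an illustration under heavy assumptions: the trait is a simplified, synchronous variant of the proposed SourceScanner (no schema, no projection, no async), RecordBatch is a std-only stand-in for the arrow type, and MemoryScanner, partition_count and count_rows are hypothetical names that do not exist in DataFusion.

```rust
use std::fmt;
use std::sync::Arc;

/// Stand-in for arrow's RecordBatch (assumption: keeps the sketch std-only).
#[derive(Clone, Debug, PartialEq)]
pub struct RecordBatch {
    pub rows: Vec<i64>,
}

/// Boxed iterator of batches, standing in for `Box<dyn RecordBatchReader + Send>`.
pub type BatchIter = Box<dyn Iterator<Item = RecordBatch> + Send>;

/// Simplified, synchronous variant of the proposed trait (assumption:
/// schema, projection and async are deliberately left out).
pub trait SourceScanner: Send + Sync + fmt::Debug {
    /// string display of this scanner
    fn format(&self) -> &str;
    /// number of partitions exposed by this scanner (hypothetical helper)
    fn partition_count(&self) -> usize;
    /// get iterator for a given partition
    fn execute(&self, partition: usize) -> Result<BatchIter, String>;
}

/// Hypothetical scanner backed by the Vec<Vec<RecordBatch>> layout:
/// the outer Vec separates partitions, the inner Vec holds their batches.
#[derive(Debug)]
pub struct MemoryScanner {
    partitions: Vec<Vec<RecordBatch>>,
}

impl MemoryScanner {
    pub fn new(partitions: Vec<Vec<RecordBatch>>) -> Self {
        Self { partitions }
    }
}

impl SourceScanner for MemoryScanner {
    fn format(&self) -> &str {
        "MemoryScanner"
    }

    fn partition_count(&self) -> usize {
        self.partitions.len()
    }

    fn execute(&self, partition: usize) -> Result<BatchIter, String> {
        let batches = self
            .partitions
            .get(partition)
            .ok_or_else(|| format!("partition {} out of range", partition))?;
        // Clone the batches so the returned iterator owns its data.
        Ok(Box::new(batches.clone().into_iter()))
    }
}

/// Hypothetical consumer: drains every partition through the trait object,
/// without knowing which concrete scanner is behind it.
pub fn count_rows(scanner: Arc<dyn SourceScanner>) -> Result<usize, String> {
    let mut total = 0;
    for partition in 0..scanner.partition_count() {
        for batch in scanner.execute(partition)? {
            total += batch.rows.len();
        }
    }
    Ok(total)
}
```

The consumer only sees Arc<dyn SourceScanner>, so a scanner that produces batches lazily (for instance one reading from a file) could be swapped in without touching the plan code, which is the extension point this issue is after.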


