Posted to jira@arrow.apache.org by "Remi Dettai (Jira)" <ji...@apache.org> on 2020/10/23 09:03:00 UTC
[jira] [Commented] (ARROW-10368) [Rust][DataFusion] Refactor scan nodes to allow extensions
[ https://issues.apache.org/jira/browse/ARROW-10368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17219558#comment-17219558 ]
Remi Dettai commented on ARROW-10368:
-------------------------------------
[~jorgecarleitao] thanks for looking into this. It's so nice not to be talking alone :D
I have renamed the issue and changed the description to better relate the new direction this issue is taking.
I am going to open a PR with the new abstraction, and we can move this very technical question over there. I think I can handle the change, but I will definitely need your opinions on various aspects.
{quote}Note that the code now expects a stream, not an iterator, of RecordBatch, which may help at reading s3 sources.
{quote}
Not sure that helps a lot, as the parquet reader still requires you to plunge back into the sync world (ARROW-10307 is really, really hard).
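
As a side note, the iterator-based conversion suggested in the issue description below can be sketched in plain Rust. This is only an illustration: RecordBatch is a stub standing in for arrow's RecordBatch so the example is self-contained, and into_partition_iters is a hypothetical helper name.

```rust
// Stub standing in for arrow::record_batch::RecordBatch, so this
// sketch compiles on its own without the arrow crate.
#[derive(Debug, Clone, PartialEq)]
struct RecordBatch(i64);

/// Turn partitioned in-memory data (outer Vec = partitions, inner Vec =
/// batches of one partition) into one iterator per partition, as the
/// description suggests for InMemoryScan.
fn into_partition_iters(
    data: Vec<Vec<RecordBatch>>,
) -> Vec<std::vec::IntoIter<RecordBatch>> {
    data.into_iter().map(|partition| partition.into_iter()).collect()
}

fn main() {
    let data = vec![
        vec![RecordBatch(1), RecordBatch(2)], // partition 0
        vec![RecordBatch(3)],                 // partition 1
    ];
    let iters = into_partition_iters(data);
    assert_eq!(iters.len(), 2); // one iterator per partition
    let counts: Vec<usize> = iters.into_iter().map(|it| it.count()).collect();
    assert_eq!(counts, vec![2, 1]);
    println!("batch counts per partition: {:?}", counts);
}
```

A custom scan implementation could hand DataFusion lazy iterators in the same shape, without materializing everything first.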
> [Rust][DataFusion] Refactor scan nodes to allow extensions
> ----------------------------------------------------------
>
> Key: ARROW-10368
> URL: https://issues.apache.org/jira/browse/ARROW-10368
> Project: Apache Arrow
> Issue Type: Improvement
> Components: Rust, Rust - DataFusion
> Reporter: Remi Dettai
> Priority: Major
>
> The first intention of this issue was to refactor InMemoryScan to use an iterator to make it more flexible:
> {quote}Currently, InMemoryScan takes a Vec<Vec<RecordBatch>> as data.
> - the outer Vec separates the partitions
> - the inner Vec contains all the RecordBatch for one partition
> The inner Vec is then converted into an iterator when the LogicalPlan is turned into a PhysicalPlan.
> I suggest that InMemoryScan should take Vec<Iter<RecordBatch>>. This would make it possible to plug custom Scan implementations into datafusion without the need to read them entirely into memory. It would still work pretty seamlessly with Vec<Vec<RecordBatch>>, which would just need to be converted with data.into_iter().map(|x| x.into_iter()) first.{quote}
> After further inspection (see discussion below), it seems more appropriate to completely refactor the way scan nodes are organized. The idea is to replace all specific XxxScan nodes with a generic SourceScan node:
> {code:java}
> /// A node that generates source data
> LogicalPlan::SourceScan {
>     /// A shared reference to the source implementation
>     scanner: Arc<dyn SourceScanner>,
> },
> {code}
> with:
> {code:java}
> #[async_trait]
> /// A scanner implementation that can be used by datafusion
> pub trait SourceScanner: Send + Sync + fmt::Debug {
>     /// Reference to the schema of the data as it will be read by this scanner
>     fn projected_schema(&self) -> &SchemaRef;
>     /// String display of this scanner
>     fn format(&self) -> &str;
>     /// Apply a projection to this scanner
>     fn project(
>         &self,
>         required_columns: &HashSet<String>,
>         has_projection: bool,
>     ) -> Result<Arc<dyn SourceScanner>>;
>     /// Get the scanner partitioning
>     fn output_partitioning(&self) -> Partitioning;
>     /// Get a record batch reader for a given partition
>     async fn execute(&self, partition: usize) -> Result<Box<dyn RecordBatchReader + Send>>;
> }
> {code}
> The current specific scan implementations will then be provided as implementations of SourceScanner.
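
To illustrate the shape such an implementation might take, here is a simplified, synchronous, std-only stand-in for the proposed trait. It is only a sketch: RecordBatch and SchemaRef are stubs, MemScanner is a hypothetical name, execute returns a plain Iterator instead of the async RecordBatchReader, and the format/project methods are omitted, since the real versions depend on the arrow and async-trait crates.

```rust
use std::fmt;
use std::sync::Arc;

// Stubs standing in for the arrow types used by the real proposal.
#[derive(Debug, Clone, PartialEq)]
struct RecordBatch(Vec<i64>);
type SchemaRef = Arc<String>; // placeholder for Arc<arrow::datatypes::Schema>

/// Simplified, synchronous stand-in for the proposed SourceScanner trait.
trait SourceScanner: Send + Sync + fmt::Debug {
    /// Schema of the data as it will be read by this scanner
    fn projected_schema(&self) -> &SchemaRef;
    /// Number of partitions (stand-in for Partitioning)
    fn output_partitioning(&self) -> usize;
    /// Batches of one partition (stand-in for the async RecordBatchReader)
    fn execute(&self, partition: usize) -> Box<dyn Iterator<Item = RecordBatch>>;
}

/// In-memory scanner, roughly what InMemoryScan could become.
#[derive(Debug)]
struct MemScanner {
    schema: SchemaRef,
    partitions: Vec<Vec<RecordBatch>>,
}

impl SourceScanner for MemScanner {
    fn projected_schema(&self) -> &SchemaRef {
        &self.schema
    }
    fn output_partitioning(&self) -> usize {
        self.partitions.len()
    }
    fn execute(&self, partition: usize) -> Box<dyn Iterator<Item = RecordBatch>> {
        Box::new(self.partitions[partition].clone().into_iter())
    }
}

fn main() {
    // The plan node would hold the scanner behind Arc<dyn SourceScanner>.
    let scanner: Arc<dyn SourceScanner> = Arc::new(MemScanner {
        schema: Arc::new("a: Int64".to_string()),
        partitions: vec![vec![RecordBatch(vec![1, 2])], vec![RecordBatch(vec![3])]],
    });
    assert_eq!(scanner.output_partitioning(), 2);
    let batches: Vec<RecordBatch> = scanner.execute(0).collect();
    assert_eq!(batches.len(), 1);
    println!("scanner: {:?}", scanner);
}
```

The point of the Arc<dyn SourceScanner> indirection is that third-party sources can be plugged into the plan without DataFusion knowing their concrete type.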
--
This message was sent by Atlassian Jira
(v8.3.4#803005)