Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/12/25 18:37:59 UTC

[GitHub] [arrow-datafusion] Igosuki commented on issue #1484: Is it possible to query multiple parquet files ?

Igosuki commented on issue #1484:
URL: https://github.com/apache/arrow-datafusion/issues/1484#issuecomment-1001058967


   Hi, here is some example code directly from a personal project:
   ```rust
   /// Read partitions from my_table
   pub fn stream_df<P: 'static + AsRef<Path> + Debug>(
       partitions: HashSet<(P, Vec<(&'static str, String)>)>,
       format: String,
   ) -> impl Stream<Item = RecordBatch> + 'static {
       debug!("{:?}", &partitions);
       let s = partitions.into_iter().map(move |(base_path, partition)| {
           let format = format.clone();
           let base_path = base_path.as_ref().to_str().unwrap_or("").to_string();
   
            // `stream!` (from the async_stream crate) turns this block into
            // an async stream that yields RecordBatches as they are produced.
            stream! {
                let now = Instant::now();
                let mut ctx = ExecutionContext::new();
                // `df_format` is a project helper mapping the format name to a
                // file extension and a DataFusion FileFormat implementation.
                let (ext, file_format) = df_format(format);
               let listing_options = ListingOptions {
                   file_extension: ext.to_string(),
                   format: file_format,
                   table_partition_cols: partition.iter().map(|p| p.0.to_string()).collect(),
                   collect_stat: true,
                   target_partitions: 8,
               };
                // Register the partition's base directory so the files under
                // it can be queried as "my_table".
                ctx.register_listing_table(
                   "my_table",
                   &format!("file://{}", base_path),
                   listing_options,
                   None,
               )
               .await.unwrap();
               let where_clause = where_clause(&mut partition.iter());
               let df = ctx
                   .clone()
                   .sql(&format!("select * from my_table {where_clause} order by event_ms asc", where_clause = &where_clause))
                   .await
                   .unwrap();
               let collected = df.execute_stream().await.unwrap();
               let elapsed = now.elapsed();
               info!(
                   "Read records in {} for {:?} in {}.{}s",
                   base_path,
                   partition,
                   elapsed.as_secs(),
                   elapsed.subsec_millis()
               );
                // Yield each batch downstream as soon as it arrives.
                for await batch in collected {
                   yield batch.unwrap();
               }
               let elapsed = now.elapsed();
               info!(
                   "Pushed record stream in {} for {:?} in {}.{}s",
                   base_path,
                   partition,
                   elapsed.as_secs(),
                   elapsed.subsec_millis()
               );
           }
       });
        // Flatten the per-partition streams into a single stream of batches.
        tokio_stream::iter(s).flatten()
   }
   ```
   
    `stream!` is from the `async_stream` crate; `format` is one of 'parquet', 'csv', 'avro', or 'json'; `partitions` holds the base paths to read from.
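    The `where_clause` helper called in the snippet isn't shown; here is a minimal sketch of what such a helper might look like, building a SQL WHERE clause from the partition's (column, value) pairs. The signature and names are assumptions for illustration, not the actual project code:

    ```rust
    /// Hypothetical sketch of a `where_clause` helper: turns partition
    /// (column, value) pairs into a SQL WHERE clause. Not the author's code.
    fn where_clause(partition: &[(&str, String)]) -> String {
        if partition.is_empty() {
            return String::new();
        }
        let predicates: Vec<String> = partition
            .iter()
            .map(|(col, val)| format!("{} = '{}'", col, val))
            .collect();
        format!("where {}", predicates.join(" and "))
    }

    fn main() {
        let partition = vec![("year", "2021".to_string()), ("month", "12".to_string())];
        // prints: where year = '2021' and month = '12'
        println!("{}", where_clause(&partition));
    }
    ```

    Values are quoted as string literals here; partition column values arrive as strings in the signature above, which matches how DataFusion parses partition directories.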


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org