You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@arrow.apache.org by "Andy Grove (Jira)" <ji...@apache.org> on 2020/02/17 15:11:00 UTC

[jira] [Commented] (ARROW-7866) [Rust] How to handle aggregates with Datafusion?

    [ https://issues.apache.org/jira/browse/ARROW-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17038425#comment-17038425 ] 

Andy Grove commented on ARROW-7866:
-----------------------------------

Are you able to share the CSV file so I can reproduce?

`SELECT COUNT(expr)` definitely wouldn't return a StringArray though but a numeric type. You can use the meta data in the RecordBatch to determine the column data types.

> [Rust] How to handle aggregates with Datafusion?
> ------------------------------------------------
>
>                 Key: ARROW-7866
>                 URL: https://issues.apache.org/jira/browse/ARROW-7866
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Rust - DataFusion
>    Affects Versions: 0.16.0
>            Reporter: Istvan Szukacs
>            Priority: Minor
>
> Hi,
> I am trying to use the Rust tools and I was running into this interesting behavior.
> {code}
> use std::sync::Arc;
> extern crate arrow;
> extern crate datafusion;
> use arrow::array::\{StringArray, UInt32Array};
> use arrow::datatypes::\{DataType, Field, Schema};
> use datafusion::error::Result;
> use datafusion::execution::context::ExecutionContext;
> fn main() -> Result<()> {
>  // create local execution context
>  let mut ctx = ExecutionContext::new();
> // define schema for data source (csv file)
>  let schema = Arc::new(Schema::new(vec![
>  Field::new("index", DataType::UInt32, false),
>  Field::new("tablename", DataType::Utf8, false),
>  Field::new("datetime", DataType::Utf8, false),
>  Field::new("partition", DataType::Utf8, false),
>  Field::new("fileSize", DataType::UInt32, false),
>  ]));
> // register csv file with the execution context
>  ctx.register_csv(
>  "awesome_table",
>  &format!(
>  "../tableMetaData/anon/{}",
>  "013ff3245cc90a3394943da0b6552b9f98313288e0a174f94f8dcb10.csv"
>  ),
>  &schema,
>  true,
>  );
> let sql = "SELECT COUNT(partition) FROM awesome_table";
> // create the query plan
>  let plan = ctx.create_logical_plan(&sql)?;
>  let plan = ctx.optimize(&plan)?;
>  let plan = ctx.create_physical_plan(&plan, 1024 * 1024)?;
> // execute the query
>  let results = ctx.collect(plan.as_ref())?;
> // iterate over the results
>  results.iter().for_each(|batch| {
>  println!(
>  "RecordBatch has {} rows and {} columns",
>  batch.num_rows(),
>  batch.num_columns()
>  );
> let retf1 = batch.column(0).as_any().downcast_ref::<StringArray>();
> match retf1 {
>  Some(_) => println!("Some retf1"),
>  None => println!("None retf1"),
>  }
> let retf2 = batch.column(1).as_any().downcast_ref::<StringArray>();
> match retf2 {
>  Some(_) => println!("Some retf2"),
>  None => println!("None retf2"),
>  }
>  
>  // for i in 0..batch.num_rows() {
>  // println!("P: {}", pcnt.value(i));
>  // }
>  });
> Ok(())
> }
> {code}
>  
>  
> {code}
> cargo build --release && ./target/release/cli
> RecordBatch has 1 rows and 1 columns
> RecordBatch has 1 rows and 2 columns
> None retf1
> None retf2
> {code}
>  
> In case of no aggregation:
>  
> {code}
> let sql = "SELECT index FROM awesome_table";
> // create the query plan
>  let plan = ctx.create_logical_plan(&sql)?;
>  let plan = ctx.optimize(&plan)?;
>  let plan = ctx.create_physical_plan(&plan, 1024 * 1024)?;
> // execute the query
>  let results = ctx.collect(plan.as_ref())?;
> // iterate over the results
>  results.iter().for_each(|batch| {
>  println!(
>  "RecordBatch has {} rows and {} columns",
>  batch.num_rows(),
>  batch.num_columns(),
>  );
> let retf1 = batch.column(0).as_any().downcast_ref::<UInt32Array>();
> match retf1 {
>  Some(_) => println!("Some retf1"),
>  None => println!("None retf1"),
>  }
> // let retf2 = batch.column(1).as_any().downcast_ref::<UInt32Array>();
> // match retf2 {
>  // Some(_) => println!("Some retf2"),
>  // None => println!("None retf2"),
>  // }
> for i in 0..batch.num_rows() {
>  println!("P: {}", retf1.unwrap().value(i));
>  }
>  });
> Ok(())
> }
> {code}
>  
> {code}
>  
> P: 126436
> P: 126437
> P: 126438
> P: 126439
> P: 126440
> P: 126441
> {code}
>   
> Is there a way to access the fields when aggregating?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)