You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/22 06:45:45 UTC

[GitHub] [arrow] jorgecarleitao commented on issue #9178: Support for the binary SQL type in rust/datafusion

jorgecarleitao commented on issue #9178:
URL: https://github.com/apache/arrow/issues/9178#issuecomment-765142713


   Hi @ilya-biryukov, have you tried it?
   
   The following works for me (based on the example `simple_udf`):
   
   ```rust
   use arrow::{
       array::{ArrayRef, BinaryArray, Int64Array},
       datatypes::DataType,
       record_batch::RecordBatch,
       util::pretty,
   };
   
   use datafusion::error::Result;
   use datafusion::{physical_plan::functions::ScalarFunctionImplementation, prelude::*};
   use std::sync::Arc;
   
   /// Creates a local execution context with a single in-memory table `t`.
   ///
   /// `t` has one `Binary` column, `c`, with four rows: the bytes `b"aaaa"`
   /// followed by three nulls.
   fn create_context() -> Result<ExecutionContext> {
       use arrow::datatypes::{Field, Schema};
       use datafusion::datasource::MemTable;
       // define a schema. The column must be declared nullable because the
       // batch below contains null entries; a non-nullable declaration would
       // make the schema contradict the data it describes.
       let schema = Arc::new(Schema::new(vec![Field::new("c", DataType::Binary, true)]));

       let a: &[u8] = b"aaaa";
       // define data: one non-null value followed by three nulls.
       let batch = RecordBatch::try_new(
           Arc::clone(&schema),
           vec![Arc::new(BinaryArray::from(vec![Some(a), None, None, None]))],
       )?;

       // declare a new context. In the Spark API, this corresponds to a new SparkSession.
       let mut ctx = ExecutionContext::new();

       // declare a table in memory. In the Spark API, this corresponds to createDataFrame(...).
       let provider = MemTable::try_new(schema, vec![vec![batch]])?;
       ctx.register_table("t", Box::new(provider));
       Ok(ctx)
   }
   
   /// Declares a single-argument UDF `len` that maps each `Binary` value to its
   /// length in bytes as an `Int64` (null in, null out). It then applies the UDF
   /// to column `c` of table `t` and prints the resulting batches.
   #[tokio::main]
   async fn main() -> Result<()> {
       let mut ctx = create_context()?;

       // First, declare the actual implementation of the calculation
       let len: ScalarFunctionImplementation = Arc::new(|args: &[ArrayRef]| {
           // in DataFusion, all `args` and output are dynamically-typed arrays, which means that we need to:
           // 1. cast the values to the type we want
           // 2. perform the computation for every element in the array (using a loop or SIMD)

           // this is guaranteed by DataFusion based on the function's signature.
           assert_eq!(args.len(), 1);

           // 1. downcast the dynamically-typed input to the concrete array type
           let value = args[0]
               .as_any()
               .downcast_ref::<BinaryArray>()
               .expect("cast failed");

           // 2. run the UDF
           let array: Int64Array = value
               .iter()
               // in arrow, any value can be null: a null input yields a null output.
               .map(|base| base.map(|x| x.len() as i64))
               .collect();
           Ok(Arc::new(array))
       });

       // Next:
       // * give it a name so that it shows nicely when the plan is printed
       // * declare what input it expects
       // * declare its return type
       let len = create_udf(
           "len",
           vec![DataType::Binary],
           Arc::new(DataType::Int64),
           len,
       );

       // at this point, we can use it or register it, depending on the use-case:
       // * if the UDF is expected to be used throughout the program in different contexts,
       //   we can register it, and call it later:
       ctx.register_udf(len.clone()); // clone is only required in this example because we show both usages

       // * if the UDF is expected to be used directly in the scope, `.call` it directly:
       let expr = len.call(vec![col("c")]);

       // get a DataFrame from the context
       let df = ctx.table("t")?;

       // equivalent to `SELECT len(c) FROM t`
       let df = df.select(vec![expr])?;

       // execute the query
       let results = df.collect().await?;

       // print the results
       pretty::print_batches(&results)?;

       Ok(())
   }
   ```
   
   ```
   +--------+
   | len(c) |
   +--------+
   | 4      |
   |        |
   |        |
   |        |
   +--------+
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org