You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@arrow.apache.org by Matthew Turner <ma...@outlook.com> on 2020/12/10 22:11:29 UTC

[Rust] DataFusion performance

Hello,

I've been playing around with DataFusion to explore the feasibility of replacing current python/pandas data processing jobs with Rust/datafusion.  Ultimately, looking to improve performance / decrease cost.

I was doing some simple tests to start to measure performance differences on a simple task (read a csv[1] and filter it).

Reading the csv datafusion seemed to outperform pandas by around 30% which was nice.
*Rust took around 20-25ms to read the csv (compared to 32ms from pandas)

However, when filtering the data I was surprised to see that pandas was way faster.
*Rust took around 500-600ms to filter the csv(compared to 1ms from pandas)

My code for each is below.  I know I should be running the DataFusion times through something similar to pythons %timeit but I didn't have that immediately accessible and I ran many times to confirm it was roughly consistent.

Is this performance expected? Or am I using datafusion incorrectly?

Any insight is much appreciated!

[Rust]
```
use datafusion::error::Result;
use datafusion::prelude::*;
use std::time::Instant;

#[tokio::main]
async fn main() -> Result<()> {
    let start = Instant::now();

    let mut ctx = ExecutionContext::new();

    let ratings_csv = "ratings_small.csv";

    let df = ctx.read_csv(ratings_csv, CsvReadOptions::new()).unwrap();
    println!("Read CSV Duration: {:?}", start.elapsed());

    let q_start = Instant::now();
    let results = df
        .filter(col("userId").eq(lit(1)))?
        .collect()
        .await
        .unwrap();
    println!("Filter duration: {:?}", q_start.elapsed());

    println!("Duration: {:?}", start.elapsed());

    Ok(())
}
```

[Python]
```
In [1]: df = pd.read_csv("ratings_small.csv")
32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

In [2]: df.query("userId==1")
1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
```

[1]: https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.csv


Matthew M. Turner
Email: matthew.m.turner@outlook.com<ma...@outlook.com>
Phone: (908)-868-2786


Re: [Rust] DataFusion performance

Posted by Andy Grove <an...@gmail.com>.
oops,. I managed to hit a magical key combination that sent the mail
prematurely. Let's try that again.

Hi Matthew,

I went ahead and created a PR to add this to our benchmark suite. I will
aim to finish this over the weekend.

https://github.com/apache/arrow/pull/8896

If I run in debug mode with "cargo run --bin movies" I get a timing of 526
ms, which seems similar to the timing you are seeing.

If I run in release mode with "cargo run --release --bin movies" then the
time drops down to 21 ms.

Are you running in release mode?

Thanks,

Andy.


On Thu, Dec 10, 2020 at 10:46 PM Andy Grove <an...@gmail.com> wrote:

> Hi Matthew,
>
> I went ahead and created a PR to add this to our benchmark suite. I will
> aim to finish this over the weekend.
>
>
> On Thu, Dec 10, 2020 at 3:11 PM Matthew Turner <
> matthew.m.turner@outlook.com> wrote:
>
>> Hello,
>>
>>
>>
>> I’ve been playing around with DataFusion to explore the feasibility of
>> replacing current python/pandas data processing jobs with Rust/datafusion.
>> Ultimately, looking to improve performance / decrease cost.
>>
>>
>>
>> I was doing some simple tests to start to measure performance differences
>> on a simple task (read a csv[1] and filter it).
>>
>>
>>
>> Reading the csv datafusion seemed to outperform pandas by around 30%
>> which was nice.
>>
>> *Rust took around 20-25ms to read the csv (compared to 32ms from pandas)
>>
>>
>>
>> However, when filtering the data I was surprised to see that pandas was
>> way faster.
>>
>> *Rust took around 500-600ms to filter the csv(compared to 1ms from pandas)
>>
>>
>>
>> My code for each is below.  I know I should be running the DataFusion
>> times through something similar to pythons %timeit but I didn’t have that
>> immediately accessible and I ran many times to confirm it was roughly
>> consistent.
>>
>>
>>
>> Is this performance expected? Or am I using datafusion incorrectly?
>>
>>
>>
>> Any insight is much appreciated!
>>
>>
>>
>> [Rust]
>>
>> ```
>>
>> use datafusion::error::Result;
>>
>> use datafusion::prelude::*;
>>
>> use std::time::Instant;
>>
>>
>>
>> #[tokio::main]
>>
>> async fn main() -> Result<()> {
>>
>>     let start = Instant::now();
>>
>>
>>
>>     let mut ctx = ExecutionContext::new();
>>
>>
>>
>>     let ratings_csv = "ratings_small.csv";
>>
>>
>>
>>     let df = ctx.read_csv(ratings_csv, CsvReadOptions::new()).unwrap();
>>
>>     println!("Read CSV Duration: {:?}", start.elapsed());
>>
>>
>>
>>     let q_start = Instant::now();
>>
>>     let results = df
>>
>>         .filter(col("userId").eq(lit(1)))?
>>
>>         .collect()
>>
>>         .await
>>
>>         .unwrap();
>>
>>     println!("Filter duration: {:?}", q_start.elapsed());
>>
>>
>>
>>     println!("Duration: {:?}", start.elapsed());
>>
>>
>>
>>     Ok(())
>>
>> }
>>
>> ```
>>
>>
>>
>> [Python]
>>
>> ```
>>
>> In [1]: df = pd.read_csv(“ratings_small.csv”)
>>
>> 32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
>>
>>
>>
>> In [2]: df.query(“userId==1”)
>>
>> 1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
>>
>> ```
>>
>>
>>
>> [1]:
>> https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.csv
>>
>>
>>
>>
>>
>> *Matthew M. Turner*
>>
>> Email*:* matthew.m.turner@outlook.com
>>
>> Phone: (908)-868-2786
>>
>>
>>
>

Re: [Rust] DataFusion performance

Posted by Andy Grove <an...@gmail.com>.
Hi Matthew,

I went ahead and created a PR to add this to our benchmark suite. I will
aim to finish this over the weekend.


On Thu, Dec 10, 2020 at 3:11 PM Matthew Turner <ma...@outlook.com>
wrote:

> Hello,
>
>
>
> I’ve been playing around with DataFusion to explore the feasibility of
> replacing current python/pandas data processing jobs with Rust/datafusion.
> Ultimately, looking to improve performance / decrease cost.
>
>
>
> I was doing some simple tests to start to measure performance differences
> on a simple task (read a csv[1] and filter it).
>
>
>
> Reading the csv datafusion seemed to outperform pandas by around 30% which
> was nice.
>
> *Rust took around 20-25ms to read the csv (compared to 32ms from pandas)
>
>
>
> However, when filtering the data I was surprised to see that pandas was
> way faster.
>
> *Rust took around 500-600ms to filter the csv(compared to 1ms from pandas)
>
>
>
> My code for each is below.  I know I should be running the DataFusion
> times through something similar to pythons %timeit but I didn’t have that
> immediately accessible and I ran many times to confirm it was roughly
> consistent.
>
>
>
> Is this performance expected? Or am I using datafusion incorrectly?
>
>
>
> Any insight is much appreciated!
>
>
>
> [Rust]
>
> ```
>
> use datafusion::error::Result;
>
> use datafusion::prelude::*;
>
> use std::time::Instant;
>
>
>
> #[tokio::main]
>
> async fn main() -> Result<()> {
>
>     let start = Instant::now();
>
>
>
>     let mut ctx = ExecutionContext::new();
>
>
>
>     let ratings_csv = "ratings_small.csv";
>
>
>
>     let df = ctx.read_csv(ratings_csv, CsvReadOptions::new()).unwrap();
>
>     println!("Read CSV Duration: {:?}", start.elapsed());
>
>
>
>     let q_start = Instant::now();
>
>     let results = df
>
>         .filter(col("userId").eq(lit(1)))?
>
>         .collect()
>
>         .await
>
>         .unwrap();
>
>     println!("Filter duration: {:?}", q_start.elapsed());
>
>
>
>     println!("Duration: {:?}", start.elapsed());
>
>
>
>     Ok(())
>
> }
>
> ```
>
>
>
> [Python]
>
> ```
>
> In [1]: df = pd.read_csv(“ratings_small.csv”)
>
> 32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
>
>
>
> In [2]: df.query(“userId==1”)
>
> 1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
>
> ```
>
>
>
> [1]:
> https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.csv
>
>
>
>
>
> *Matthew M. Turner*
>
> Email*:* matthew.m.turner@outlook.com
>
> Phone: (908)-868-2786
>
>
>

RE: [Rust] DataFusion performance

Posted by Matthew Turner <ma...@outlook.com>.
Thanks, Jorge. Makes sense.

Matthew M. Turner
Email: matthew.m.turner@outlook.com<ma...@outlook.com>
Phone: (908)-868-2786

From: Jorge Cardoso Leitão <jo...@gmail.com>
Sent: Friday, December 11, 2020 11:06 PM
To: user@arrow.apache.org
Subject: Re: [Rust] DataFusion performance

Hi Mattew,

SchemaRef is just an alias for Arc<Schema>. Thus, you need to wrap it on an Arc.

We do this because the plans are often passed between thread boundaries and thus wrapping them on an Arc allows that.

Best,
Jorge


On Fri, Dec 11, 2020 at 8:14 PM Matthew Turner <ma...@outlook.com>> wrote:
Thanks! Converting the schema to owned made it work.

The type of the schema param is SchemaRef – which I thought would allow a reference.  Is this not the case?

Matthew M. Turner
Email: matthew.m.turner@outlook.com<ma...@outlook.com>
Phone: (908)-868-2786

From: Andy Grove <an...@gmail.com>>
Sent: Friday, December 11, 2020 10:16 AM
To: user@arrow.apache.org<ma...@arrow.apache.org>
Subject: Re: [Rust] DataFusion performance

Hi Matthew,

Using the latest DataFusion from GitHub master branch, the following code works for in-memory:

use std::sync::Arc;
use std::time::Instant;

use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion::datasource::MemTable;

#[tokio::main]
async fn main() -> Result<()> {
    //TODO add command-line args
    let ratings_csv = "/tmp/movies/ratings_small.csv";
    let mut ctx = ExecutionContext::new();
    let df = ctx.read_csv(ratings_csv, CsvReadOptions::new()).unwrap();
    let batches = vec![df.collect().await?];
    let provider = MemTable::new(Arc::new(df.schema().to_owned().into()), batches)?;
    ctx.register_table("memory_table", Box::new(provider));
    let mem_df = ctx.table("memory_table")?;
    let q_start = Instant::now();
    let _results = mem_df
        .filter(col("userId").eq(lit(1)))?
        .collect()
        .await
        .unwrap();
    println!("Duration: {:?}", q_start.elapsed());
    Ok(())
}

Andy.

On Fri, Dec 11, 2020 at 7:59 AM Matthew Turner <ma...@outlook.com>> wrote:
Played around some more - it was because I wasn’t using --release flag.  Sry about that, still learning rust.

Using that flag, the total time to read and filter is between 52 and 80ms.

In general, what should I expect when comparing the performance of pandas to datafusion?

@Andy Grove thanks for adding that.  If there is a need for additional datafusion benchmarks and what I do could help with that then I would be happy to contribute it.  I will send a follow up once ive made progress.

I'm also still having trouble with that memory table, so any help there is appreciated.

Thanks for your time!  Very excited by this.

Matthew M. Turner
Email: matthew.m.turner@outlook.com<ma...@outlook.com>
Phone: (908)-868-2786

-----Original Message-----
From: Matthew Turner <ma...@outlook.com>>
Sent: Friday, December 11, 2020 12:24 AM
To: user@arrow.apache.org<ma...@arrow.apache.org>
Subject: RE: [Rust] DataFusion performance

Thanks for context! Makes sense.

Even with that, when comparing the total time of each (read + filter) DataFusion still appears much slower(~625ms vs 33ms).  Is that expected?

Also, im trying to bring the table in memory now to perform the computation from there and compare performance.  Code below.  But I'm getting an error (beneath the code) even though I think ive constructed the MemTable correctly (from [1]).  From what I see all the types are the same as when I used the original df from read_csv so I'm not sure what I'm doing wrong.

I also saw there was an open issue [2] for this error type raised on rust-lang - so im unsure if its my implementation, datafusion/arrow issue, or Rust issue.

Thanks again for help!

```
    let sch = Arc::new(df.schema());
    let batches = vec![df.collect().await?];
    let provider = MemTable::new(sch, batches)?;

    ctx.register_table("memory_table", Box::new(provider));

    let mem_df = ctx.table("memory_table")?;

    let q_start = Instant::now();
    let results = mem_df
        .filter(col("userId").eq(lit(1)))?
        .collect()
        .await
        .unwrap();
```

Which is returning this error:

error[E0698]: type inside `async` block must be known in this context
  --> src\main.rs:37:38
   |
37 |         .filter(col("userId").eq(lit(1)))?
   |                                      ^ cannot infer type for type `{integer}`
   |
note: the type is part of the `async` block because of this `await`
  --> src\main.rs:36:19
   |
36 |       let results = mem_df
   |  ___________________^
37 | |         .filter(col("userId").eq(lit(1)))?
38 | |         .collect()
39 | |         .await
   | |______________^


[1] https://github.com/apache/arrow/blob/master/rust/datafusion/examples/dataframe_in_memory.rs
[2] https://github.com/rust-lang/rust/issues/63502

Matthew M. Turner
Email: matthew.m.turner@outlook.com<ma...@outlook.com>
Phone: (908)-868-2786

-----Original Message-----
From: Michael Mior <mm...@apache.org>>
Sent: Thursday, December 10, 2020 8:55 PM
To: user@arrow.apache.org<ma...@arrow.apache.org>
Subject: Re: [Rust] DataFusion performance

Contrary to what you might expect given the name, read_csv does not actually read the CSV file. It instead creates the start of a logical execution plan which involves reading the CSV file when that plan is finally executed. This happens when you call collect().

Pandas read_csv on the other hand immediately reads the CSV file. So you're comparing the time of reading AND filtering the file
(DataFusion) with the time to filter data which has already been read (Pandas).

There's nothing wrong with your use of DataFusion per se, you simply weren't measuring what you thought you were measuring.
--
Michael Mior
mmior@apache.org<ma...@apache.org>

Le jeu. 10 déc. 2020 à 17:11, Matthew Turner <ma...@outlook.com>> a écrit :
>
> Hello,
>
>
>
> I’ve been playing around with DataFusion to explore the feasibility of replacing current python/pandas data processing jobs with Rust/datafusion.  Ultimately, looking to improve performance / decrease cost.
>
>
>
> I was doing some simple tests to start to measure performance differences on a simple task (read a csv[1] and filter it).
>
>
>
> Reading the csv datafusion seemed to outperform pandas by around 30% which was nice.
>
> *Rust took around 20-25ms to read the csv (compared to 32ms from
> pandas)
>
>
>
> However, when filtering the data I was surprised to see that pandas was way faster.
>
> *Rust took around 500-600ms to filter the csv(compared to 1ms from
> pandas)
>
>
>
> My code for each is below.  I know I should be running the DataFusion times through something similar to pythons %timeit but I didn’t have that immediately accessible and I ran many times to confirm it was roughly consistent.
>
>
>
> Is this performance expected? Or am I using datafusion incorrectly?
>
>
>
> Any insight is much appreciated!
>
>
>
> [Rust]
>
> ```
>
> use datafusion::error::Result;
>
> use datafusion::prelude::*;
>
> use std::time::Instant;
>
>
>
> #[tokio::main]
>
> async fn main() -> Result<()> {
>
>     let start = Instant::now();
>
>
>
>     let mut ctx = ExecutionContext::new();
>
>
>
>     let ratings_csv = "ratings_small.csv";
>
>
>
>     let df = ctx.read_csv(ratings_csv,
> CsvReadOptions::new()).unwrap();
>
>     println!("Read CSV Duration: {:?}", start.elapsed());
>
>
>
>     let q_start = Instant::now();
>
>     let results = df
>
>         .filter(col("userId").eq(lit(1)))?
>
>         .collect()
>
>         .await
>
>         .unwrap();
>
>     println!("Filter duration: {:?}", q_start.elapsed());
>
>
>
>     println!("Duration: {:?}", start.elapsed());
>
>
>
>     Ok(())
>
> }
>
> ```
>
>
>
> [Python]
>
> ```
>
> In [1]: df = pd.read_csv(“ratings_small.csv”)
>
> 32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
>
>
>
> In [2]: df.query(“userId==1”)
>
> 1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops
> each)
>
> ```
>
>
>
> [1]:
> https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.c
> sv
>
>
>
>
>
> Matthew M. Turner
>
> Email: matthew.m.turner@outlook.com<ma...@outlook.com>
>
> Phone: (908)-868-2786
>
>

Re: [Rust] DataFusion performance

Posted by Daniël Heres <da...@gmail.com>.
Hello Matthew,

If you want to try to get absolutely the best performance you can get now
from DataFusion:

* Make sure you are using the latest version from master, there have been a
lot of improvements lately.

* Compile DataFusion with "simd" feature on. This requires a recent version
of DataFusion, but it gives speeds for some computations.

* Compile your code with lto = true like this in your Cargo.toml file:

[profile.release]
lto = true

This will increase the compile time considerably, but allows Rust / LLVM to
do more optimizations on the entire program. There are some other settings
documented here
https://doc.rust-lang.org/cargo/reference/profiles.html#release

* Set the environment variable RUSTFLAGS="-C target-cpu=native". this
allows Rust/LLVM to use all CPU instructions available on your CPU. This
way the binary becomes not portable anymore though.

We are also improving the performance over time, e.g. recently a lot parts
in Arrow / DataFusion have been improved in the last months such as faster
CSV reader and faster computations, and there is still a lot to come.

Best,

Daniël

Op za 12 dec. 2020 om 05:06 schreef Jorge Cardoso Leitão <
jorgecarleitao@gmail.com>:

> Hi Mattew,
>
> SchemaRef is just an alias for Arc<Schema>. Thus, you need to wrap it on
> an Arc.
>
> We do this because the plans are often passed between thread boundaries
> and thus wrapping them on an Arc allows that.
>
> Best,
> Jorge
>
>
> On Fri, Dec 11, 2020 at 8:14 PM Matthew Turner <
> matthew.m.turner@outlook.com> wrote:
>
>> Thanks! Converting the schema to owned made it work.
>>
>>
>>
>> The type of the schema param is SchemaRef – which I thought would allow a
>> reference.  Is this not the case?
>>
>>
>>
>> *Matthew M. Turner*
>>
>> Email*:* matthew.m.turner@outlook.com
>>
>> Phone: (908)-868-2786
>>
>>
>>
>> *From:* Andy Grove <an...@gmail.com>
>> *Sent:* Friday, December 11, 2020 10:16 AM
>> *To:* user@arrow.apache.org
>> *Subject:* Re: [Rust] DataFusion performance
>>
>>
>>
>> Hi Matthew,
>>
>>
>>
>> Using the latest DataFusion from GitHub master branch, the following code
>> works for in-memory:
>>
>> use std::sync::Arc;
>> use std::time::Instant;
>>
>> use datafusion::error::Result;
>> use datafusion::prelude::*;
>> use datafusion::datasource::MemTable;
>>
>> #[tokio::main]
>> async fn main() -> Result<()> {
>>     //
>> *TODO add command-line args    *let ratings_csv = "/tmp/movies/ratings_small.csv";
>>     let mut ctx = ExecutionContext::*new*();
>>     let df = ctx.read_csv(ratings_csv, CsvReadOptions::*new*()).unwrap();
>>     let batches = vec![df.collect().await?];
>>     let provider = MemTable::*new*(Arc::*new*(df.schema().to_owned().into()), batches)?;
>>     ctx.register_table("memory_table", Box::*new*(provider));
>>     let mem_df = ctx.table("memory_table")?;
>>     let q_start = Instant::*now*();
>>     let _results = mem_df
>>         .filter(col("userId").eq(lit(1)))?
>>         .collect()
>>         .await
>>         .unwrap();
>>     println!("Duration: {:?}", q_start.elapsed());
>>     *Ok*(())
>> }
>>
>>
>>
>> Andy.
>>
>>
>>
>> On Fri, Dec 11, 2020 at 7:59 AM Matthew Turner <
>> matthew.m.turner@outlook.com> wrote:
>>
>> Played around some more - it was because I wasn’t using --release flag.
>> Sry about that, still learning rust.
>>
>> Using that flag, the total time to read and filter is between 52 and 80ms.
>>
>> In general, what should I expect when comparing the performance of pandas
>> to datafusion?
>>
>> @Andy Grove thanks for adding that.  If there is a need for additional
>> datafusion benchmarks and what I do could help with that then I would be
>> happy to contribute it.  I will send a follow up once ive made progress.
>>
>> I'm also still having trouble with that memory table, so any help there
>> is appreciated.
>>
>> Thanks for your time!  Very excited by this.
>>
>> Matthew M. Turner
>> Email: matthew.m.turner@outlook.com
>> Phone: (908)-868-2786
>>
>> -----Original Message-----
>> From: Matthew Turner <ma...@outlook.com>
>> Sent: Friday, December 11, 2020 12:24 AM
>> To: user@arrow.apache.org
>> Subject: RE: [Rust] DataFusion performance
>>
>> Thanks for context! Makes sense.
>>
>> Even with that, when comparing the total time of each (read + filter)
>> DataFusion still appears much slower(~625ms vs 33ms).  Is that expected?
>>
>> Also, im trying to bring the table in memory now to perform the
>> computation from there and compare performance.  Code below.  But I'm
>> getting an error (beneath the code) even though I think ive constructed the
>> MemTable correctly (from [1]).  From what I see all the types are the same
>> as when I used the original df from read_csv so I'm not sure what I'm doing
>> wrong.
>>
>> I also saw there was an open issue [2] for this error type raised on
>> rust-lang - so im unsure if its my implementation, datafusion/arrow issue,
>> or Rust issue.
>>
>> Thanks again for help!
>>
>> ```
>>     let sch = Arc::new(df.schema());
>>     let batches = vec![df.collect().await?];
>>     let provider = MemTable::new(sch, batches)?;
>>
>>     ctx.register_table("memory_table", Box::new(provider));
>>
>>     let mem_df = ctx.table("memory_table")?;
>>
>>     let q_start = Instant::now();
>>     let results = mem_df
>>         .filter(col("userId").eq(lit(1)))?
>>         .collect()
>>         .await
>>         .unwrap();
>> ```
>>
>> Which is returning this error:
>>
>> error[E0698]: type inside `async` block must be known in this context
>>   --> src\main.rs:37:38
>>    |
>> 37 |         .filter(col("userId").eq(lit(1)))?
>>    |                                      ^ cannot infer type for type
>> `{integer}`
>>    |
>> note: the type is part of the `async` block because of this `await`
>>   --> src\main.rs:36:19
>>    |
>> 36 |       let results = mem_df
>>    |  ___________________^
>> 37 | |         .filter(col("userId").eq(lit(1)))?
>> 38 | |         .collect()
>> 39 | |         .await
>>    | |______________^
>>
>>
>> [1]
>> https://github.com/apache/arrow/blob/master/rust/datafusion/examples/dataframe_in_memory.rs
>> [2] https://github.com/rust-lang/rust/issues/63502
>>
>> Matthew M. Turner
>> Email: matthew.m.turner@outlook.com
>> Phone: (908)-868-2786
>>
>> -----Original Message-----
>> From: Michael Mior <mm...@apache.org>
>> Sent: Thursday, December 10, 2020 8:55 PM
>> To: user@arrow.apache.org
>> Subject: Re: [Rust] DataFusion performance
>>
>> Contrary to what you might expect given the name, read_csv does not
>> actually read the CSV file. It instead creates the start of a logical
>> execution plan which involves reading the CSV file when that plan is
>> finally executed. This happens when you call collect().
>>
>> Pandas read_csv on the other hand immediately reads the CSV file. So
>> you're comparing the time of reading AND filtering the file
>> (DataFusion) with the time to filter data which has already been read
>> (Pandas).
>>
>> There's nothing wrong with your use of DataFusion per se, you simply
>> weren't measuring what you thought you were measuring.
>> --
>> Michael Mior
>> mmior@apache.org
>>
>> Le jeu. 10 déc. 2020 à 17:11, Matthew Turner <
>> matthew.m.turner@outlook.com> a écrit :
>> >
>> > Hello,
>> >
>> >
>> >
>> > I’ve been playing around with DataFusion to explore the feasibility of
>> replacing current python/pandas data processing jobs with Rust/datafusion.
>> Ultimately, looking to improve performance / decrease cost.
>> >
>> >
>> >
>> > I was doing some simple tests to start to measure performance
>> differences on a simple task (read a csv[1] and filter it).
>> >
>> >
>> >
>> > Reading the csv datafusion seemed to outperform pandas by around 30%
>> which was nice.
>> >
>> > *Rust took around 20-25ms to read the csv (compared to 32ms from
>> > pandas)
>> >
>> >
>> >
>> > However, when filtering the data I was surprised to see that pandas was
>> way faster.
>> >
>> > *Rust took around 500-600ms to filter the csv(compared to 1ms from
>> > pandas)
>> >
>> >
>> >
>> > My code for each is below.  I know I should be running the DataFusion
>> times through something similar to pythons %timeit but I didn’t have that
>> immediately accessible and I ran many times to confirm it was roughly
>> consistent.
>> >
>> >
>> >
>> > Is this performance expected? Or am I using datafusion incorrectly?
>> >
>> >
>> >
>> > Any insight is much appreciated!
>> >
>> >
>> >
>> > [Rust]
>> >
>> > ```
>> >
>> > use datafusion::error::Result;
>> >
>> > use datafusion::prelude::*;
>> >
>> > use std::time::Instant;
>> >
>> >
>> >
>> > #[tokio::main]
>> >
>> > async fn main() -> Result<()> {
>> >
>> >     let start = Instant::now();
>> >
>> >
>> >
>> >     let mut ctx = ExecutionContext::new();
>> >
>> >
>> >
>> >     let ratings_csv = "ratings_small.csv";
>> >
>> >
>> >
>> >     let df = ctx.read_csv(ratings_csv,
>> > CsvReadOptions::new()).unwrap();
>> >
>> >     println!("Read CSV Duration: {:?}", start.elapsed());
>> >
>> >
>> >
>> >     let q_start = Instant::now();
>> >
>> >     let results = df
>> >
>> >         .filter(col("userId").eq(lit(1)))?
>> >
>> >         .collect()
>> >
>> >         .await
>> >
>> >         .unwrap();
>> >
>> >     println!("Filter duration: {:?}", q_start.elapsed());
>> >
>> >
>> >
>> >     println!("Duration: {:?}", start.elapsed());
>> >
>> >
>> >
>> >     Ok(())
>> >
>> > }
>> >
>> > ```
>> >
>> >
>> >
>> > [Python]
>> >
>> > ```
>> >
>> > In [1]: df = pd.read_csv(“ratings_small.csv”)
>> >
>> > 32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
>> >
>> >
>> >
>> > In [2]: df.query(“userId==1”)
>> >
>> > 1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops
>> > each)
>> >
>> > ```
>> >
>> >
>> >
>> > [1]:
>> > https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.c
>> > sv
>> >
>> >
>> >
>> >
>> >
>> > Matthew M. Turner
>> >
>> > Email: matthew.m.turner@outlook.com
>> >
>> > Phone: (908)-868-2786
>> >
>> >
>>
>>

-- 
Daniël Heres

Re: [Rust] DataFusion performance

Posted by Jorge Cardoso Leitão <jo...@gmail.com>.
Hi Mattew,

SchemaRef is just an alias for Arc<Schema>. Thus, you need to wrap it on an
Arc.

We do this because the plans are often passed between thread boundaries and
thus wrapping them on an Arc allows that.

Best,
Jorge


On Fri, Dec 11, 2020 at 8:14 PM Matthew Turner <ma...@outlook.com>
wrote:

> Thanks! Converting the schema to owned made it work.
>
>
>
> The type of the schema param is SchemaRef – which I thought would allow a
> reference.  Is this not the case?
>
>
>
> *Matthew M. Turner*
>
> Email*:* matthew.m.turner@outlook.com
>
> Phone: (908)-868-2786
>
>
>
> *From:* Andy Grove <an...@gmail.com>
> *Sent:* Friday, December 11, 2020 10:16 AM
> *To:* user@arrow.apache.org
> *Subject:* Re: [Rust] DataFusion performance
>
>
>
> Hi Matthew,
>
>
>
> Using the latest DataFusion from GitHub master branch, the following code
> works for in-memory:
>
> use std::sync::Arc;
> use std::time::Instant;
>
> use datafusion::error::Result;
> use datafusion::prelude::*;
> use datafusion::datasource::MemTable;
>
> #[tokio::main]
> async fn main() -> Result<()> {
>     //
> *TODO add command-line args    *let ratings_csv = "/tmp/movies/ratings_small.csv";
>     let mut ctx = ExecutionContext::*new*();
>     let df = ctx.read_csv(ratings_csv, CsvReadOptions::*new*()).unwrap();
>     let batches = vec![df.collect().await?];
>     let provider = MemTable::*new*(Arc::*new*(df.schema().to_owned().into()), batches)?;
>     ctx.register_table("memory_table", Box::*new*(provider));
>     let mem_df = ctx.table("memory_table")?;
>     let q_start = Instant::*now*();
>     let _results = mem_df
>         .filter(col("userId").eq(lit(1)))?
>         .collect()
>         .await
>         .unwrap();
>     println!("Duration: {:?}", q_start.elapsed());
>     *Ok*(())
> }
>
>
>
> Andy.
>
>
>
> On Fri, Dec 11, 2020 at 7:59 AM Matthew Turner <
> matthew.m.turner@outlook.com> wrote:
>
> Played around some more - it was because I wasn’t using --release flag.
> Sry about that, still learning rust.
>
> Using that flag, the total time to read and filter is between 52 and 80ms.
>
> In general, what should I expect when comparing the performance of pandas
> to datafusion?
>
> @Andy Grove thanks for adding that.  If there is a need for additional
> datafusion benchmarks and what I do could help with that then I would be
> happy to contribute it.  I will send a follow up once ive made progress.
>
> I'm also still having trouble with that memory table, so any help there is
> appreciated.
>
> Thanks for your time!  Very excited by this.
>
> Matthew M. Turner
> Email: matthew.m.turner@outlook.com
> Phone: (908)-868-2786
>
> -----Original Message-----
> From: Matthew Turner <ma...@outlook.com>
> Sent: Friday, December 11, 2020 12:24 AM
> To: user@arrow.apache.org
> Subject: RE: [Rust] DataFusion performance
>
> Thanks for context! Makes sense.
>
> Even with that, when comparing the total time of each (read + filter)
> DataFusion still appears much slower(~625ms vs 33ms).  Is that expected?
>
> Also, im trying to bring the table in memory now to perform the
> computation from there and compare performance.  Code below.  But I'm
> getting an error (beneath the code) even though I think ive constructed the
> MemTable correctly (from [1]).  From what I see all the types are the same
> as when I used the original df from read_csv so I'm not sure what I'm doing
> wrong.
>
> I also saw there was an open issue [2] for this error type raised on
> rust-lang - so im unsure if its my implementation, datafusion/arrow issue,
> or Rust issue.
>
> Thanks again for help!
>
> ```
>     let sch = Arc::new(df.schema());
>     let batches = vec![df.collect().await?];
>     let provider = MemTable::new(sch, batches)?;
>
>     ctx.register_table("memory_table", Box::new(provider));
>
>     let mem_df = ctx.table("memory_table")?;
>
>     let q_start = Instant::now();
>     let results = mem_df
>         .filter(col("userId").eq(lit(1)))?
>         .collect()
>         .await
>         .unwrap();
> ```
>
> Which is returning this error:
>
> error[E0698]: type inside `async` block must be known in this context
>   --> src\main.rs:37:38
>    |
> 37 |         .filter(col("userId").eq(lit(1)))?
>    |                                      ^ cannot infer type for type
> `{integer}`
>    |
> note: the type is part of the `async` block because of this `await`
>   --> src\main.rs:36:19
>    |
> 36 |       let results = mem_df
>    |  ___________________^
> 37 | |         .filter(col("userId").eq(lit(1)))?
> 38 | |         .collect()
> 39 | |         .await
>    | |______________^
>
>
> [1]
> https://github.com/apache/arrow/blob/master/rust/datafusion/examples/dataframe_in_memory.rs
> [2] https://github.com/rust-lang/rust/issues/63502
>
> Matthew M. Turner
> Email: matthew.m.turner@outlook.com
> Phone: (908)-868-2786
>
> -----Original Message-----
> From: Michael Mior <mm...@apache.org>
> Sent: Thursday, December 10, 2020 8:55 PM
> To: user@arrow.apache.org
> Subject: Re: [Rust] DataFusion performance
>
> Contrary to what you might expect given the name, read_csv does not
> actually read the CSV file. It instead creates the start of a logical
> execution plan which involves reading the CSV file when that plan is
> finally executed. This happens when you call collect().
>
> Pandas read_csv on the other hand immediately reads the CSV file. So
> you're comparing the time of reading AND filtering the file
> (DataFusion) with the time to filter data which has already been read
> (Pandas).
>
> There's nothing wrong with your use of DataFusion per se, you simply
> weren't measuring what you thought you were measuring.
> --
> Michael Mior
> mmior@apache.org
>
> Le jeu. 10 déc. 2020 à 17:11, Matthew Turner <ma...@outlook.com>
> a écrit :
> >
> > Hello,
> >
> >
> >
> > I’ve been playing around with DataFusion to explore the feasibility of
> replacing current python/pandas data processing jobs with Rust/datafusion.
> Ultimately, looking to improve performance / decrease cost.
> >
> >
> >
> > I was doing some simple tests to start to measure performance
> differences on a simple task (read a csv[1] and filter it).
> >
> >
> >
> > Reading the csv datafusion seemed to outperform pandas by around 30%
> which was nice.
> >
> > *Rust took around 20-25ms to read the csv (compared to 32ms from
> > pandas)
> >
> >
> >
> > However, when filtering the data I was surprised to see that pandas was
> way faster.
> >
> > *Rust took around 500-600ms to filter the csv(compared to 1ms from
> > pandas)
> >
> >
> >
> > My code for each is below.  I know I should be running the DataFusion
> times through something similar to pythons %timeit but I didn’t have that
> immediately accessible and I ran many times to confirm it was roughly
> consistent.
> >
> >
> >
> > Is this performance expected? Or am I using datafusion incorrectly?
> >
> >
> >
> > Any insight is much appreciated!
> >
> >
> >
> > [Rust]
> >
> > ```
> >
> > use datafusion::error::Result;
> >
> > use datafusion::prelude::*;
> >
> > use std::time::Instant;
> >
> >
> >
> > #[tokio::main]
> >
> > async fn main() -> Result<()> {
> >
> >     let start = Instant::now();
> >
> >
> >
> >     let mut ctx = ExecutionContext::new();
> >
> >
> >
> >     let ratings_csv = "ratings_small.csv";
> >
> >
> >
> >     let df = ctx.read_csv(ratings_csv,
> > CsvReadOptions::new()).unwrap();
> >
> >     println!("Read CSV Duration: {:?}", start.elapsed());
> >
> >
> >
> >     let q_start = Instant::now();
> >
> >     let results = df
> >
> >         .filter(col("userId").eq(lit(1)))?
> >
> >         .collect()
> >
> >         .await
> >
> >         .unwrap();
> >
> >     println!("Filter duration: {:?}", q_start.elapsed());
> >
> >
> >
> >     println!("Duration: {:?}", start.elapsed());
> >
> >
> >
> >     Ok(())
> >
> > }
> >
> > ```
> >
> >
> >
> > [Python]
> >
> > ```
> >
> > In [1]: df = pd.read_csv(“ratings_small.csv”)
> >
> > 32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
> >
> >
> >
> > In [2]: df.query(“userId==1”)
> >
> > 1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops
> > each)
> >
> > ```
> >
> >
> >
> > [1]:
> > https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.c
> > sv
> >
> >
> >
> >
> >
> > Matthew M. Turner
> >
> > Email: matthew.m.turner@outlook.com
> >
> > Phone: (908)-868-2786
> >
> >
>
>

RE: [Rust] DataFusion performance

Posted by Matthew Turner <ma...@outlook.com>.
Thanks! Converting the schema to owned made it work.

The type of the schema param is SchemaRef – which I thought would allow a reference.  Is this not the case?

Matthew M. Turner
Email: matthew.m.turner@outlook.com<ma...@outlook.com>
Phone: (908)-868-2786

From: Andy Grove <an...@gmail.com>
Sent: Friday, December 11, 2020 10:16 AM
To: user@arrow.apache.org
Subject: Re: [Rust] DataFusion performance

Hi Matthew,

Using the latest DataFusion from GitHub master branch, the following code works for in-memory:

use std::sync::Arc;
use std::time::Instant;

use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion::datasource::MemTable;

#[tokio::main]
async fn main() -> Result<()> {
    //TODO add command-line args
    let ratings_csv = "/tmp/movies/ratings_small.csv";
    let mut ctx = ExecutionContext::new();
    let df = ctx.read_csv(ratings_csv, CsvReadOptions::new()).unwrap();
    let batches = vec![df.collect().await?];
    let provider = MemTable::new(Arc::new(df.schema().to_owned().into()), batches)?;
    ctx.register_table("memory_table", Box::new(provider));
    let mem_df = ctx.table("memory_table")?;
    let q_start = Instant::now();
    let _results = mem_df
        .filter(col("userId").eq(lit(1)))?
        .collect()
        .await
        .unwrap();
    println!("Duration: {:?}", q_start.elapsed());
    Ok(())
}

Andy.

On Fri, Dec 11, 2020 at 7:59 AM Matthew Turner <ma...@outlook.com>> wrote:
Played around some more - it was because I wasn’t using --release flag.  Sry about that, still learning rust.

Using that flag, the total time to read and filter is between 52 and 80ms.

In general, what should I expect when comparing the performance of pandas to datafusion?

@Andy Grove thanks for adding that.  If there is a need for additional datafusion benchmarks and what I do could help with that then I would be happy to contribute it.  I will send a follow up once ive made progress.

I'm also still having trouble with that memory table, so any help there is appreciated.

Thanks for your time!  Very excited by this.

Matthew M. Turner
Email: matthew.m.turner@outlook.com<ma...@outlook.com>
Phone: (908)-868-2786

-----Original Message-----
From: Matthew Turner <ma...@outlook.com>>
Sent: Friday, December 11, 2020 12:24 AM
To: user@arrow.apache.org<ma...@arrow.apache.org>
Subject: RE: [Rust] DataFusion performance

Thanks for context! Makes sense.

Even with that, when comparing the total time of each (read + filter) DataFusion still appears much slower(~625ms vs 33ms).  Is that expected?

Also, im trying to bring the table in memory now to perform the computation from there and compare performance.  Code below.  But I'm getting an error (beneath the code) even though I think ive constructed the MemTable correctly (from [1]).  From what I see all the types are the same as when I used the original df from read_csv so I'm not sure what I'm doing wrong.

I also saw there was an open issue [2] for this error type raised on rust-lang - so im unsure if its my implementation, datafusion/arrow issue, or Rust issue.

Thanks again for help!

```
    let sch = Arc::new(df.schema());
    let batches = vec![df.collect().await?];
    let provider = MemTable::new(sch, batches)?;

    ctx.register_table("memory_table", Box::new(provider));

    let mem_df = ctx.table("memory_table")?;

    let q_start = Instant::now();
    let results = mem_df
        .filter(col("userId").eq(lit(1)))?
        .collect()
        .await
        .unwrap();
```

Which is returning this error:

error[E0698]: type inside `async` block must be known in this context
  --> src\main.rs:37:38
   |
37 |         .filter(col("userId").eq(lit(1)))?
   |                                      ^ cannot infer type for type `{integer}`
   |
note: the type is part of the `async` block because of this `await`
  --> src\main.rs:36:19
   |
36 |       let results = mem_df
   |  ___________________^
37 | |         .filter(col("userId").eq(lit(1)))?
38 | |         .collect()
39 | |         .await
   | |______________^


[1] https://github.com/apache/arrow/blob/master/rust/datafusion/examples/dataframe_in_memory.rs
[2] https://github.com/rust-lang/rust/issues/63502

Matthew M. Turner
Email: matthew.m.turner@outlook.com<ma...@outlook.com>
Phone: (908)-868-2786

-----Original Message-----
From: Michael Mior <mm...@apache.org>>
Sent: Thursday, December 10, 2020 8:55 PM
To: user@arrow.apache.org<ma...@arrow.apache.org>
Subject: Re: [Rust] DataFusion performance

Contrary to what you might expect given the name, read_csv does not actually read the CSV file. It instead creates the start of a logical execution plan which involves reading the CSV file when that plan is finally executed. This happens when you call collect().

Pandas read_csv on the other hand immediately reads the CSV file. So you're comparing the time of reading AND filtering the file
(DataFusion) with the time to filter data which has already been read (Pandas).

There's nothing wrong with your use of DataFusion per se, you simply weren't measuring what you thought you were measuring.
--
Michael Mior
mmior@apache.org<ma...@apache.org>

Le jeu. 10 déc. 2020 à 17:11, Matthew Turner <ma...@outlook.com>> a écrit :
>
> Hello,
>
>
>
> I’ve been playing around with DataFusion to explore the feasibility of replacing current python/pandas data processing jobs with Rust/datafusion.  Ultimately, looking to improve performance / decrease cost.
>
>
>
> I was doing some simple tests to start to measure performance differences on a simple task (read a csv[1] and filter it).
>
>
>
> Reading the csv datafusion seemed to outperform pandas by around 30% which was nice.
>
> *Rust took around 20-25ms to read the csv (compared to 32ms from
> pandas)
>
>
>
> However, when filtering the data I was surprised to see that pandas was way faster.
>
> *Rust took around 500-600ms to filter the csv(compared to 1ms from
> pandas)
>
>
>
> My code for each is below.  I know I should be running the DataFusion times through something similar to pythons %timeit but I didn’t have that immediately accessible and I ran many times to confirm it was roughly consistent.
>
>
>
> Is this performance expected? Or am I using datafusion incorrectly?
>
>
>
> Any insight is much appreciated!
>
>
>
> [Rust]
>
> ```
>
> use datafusion::error::Result;
>
> use datafusion::prelude::*;
>
> use std::time::Instant;
>
>
>
> #[tokio::main]
>
> async fn main() -> Result<()> {
>
>     let start = Instant::now();
>
>
>
>     let mut ctx = ExecutionContext::new();
>
>
>
>     let ratings_csv = "ratings_small.csv";
>
>
>
>     let df = ctx.read_csv(ratings_csv,
> CsvReadOptions::new()).unwrap();
>
>     println!("Read CSV Duration: {:?}", start.elapsed());
>
>
>
>     let q_start = Instant::now();
>
>     let results = df
>
>         .filter(col("userId").eq(lit(1)))?
>
>         .collect()
>
>         .await
>
>         .unwrap();
>
>     println!("Filter duration: {:?}", q_start.elapsed());
>
>
>
>     println!("Duration: {:?}", start.elapsed());
>
>
>
>     Ok(())
>
> }
>
> ```
>
>
>
> [Python]
>
> ```
>
> In [1]: df = pd.read_csv(“ratings_small.csv”)
>
> 32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
>
>
>
> In [2]: df.query(“userId==1”)
>
> 1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops
> each)
>
> ```
>
>
>
> [1]:
> https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.c
> sv
>
>
>
>
>
> Matthew M. Turner
>
> Email: matthew.m.turner@outlook.com<ma...@outlook.com>
>
> Phone: (908)-868-2786
>
>

Re: [Rust] DataFusion performance

Posted by Andy Grove <an...@gmail.com>.
Hi Matthew,

Using the latest DataFusion from GitHub master branch, the following code
works for in-memory:

use std::sync::Arc;
use std::time::Instant;

use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion::datasource::MemTable;

#[tokio::main]
async fn main() -> Result<()> {
    //TODO add command-line args
    let ratings_csv = "/tmp/movies/ratings_small.csv";
    let mut ctx = ExecutionContext::new();
    let df = ctx.read_csv(ratings_csv, CsvReadOptions::new()).unwrap();
    let batches = vec![df.collect().await?];
    let provider =
MemTable::new(Arc::new(df.schema().to_owned().into()), batches)?;
    ctx.register_table("memory_table", Box::new(provider));
    let mem_df = ctx.table("memory_table")?;
    let q_start = Instant::now();
    let _results = mem_df
        .filter(col("userId").eq(lit(1)))?
        .collect()
        .await
        .unwrap();
    println!("Duration: {:?}", q_start.elapsed());
    Ok(())
}


Andy.

On Fri, Dec 11, 2020 at 7:59 AM Matthew Turner <ma...@outlook.com>
wrote:

> Played around some more - it was because I wasn’t using --release flag.
> Sry about that, still learning rust.
>
> Using that flag, the total time to read and filter is between 52 and 80ms.
>
> In general, what should I expect when comparing the performance of pandas
> to datafusion?
>
> @Andy Grove thanks for adding that.  If there is a need for additional
> datafusion benchmarks and what I do could help with that then I would be
> happy to contribute it.  I will send a follow up once ive made progress.
>
> I'm also still having trouble with that memory table, so any help there is
> appreciated.
>
> Thanks for your time!  Very excited by this.
>
> Matthew M. Turner
> Email: matthew.m.turner@outlook.com
> Phone: (908)-868-2786
>
> -----Original Message-----
> From: Matthew Turner <ma...@outlook.com>
> Sent: Friday, December 11, 2020 12:24 AM
> To: user@arrow.apache.org
> Subject: RE: [Rust] DataFusion performance
>
> Thanks for context! Makes sense.
>
> Even with that, when comparing the total time of each (read + filter)
> DataFusion still appears much slower(~625ms vs 33ms).  Is that expected?
>
> Also, im trying to bring the table in memory now to perform the
> computation from there and compare performance.  Code below.  But I'm
> getting an error (beneath the code) even though I think ive constructed the
> MemTable correctly (from [1]).  From what I see all the types are the same
> as when I used the original df from read_csv so I'm not sure what I'm doing
> wrong.
>
> I also saw there was an open issue [2] for this error type raised on
> rust-lang - so im unsure if its my implementation, datafusion/arrow issue,
> or Rust issue.
>
> Thanks again for help!
>
> ```
>     let sch = Arc::new(df.schema());
>     let batches = vec![df.collect().await?];
>     let provider = MemTable::new(sch, batches)?;
>
>     ctx.register_table("memory_table", Box::new(provider));
>
>     let mem_df = ctx.table("memory_table")?;
>
>     let q_start = Instant::now();
>     let results = mem_df
>         .filter(col("userId").eq(lit(1)))?
>         .collect()
>         .await
>         .unwrap();
> ```
>
> Which is returning this error:
>
> error[E0698]: type inside `async` block must be known in this context
>   --> src\main.rs:37:38
>    |
> 37 |         .filter(col("userId").eq(lit(1)))?
>    |                                      ^ cannot infer type for type
> `{integer}`
>    |
> note: the type is part of the `async` block because of this `await`
>   --> src\main.rs:36:19
>    |
> 36 |       let results = mem_df
>    |  ___________________^
> 37 | |         .filter(col("userId").eq(lit(1)))?
> 38 | |         .collect()
> 39 | |         .await
>    | |______________^
>
>
> [1]
> https://github.com/apache/arrow/blob/master/rust/datafusion/examples/dataframe_in_memory.rs
> [2] https://github.com/rust-lang/rust/issues/63502
>
> Matthew M. Turner
> Email: matthew.m.turner@outlook.com
> Phone: (908)-868-2786
>
> -----Original Message-----
> From: Michael Mior <mm...@apache.org>
> Sent: Thursday, December 10, 2020 8:55 PM
> To: user@arrow.apache.org
> Subject: Re: [Rust] DataFusion performance
>
> Contrary to what you might expect given the name, read_csv does not
> actually read the CSV file. It instead creates the start of a logical
> execution plan which involves reading the CSV file when that plan is
> finally executed. This happens when you call collect().
>
> Pandas read_csv on the other hand immediately reads the CSV file. So
> you're comparing the time of reading AND filtering the file
> (DataFusion) with the time to filter data which has already been read
> (Pandas).
>
> There's nothing wrong with your use of DataFusion per se, you simply
> weren't measuring what you thought you were measuring.
> --
> Michael Mior
> mmior@apache.org
>
> Le jeu. 10 déc. 2020 à 17:11, Matthew Turner <ma...@outlook.com>
> a écrit :
> >
> > Hello,
> >
> >
> >
> > I’ve been playing around with DataFusion to explore the feasibility of
> replacing current python/pandas data processing jobs with Rust/datafusion.
> Ultimately, looking to improve performance / decrease cost.
> >
> >
> >
> > I was doing some simple tests to start to measure performance
> differences on a simple task (read a csv[1] and filter it).
> >
> >
> >
> > Reading the csv datafusion seemed to outperform pandas by around 30%
> which was nice.
> >
> > *Rust took around 20-25ms to read the csv (compared to 32ms from
> > pandas)
> >
> >
> >
> > However, when filtering the data I was surprised to see that pandas was
> way faster.
> >
> > *Rust took around 500-600ms to filter the csv(compared to 1ms from
> > pandas)
> >
> >
> >
> > My code for each is below.  I know I should be running the DataFusion
> times through something similar to pythons %timeit but I didn’t have that
> immediately accessible and I ran many times to confirm it was roughly
> consistent.
> >
> >
> >
> > Is this performance expected? Or am I using datafusion incorrectly?
> >
> >
> >
> > Any insight is much appreciated!
> >
> >
> >
> > [Rust]
> >
> > ```
> >
> > use datafusion::error::Result;
> >
> > use datafusion::prelude::*;
> >
> > use std::time::Instant;
> >
> >
> >
> > #[tokio::main]
> >
> > async fn main() -> Result<()> {
> >
> >     let start = Instant::now();
> >
> >
> >
> >     let mut ctx = ExecutionContext::new();
> >
> >
> >
> >     let ratings_csv = "ratings_small.csv";
> >
> >
> >
> >     let df = ctx.read_csv(ratings_csv,
> > CsvReadOptions::new()).unwrap();
> >
> >     println!("Read CSV Duration: {:?}", start.elapsed());
> >
> >
> >
> >     let q_start = Instant::now();
> >
> >     let results = df
> >
> >         .filter(col("userId").eq(lit(1)))?
> >
> >         .collect()
> >
> >         .await
> >
> >         .unwrap();
> >
> >     println!("Filter duration: {:?}", q_start.elapsed());
> >
> >
> >
> >     println!("Duration: {:?}", start.elapsed());
> >
> >
> >
> >     Ok(())
> >
> > }
> >
> > ```
> >
> >
> >
> > [Python]
> >
> > ```
> >
> > In [1]: df = pd.read_csv(“ratings_small.csv”)
> >
> > 32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
> >
> >
> >
> > In [2]: df.query(“userId==1”)
> >
> > 1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops
> > each)
> >
> > ```
> >
> >
> >
> > [1]:
> > https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.c
> > sv
> >
> >
> >
> >
> >
> > Matthew M. Turner
> >
> > Email: matthew.m.turner@outlook.com
> >
> > Phone: (908)-868-2786
> >
> >
>

RE: [Rust] DataFusion performance

Posted by Matthew Turner <ma...@outlook.com>.
Played around some more - it was because I wasn’t using --release flag.  Sry about that, still learning rust.

Using that flag, the total time to read and filter is between 52 and 80ms.

In general, what should I expect when comparing the performance of pandas to datafusion?

@Andy Grove thanks for adding that.  If there is a need for additional datafusion benchmarks and what I do could help with that then I would be happy to contribute it.  I will send a follow up once ive made progress.

I'm also still having trouble with that memory table, so any help there is appreciated.

Thanks for your time!  Very excited by this.

Matthew M. Turner
Email: matthew.m.turner@outlook.com
Phone: (908)-868-2786

-----Original Message-----
From: Matthew Turner <ma...@outlook.com> 
Sent: Friday, December 11, 2020 12:24 AM
To: user@arrow.apache.org
Subject: RE: [Rust] DataFusion performance

Thanks for context! Makes sense.

Even with that, when comparing the total time of each (read + filter) DataFusion still appears much slower(~625ms vs 33ms).  Is that expected?

Also, im trying to bring the table in memory now to perform the computation from there and compare performance.  Code below.  But I'm getting an error (beneath the code) even though I think ive constructed the MemTable correctly (from [1]).  From what I see all the types are the same as when I used the original df from read_csv so I'm not sure what I'm doing wrong.

I also saw there was an open issue [2] for this error type raised on rust-lang - so im unsure if its my implementation, datafusion/arrow issue, or Rust issue.

Thanks again for help!

```
    let sch = Arc::new(df.schema());
    let batches = vec![df.collect().await?];
    let provider = MemTable::new(sch, batches)?;

    ctx.register_table("memory_table", Box::new(provider));

    let mem_df = ctx.table("memory_table")?;

    let q_start = Instant::now();
    let results = mem_df
        .filter(col("userId").eq(lit(1)))?
        .collect()
        .await
        .unwrap();
```

Which is returning this error:

error[E0698]: type inside `async` block must be known in this context
  --> src\main.rs:37:38
   |
37 |         .filter(col("userId").eq(lit(1)))?
   |                                      ^ cannot infer type for type `{integer}`
   |
note: the type is part of the `async` block because of this `await`
  --> src\main.rs:36:19
   |
36 |       let results = mem_df
   |  ___________________^
37 | |         .filter(col("userId").eq(lit(1)))?
38 | |         .collect()
39 | |         .await
   | |______________^


[1] https://github.com/apache/arrow/blob/master/rust/datafusion/examples/dataframe_in_memory.rs
[2] https://github.com/rust-lang/rust/issues/63502

Matthew M. Turner
Email: matthew.m.turner@outlook.com
Phone: (908)-868-2786

-----Original Message-----
From: Michael Mior <mm...@apache.org>
Sent: Thursday, December 10, 2020 8:55 PM
To: user@arrow.apache.org
Subject: Re: [Rust] DataFusion performance

Contrary to what you might expect given the name, read_csv does not actually read the CSV file. It instead creates the start of a logical execution plan which involves reading the CSV file when that plan is finally executed. This happens when you call collect().

Pandas read_csv on the other hand immediately reads the CSV file. So you're comparing the time of reading AND filtering the file
(DataFusion) with the time to filter data which has already been read (Pandas).

There's nothing wrong with your use of DataFusion per se, you simply weren't measuring what you thought you were measuring.
--
Michael Mior
mmior@apache.org

Le jeu. 10 déc. 2020 à 17:11, Matthew Turner <ma...@outlook.com> a écrit :
>
> Hello,
>
>
>
> I’ve been playing around with DataFusion to explore the feasibility of replacing current python/pandas data processing jobs with Rust/datafusion.  Ultimately, looking to improve performance / decrease cost.
>
>
>
> I was doing some simple tests to start to measure performance differences on a simple task (read a csv[1] and filter it).
>
>
>
> Reading the csv datafusion seemed to outperform pandas by around 30% which was nice.
>
> *Rust took around 20-25ms to read the csv (compared to 32ms from
> pandas)
>
>
>
> However, when filtering the data I was surprised to see that pandas was way faster.
>
> *Rust took around 500-600ms to filter the csv(compared to 1ms from
> pandas)
>
>
>
> My code for each is below.  I know I should be running the DataFusion times through something similar to pythons %timeit but I didn’t have that immediately accessible and I ran many times to confirm it was roughly consistent.
>
>
>
> Is this performance expected? Or am I using datafusion incorrectly?
>
>
>
> Any insight is much appreciated!
>
>
>
> [Rust]
>
> ```
>
> use datafusion::error::Result;
>
> use datafusion::prelude::*;
>
> use std::time::Instant;
>
>
>
> #[tokio::main]
>
> async fn main() -> Result<()> {
>
>     let start = Instant::now();
>
>
>
>     let mut ctx = ExecutionContext::new();
>
>
>
>     let ratings_csv = "ratings_small.csv";
>
>
>
>     let df = ctx.read_csv(ratings_csv, 
> CsvReadOptions::new()).unwrap();
>
>     println!("Read CSV Duration: {:?}", start.elapsed());
>
>
>
>     let q_start = Instant::now();
>
>     let results = df
>
>         .filter(col("userId").eq(lit(1)))?
>
>         .collect()
>
>         .await
>
>         .unwrap();
>
>     println!("Filter duration: {:?}", q_start.elapsed());
>
>
>
>     println!("Duration: {:?}", start.elapsed());
>
>
>
>     Ok(())
>
> }
>
> ```
>
>
>
> [Python]
>
> ```
>
> In [1]: df = pd.read_csv(“ratings_small.csv”)
>
> 32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
>
>
>
> In [2]: df.query(“userId==1”)
>
> 1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops
> each)
>
> ```
>
>
>
> [1]: 
> https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.c
> sv
>
>
>
>
>
> Matthew M. Turner
>
> Email: matthew.m.turner@outlook.com
>
> Phone: (908)-868-2786
>
>

RE: [Rust] DataFusion performance

Posted by Matthew Turner <ma...@outlook.com>.
Thanks for context! Makes sense.

Even with that, when comparing the total time of each (read + filter) DataFusion still appears much slower(~625ms vs 33ms).  Is that expected?

Also, im trying to bring the table in memory now to perform the computation from there and compare performance.  Code below.  But I'm getting an error (beneath the code) even though I think ive constructed the MemTable correctly (from [1]).  From what I see all the types are the same as when I used the original df from read_csv so I'm not sure what I'm doing wrong.

I also saw there was an open issue [2] for this error type raised on rust-lang - so im unsure if its my implementation, datafusion/arrow issue, or Rust issue.

Thanks again for help!

```
    let sch = Arc::new(df.schema());
    let batches = vec![df.collect().await?];
    let provider = MemTable::new(sch, batches)?;

    ctx.register_table("memory_table", Box::new(provider));

    let mem_df = ctx.table("memory_table")?;

    let q_start = Instant::now();
    let results = mem_df
        .filter(col("userId").eq(lit(1)))?
        .collect()
        .await
        .unwrap();
```

Which is returning this error:

error[E0698]: type inside `async` block must be known in this context
  --> src\main.rs:37:38
   |
37 |         .filter(col("userId").eq(lit(1)))?
   |                                      ^ cannot infer type for type `{integer}`
   |
note: the type is part of the `async` block because of this `await`
  --> src\main.rs:36:19
   |
36 |       let results = mem_df
   |  ___________________^
37 | |         .filter(col("userId").eq(lit(1)))?
38 | |         .collect()
39 | |         .await
   | |______________^


[1] https://github.com/apache/arrow/blob/master/rust/datafusion/examples/dataframe_in_memory.rs
[2] https://github.com/rust-lang/rust/issues/63502

Matthew M. Turner
Email: matthew.m.turner@outlook.com
Phone: (908)-868-2786

-----Original Message-----
From: Michael Mior <mm...@apache.org> 
Sent: Thursday, December 10, 2020 8:55 PM
To: user@arrow.apache.org
Subject: Re: [Rust] DataFusion performance

Contrary to what you might expect given the name, read_csv does not actually read the CSV file. It instead creates the start of a logical execution plan which involves reading the CSV file when that plan is finally executed. This happens when you call collect().

Pandas read_csv on the other hand immediately reads the CSV file. So you're comparing the time of reading AND filtering the file
(DataFusion) with the time to filter data which has already been read (Pandas).

There's nothing wrong with your use of DataFusion per se, you simply weren't measuring what you thought you were measuring.
--
Michael Mior
mmior@apache.org

Le jeu. 10 déc. 2020 à 17:11, Matthew Turner <ma...@outlook.com> a écrit :
>
> Hello,
>
>
>
> I’ve been playing around with DataFusion to explore the feasibility of replacing current python/pandas data processing jobs with Rust/datafusion.  Ultimately, looking to improve performance / decrease cost.
>
>
>
> I was doing some simple tests to start to measure performance differences on a simple task (read a csv[1] and filter it).
>
>
>
> Reading the csv datafusion seemed to outperform pandas by around 30% which was nice.
>
> *Rust took around 20-25ms to read the csv (compared to 32ms from 
> pandas)
>
>
>
> However, when filtering the data I was surprised to see that pandas was way faster.
>
> *Rust took around 500-600ms to filter the csv(compared to 1ms from 
> pandas)
>
>
>
> My code for each is below.  I know I should be running the DataFusion times through something similar to pythons %timeit but I didn’t have that immediately accessible and I ran many times to confirm it was roughly consistent.
>
>
>
> Is this performance expected? Or am I using datafusion incorrectly?
>
>
>
> Any insight is much appreciated!
>
>
>
> [Rust]
>
> ```
>
> use datafusion::error::Result;
>
> use datafusion::prelude::*;
>
> use std::time::Instant;
>
>
>
> #[tokio::main]
>
> async fn main() -> Result<()> {
>
>     let start = Instant::now();
>
>
>
>     let mut ctx = ExecutionContext::new();
>
>
>
>     let ratings_csv = "ratings_small.csv";
>
>
>
>     let df = ctx.read_csv(ratings_csv, 
> CsvReadOptions::new()).unwrap();
>
>     println!("Read CSV Duration: {:?}", start.elapsed());
>
>
>
>     let q_start = Instant::now();
>
>     let results = df
>
>         .filter(col("userId").eq(lit(1)))?
>
>         .collect()
>
>         .await
>
>         .unwrap();
>
>     println!("Filter duration: {:?}", q_start.elapsed());
>
>
>
>     println!("Duration: {:?}", start.elapsed());
>
>
>
>     Ok(())
>
> }
>
> ```
>
>
>
> [Python]
>
> ```
>
> In [1]: df = pd.read_csv(“ratings_small.csv”)
>
> 32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
>
>
>
> In [2]: df.query(“userId==1”)
>
> 1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops 
> each)
>
> ```
>
>
>
> [1]: 
> https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.c
> sv
>
>
>
>
>
> Matthew M. Turner
>
> Email: matthew.m.turner@outlook.com
>
> Phone: (908)-868-2786
>
>

Re: [Rust] DataFusion performance

Posted by Michael Mior <mm...@apache.org>.
Contrary to what you might expect given the name, read_csv does not
actually read the CSV file. It instead creates the start of a logical
execution plan which involves reading the CSV file when that plan is
finally executed. This happens when you call collect().

Pandas read_csv on the other hand immediately reads the CSV file. So
you're comparing the time of reading AND filtering the file
(DataFusion) with the time to filter data which has already been read
(Pandas).

There's nothing wrong with your use of DataFusion per se, you simply
weren't measuring what you thought you were measuring.
--
Michael Mior
mmior@apache.org

Le jeu. 10 déc. 2020 à 17:11, Matthew Turner
<ma...@outlook.com> a écrit :
>
> Hello,
>
>
>
> I’ve been playing around with DataFusion to explore the feasibility of replacing current python/pandas data processing jobs with Rust/datafusion.  Ultimately, looking to improve performance / decrease cost.
>
>
>
> I was doing some simple tests to start to measure performance differences on a simple task (read a csv[1] and filter it).
>
>
>
> Reading the csv datafusion seemed to outperform pandas by around 30% which was nice.
>
> *Rust took around 20-25ms to read the csv (compared to 32ms from pandas)
>
>
>
> However, when filtering the data I was surprised to see that pandas was way faster.
>
> *Rust took around 500-600ms to filter the csv(compared to 1ms from pandas)
>
>
>
> My code for each is below.  I know I should be running the DataFusion times through something similar to pythons %timeit but I didn’t have that immediately accessible and I ran many times to confirm it was roughly consistent.
>
>
>
> Is this performance expected? Or am I using datafusion incorrectly?
>
>
>
> Any insight is much appreciated!
>
>
>
> [Rust]
>
> ```
>
> use datafusion::error::Result;
>
> use datafusion::prelude::*;
>
> use std::time::Instant;
>
>
>
> #[tokio::main]
>
> async fn main() -> Result<()> {
>
>     let start = Instant::now();
>
>
>
>     let mut ctx = ExecutionContext::new();
>
>
>
>     let ratings_csv = "ratings_small.csv";
>
>
>
>     let df = ctx.read_csv(ratings_csv, CsvReadOptions::new()).unwrap();
>
>     println!("Read CSV Duration: {:?}", start.elapsed());
>
>
>
>     let q_start = Instant::now();
>
>     let results = df
>
>         .filter(col("userId").eq(lit(1)))?
>
>         .collect()
>
>         .await
>
>         .unwrap();
>
>     println!("Filter duration: {:?}", q_start.elapsed());
>
>
>
>     println!("Duration: {:?}", start.elapsed());
>
>
>
>     Ok(())
>
> }
>
> ```
>
>
>
> [Python]
>
> ```
>
> In [1]: df = pd.read_csv(“ratings_small.csv”)
>
> 32.4 ms ± 210 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
>
>
>
> In [2]: df.query(“userId==1”)
>
> 1.16 ms ± 24.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
>
> ```
>
>
>
> [1]: https://www.kaggle.com/rounakbanik/the-movies-dataset?select=ratings.csv
>
>
>
>
>
> Matthew M. Turner
>
> Email: matthew.m.turner@outlook.com
>
> Phone: (908)-868-2786
>
>