Posted to github@arrow.apache.org by "ahmedriza (via GitHub)" <gi...@apache.org> on 2023/02/10 23:18:38 UTC

[GitHub] [arrow-ballista] ahmedriza commented on issue #6: [Ballista] Support to access remote object store, like HDFS, S3, etc

ahmedriza commented on issue #6:
URL: https://github.com/apache/arrow-ballista/issues/6#issuecomment-1426482526

   @saikrishna1-bidgely, here's an example of what I tried, and it worked.  I'm not 100% sure this is how it's supposed to be done :-)
   
   - Build the `scheduler` with the `s3` feature enabled, i.e. add this to its `Cargo.toml`:
   ```
   ballista-core = { path = "../core", version = "0.10.0", features = ["s3"] }
   ```
   - Define the following environment variables (I tested with a MinIO instance):
   ```
   AWS_DEFAULT_REGION
   AWS_ACCESS_KEY_ID
   AWS_SECRET_ACCESS_KEY
   AWS_ENDPOINT
   ```
   You may need to define additional `AWS_*` environment variables depending on your S3 service.
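   For example, for a local MinIO instance, something like this (the endpoint and credentials here are illustrative placeholders, not values from my setup):
   ```
   export AWS_DEFAULT_REGION=us-east-1
   export AWS_ACCESS_KEY_ID=<minio-access-key>
   export AWS_SECRET_ACCESS_KEY=<minio-secret-key>
   export AWS_ENDPOINT=http://localhost:9000
   ```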
   - Start the `scheduler` and `executor` with those variables set in their environment. Assuming a release build of the repo, that's something like:
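   ```
   # Hypothetical invocation; adjust binary paths and any flags to your deployment.
   ./target/release/ballista-scheduler
   ./target/release/ballista-executor
   ```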
   
   I tested with the following sample code:
   ```
   use ballista::prelude::BallistaContext;
   use ballista_core::config::BallistaConfig;
   use datafusion::prelude::ParquetReadOptions;
   
   #[tokio::main]
   pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
       // Connect to the Ballista scheduler started above.
       let config = BallistaConfig::builder().build()?;
       let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
       // Read a Parquet file directly from S3 (a MinIO bucket in my case).
       let filename = "s3://foo/test.parquet";
       let df = ctx
           .read_parquet(filename, ParquetReadOptions::default())
           .await?;
       let rows = df.count().await?;
       println!("rows: {}", rows);
       Ok(())
   }
   ```
   The code correctly returns the number of rows in the Parquet file:
   ```
   rows: 15309
   ```
   
   Last bit of logging from the `scheduler` process:
   ```
   2023-02-10T23:16:11.345868Z  INFO tokio-runtime-worker ThreadId(05) ballista_scheduler::display: === [pRdAqhp/2] Stage finished, physical plan with metrics ===
   ShuffleWriterExec: None, metrics=[output_rows=1, input_rows=1, repart_time=1ns, write_time=721.748µs]
     AggregateExec: mode=Final, gby=[], aggr=[COUNT(NULL)], metrics=[output_rows=1, elapsed_compute=28.791µs, spill_count=0, spilled_bytes=0, mem_used=0]
       CoalescePartitionsExec, metrics=[]
         ShuffleReaderExec: partitions=1, metrics=[]
   
   
   2023-02-10T23:16:11.346163Z  INFO tokio-runtime-worker ThreadId(05) ballista_scheduler::state::execution_graph: Job pRdAqhp is success, finalizing output partitions
   2023-02-10T23:16:11.346354Z  INFO tokio-runtime-worker ThreadId(05) ballista_scheduler::scheduler_server::query_stage_scheduler: Job pRdAqhp success
   ```
   
   From the `executor` process:
   ```
   2023-02-10T23:16:11.241689Z  INFO          task_runner ThreadId(22) ballista_executor::metrics: === [pRdAqhp/2/0] Physical plan with metrics ===
   ShuffleWriterExec: None, metrics=[output_rows=1, input_rows=1, write_time=721.748µs, repart_time=1ns]
     AggregateExec: mode=Final, gby=[], aggr=[COUNT(NULL)], metrics=[output_rows=1, elapsed_compute=28.791µs, spill_count=0, spilled_bytes=0, mem_used=0]
       CoalescePartitionsExec, metrics=[]
         ShuffleReaderExec: partitions=1, metrics=[]
   ```
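
   For completeness, the same count can also be expressed with SQL on the same context. This is just a sketch under the same assumptions as the example above (same running cluster and `s3://foo/test.parquet` file), replacing the `read_parquet`/`count` lines; `register_parquet` and `sql` are the standard `BallistaContext` methods:
   ```
   // Register the Parquet file as a table, then count rows via SQL.
   ctx.register_parquet("test", "s3://foo/test.parquet", ParquetReadOptions::default())
       .await?;
   let df = ctx.sql("SELECT COUNT(*) FROM test").await?;
   df.show().await?;
   ```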
   
   
   
   Hope this helps.

