You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/11/18 21:43:31 UTC

[GitHub] [arrow] alamb commented on a change in pull request #8705: ARROW-10464: [Rust] [DataFusion] Add utility to convert TPC-H data from tbl to CSV and Parquet

alamb commented on a change in pull request #8705:
URL: https://github.com/apache/arrow/pull/8705#discussion_r526432425



##########
File path: rust/benchmarks/README.md
##########
@@ -49,45 +49,16 @@ data. This value can be increased to generate larger data sets.
 The benchmark can then be run (assuming the data created from `dbgen` is in `/mnt/tpch-dbgen`) with a command such as:
 
 ```bash
-cargo run --release --bin tpch -- --iterations 3 --path /mnt/tpch-dbgen --format tbl --query 1 --batch-size 4096
+cargo run --release --bin tpch -- benchmark --iterations 3 --path /mnt/tpch-dbgen --format tbl --query 1 --batch-size 4096
 ```
 
-The benchmark program also supports CSV and Parquet input file formats.
-
-This crate does not currently provide a method for converting the generated tbl format to CSV or Parquet so it is 
-necessary to use other tools to perform this conversion.
-
-One option is to use the following Docker image to perform the conversion from `tbl` files to CSV or Parquet.
-
-```bash
-docker run -it ballistacompute/spark-benchmarks:0.4.0-SNAPSHOT 
-  -h, --help   Show help message
-
-Subcommand: convert-tpch
-  -i, --input  <arg>
-      --input-format  <arg>
-  -o, --output  <arg>
-      --output-format  <arg>
-  -p, --partitions  <arg>
-  -h, --help                   Show help message
-```
-
-Note that it is necessary to mount volumes into the Docker container as appropriate so that the file conversion process
-can access files on the host system.
-
-Here is a full example that assumes that data is stored in the `/mnt` path on the host system.
+The benchmark program also supports CSV and Parquet input file formats and a utility is provided to convert from `tbl`
+to CSV and Parquet.

Review comment:
       ```suggestion
   (generated by the `dbgen` utility) to CSV and Parquet.
   ```

##########
File path: rust/benchmarks/src/bin/tpch.rs
##########
@@ -180,6 +207,33 @@ async fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Resu
     Ok(())
 }
 
+async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
+    let schema = lineitem_schema();
+
+    let path = format!("{}/lineitem.tbl", opt.input_path.to_str().unwrap());
+    let options = CsvReadOptions::new()
+        .schema(&schema)
+        .delimiter(b'|')
+        .file_extension(".tbl");
+
+    let ctx = ExecutionContext::new();
+    let csv = Arc::new(CsvExec::try_new(&path, options, None, 4096)?);
+    let output_path = opt.output_path.to_str().unwrap().to_owned();
+
+    match opt.file_format.as_str() {
+        "csv" => ctx.write_csv(csv, output_path).await?,
+        "parquet" => ctx.write_parquet(csv, output_path).await?,

Review comment:
       this is pretty cool

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -390,6 +391,37 @@ impl ExecutionContext {
         Ok(())
     }
 
+    /// Execute a query and write the results to a partitioned Parquet file
+    pub async fn write_parquet(

Review comment:
       eventually it would be cool to write this out with some pattern that is more controllable by the user

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -390,6 +391,37 @@ impl ExecutionContext {
         Ok(())
     }
 
+    /// Execute a query and write the results to a partitioned Parquet file
+    pub async fn write_parquet(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        path: String,
+    ) -> Result<()> {
+        // create directory to contain the CSV files (one per partition)
+        let path = path.to_owned();
+        fs::create_dir(&path)?;
+
+        for i in 0..plan.output_partitioning().partition_count() {
+            let path = path.clone();
+            let plan = plan.clone();
+            let filename = format!("part-{}.parquet", i);
+            let path = Path::new(&path).join(&filename);
+            let file = fs::File::create(path)?;
+            let mut writer =
+                ArrowWriter::try_new(file.try_clone().unwrap(), plan.schema(), None)?;
+            let stream = plan.execute(i).await?;
+
+            stream
+                .map(|batch| writer.write(&batch?))
+                .try_collect()
+                .await
+                .map_err(|e| DataFusionError::from(e))?;
+
+            writer.close()?;
+        }
+        Ok(())

Review comment:
       I suggest a test for this function (just for the basic functionality) that it is wired up correctly




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org