Posted to commits@arrow.apache.org by al...@apache.org on 2024/02/26 18:06:17 UTC

(arrow-datafusion) branch main updated: tests: add tests for writing hive-partitioned parquet (#9316)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a26f583d27 tests: add tests for writing hive-partitioned parquet (#9316)
a26f583d27 is described below

commit a26f583d2766da746ff30199cc7341227526737f
Author: Trent Hauck <tr...@trenthauck.com>
AuthorDate: Mon Feb 26 10:06:11 2024 -0800

    tests: add tests for writing hive-partitioned parquet (#9316)
    
    * tests: adds tests associated with #9237
    
    * style: clippy
---
 .../src/datasource/physical_plan/parquet/mod.rs    |  74 ----------
 datafusion/core/tests/dataframe/mod.rs             | 160 ++++++++++++++++++++-
 2 files changed, 158 insertions(+), 76 deletions(-)
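
For orientation before the diff: the new test exercises DataFrame::write_parquet with a partition column and then reads the hive-partitioned directory back. A minimal sketch of that flow, condensed from the test added below and assuming a SessionContext `ctx` with a registered `test` table and an `out_dir_url` pointing at a writable location:

    // Write the query result partitioned by the values of column `c2`
    // (hive-style directory layout).
    let df = ctx.sql("SELECT c1, c2 FROM test").await?;
    let opts = DataFrameWriteOptions::new().with_partition_by(vec![String::from("c2")]);
    df.write_parquet(&out_dir_url, opts, None).await?;

    // Reading the directory back requires declaring the partition column,
    // because its values live in the path rather than in the parquet files.
    let df = ctx
        .read_parquet(
            &out_dir_url,
            ParquetReadOptions::default()
                .table_partition_cols(vec![(String::from("c2"), DataType::Utf8)]),
        )
        .await?;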

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index badd870848..3aa1998bde 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -2066,80 +2066,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn write_parquet_results() -> Result<()> {
-        // create partitioned input file and context
-        let tmp_dir = TempDir::new()?;
-        // let mut ctx = create_ctx(&tmp_dir, 4).await?;
-        let ctx = SessionContext::new_with_config(
-            SessionConfig::new().with_target_partitions(8),
-        );
-        let schema = populate_csv_partitions(&tmp_dir, 4, ".csv")?;
-        // register csv file with the execution context
-        ctx.register_csv(
-            "test",
-            tmp_dir.path().to_str().unwrap(),
-            CsvReadOptions::new().schema(&schema),
-        )
-        .await?;
-
-        // register a local file system object store for /tmp directory
-        let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
-        let local_url = Url::parse("file://local").unwrap();
-        ctx.runtime_env().register_object_store(&local_url, local);
-
-        // execute a simple query and write the results to parquet
-        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
-        let out_dir_url = "file://local/out/";
-        let df = ctx.sql("SELECT c1, c2 FROM test").await?;
-        df.write_parquet(out_dir_url, DataFrameWriteOptions::new(), None)
-            .await?;
-        // write_parquet(&mut ctx, "SELECT c1, c2 FROM test", &out_dir, None).await?;
-
-        // create a new context and verify that the results were saved to a partitioned parquet file
-        let ctx = SessionContext::new();
-
-        // get write_id
-        let mut paths = fs::read_dir(&out_dir).unwrap();
-        let path = paths.next();
-        let name = path
-            .unwrap()?
-            .path()
-            .file_name()
-            .expect("Should be a file name")
-            .to_str()
-            .expect("Should be a str")
-            .to_owned();
-        let (parsed_id, _) = name.split_once('_').expect("File should contain _ !");
-        let write_id = parsed_id.to_owned();
-
-        // register each partition as well as the top level dir
-        ctx.register_parquet(
-            "part0",
-            &format!("{out_dir}/{write_id}_0.parquet"),
-            ParquetReadOptions::default(),
-        )
-        .await?;
-
-        ctx.register_parquet("allparts", &out_dir, ParquetReadOptions::default())
-            .await?;
-
-        let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
-        let allparts = ctx
-            .sql("SELECT c1, c2 FROM allparts")
-            .await?
-            .collect()
-            .await?;
-
-        let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();
-
-        assert_eq!(part0[0].schema(), allparts[0].schema());
-
-        assert_eq!(allparts_count, 40);
-
-        Ok(())
-    }
-
     fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
         let df_schema = schema.clone().to_dfschema().unwrap();
         let execution_props = ExecutionProps::new();
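
Both tests in the file below register a LocalFileSystem object store rooted at the temporary directory under the URL file://local; in write_parquet_results the output URL file://local/out/ resolves through it to {tmp_dir}/out/. A minimal sketch of that setup, mirroring the registration code in the tests and assuming `tmp_dir` is a tempfile::TempDir:

    // Serve file:// URLs with authority "local" from the temp directory,
    // so "file://local/out/" ends up under `{tmp_dir}/out/`.
    let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
    let local_url = Url::parse("file://local").unwrap();
    ctx.runtime_env().register_object_store(&local_url, local);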
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index b08b2b8fc7..ee84200417 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -30,15 +30,19 @@ use arrow::{
 };
 use arrow_array::Float32Array;
 use arrow_schema::ArrowError;
+use object_store::local::LocalFileSystem;
+use std::fs;
 use std::sync::Arc;
+use tempfile::TempDir;
+use url::Url;
 
-use datafusion::dataframe::DataFrame;
+use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
 use datafusion::datasource::MemTable;
 use datafusion::error::Result;
 use datafusion::execution::context::{SessionContext, SessionState};
 use datafusion::prelude::JoinType;
 use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
-use datafusion::test_util::parquet_test_data;
+use datafusion::test_util::{parquet_test_data, populate_csv_partitions};
 use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
 use datafusion_common::{assert_contains, DataFusionError, ScalarValue, UnnestOptions};
 use datafusion_execution::config::SessionConfig;
@@ -1896,3 +1900,155 @@ async fn test_dataframe_placeholder_missing_param_values() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn write_partitioned_parquet_results() -> Result<()> {
+    // create partitioned input file and context
+    let tmp_dir = TempDir::new()?;
+
+    let ctx = SessionContext::new();
+
+    // Create an in-memory table with two string columns, c1 and c2
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("c1", DataType::Utf8, false),
+        Field::new("c2", DataType::Utf8, false),
+    ]));
+
+    let record_batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(StringArray::from(vec!["abc", "def"])),
+            Arc::new(StringArray::from(vec!["123", "456"])),
+        ],
+    )?;
+
+    let mem_table = Arc::new(MemTable::try_new(schema, vec![vec![record_batch]])?);
+
+    // Register the table in the context
+    ctx.register_table("test", mem_table)?;
+
+    let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
+    let local_url = Url::parse("file://local").unwrap();
+    ctx.runtime_env().register_object_store(&local_url, local);
+
+    // execute a simple query and write the results to parquet
+    let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
+    let out_dir_url = format!("file://{out_dir}");
+
+    // Write the results to parquet with partitioning
+    let df = ctx.sql("SELECT c1, c2 FROM test").await?;
+    let df_write_options =
+        DataFrameWriteOptions::new().with_partition_by(vec![String::from("c2")]);
+
+    df.write_parquet(&out_dir_url, df_write_options, None)
+        .await?;
+
+    // Explicitly read the parquet file at c2=123 to verify the physical files are partitioned
+    let partitioned_file = format!("{out_dir}/c2=123", out_dir = out_dir);
+    let filtered_df = ctx
+        .read_parquet(&partitioned_file, ParquetReadOptions::default())
+        .await?;
+
+    // Check that the c2 column is gone and that c1 is abc.
+    let results = filtered_df.collect().await?;
+    let expected = ["+-----+", "| c1  |", "+-----+", "| abc |", "+-----+"];
+
+    assert_batches_eq!(expected, &results);
+
+    // Read the entire set of parquet files
+    let df = ctx
+        .read_parquet(
+            &out_dir_url,
+            ParquetReadOptions::default()
+                .table_partition_cols(vec![(String::from("c2"), DataType::Utf8)]),
+        )
+        .await?;
+
+    // Check that the df has the entire set of data
+    let results = df.collect().await?;
+    let expected = [
+        "+-----+-----+",
+        "| c1  | c2  |",
+        "+-----+-----+",
+        "| abc | 123 |",
+        "| def | 456 |",
+        "+-----+-----+",
+    ];
+
+    assert_batches_eq!(expected, &results);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn write_parquet_results() -> Result<()> {
+    // create partitioned input file and context
+    let tmp_dir = TempDir::new()?;
+    // let mut ctx = create_ctx(&tmp_dir, 4).await?;
+    let ctx =
+        SessionContext::new_with_config(SessionConfig::new().with_target_partitions(8));
+    let schema = populate_csv_partitions(&tmp_dir, 4, ".csv")?;
+    // register csv file with the execution context
+    ctx.register_csv(
+        "test",
+        tmp_dir.path().to_str().unwrap(),
+        CsvReadOptions::new().schema(&schema),
+    )
+    .await?;
+
+    // register a local file system object store for /tmp directory
+    let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
+    let local_url = Url::parse("file://local").unwrap();
+    ctx.runtime_env().register_object_store(&local_url, local);
+
+    // execute a simple query and write the results to parquet
+    let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
+    let out_dir_url = "file://local/out/";
+    let df = ctx.sql("SELECT c1, c2 FROM test").await?;
+    df.write_parquet(out_dir_url, DataFrameWriteOptions::new(), None)
+        .await?;
+    // write_parquet(&mut ctx, "SELECT c1, c2 FROM test", &out_dir, None).await?;
+
+    // create a new context and verify that the results were saved to a partitioned parquet file
+    let ctx = SessionContext::new();
+
+    // get write_id
+    let mut paths = fs::read_dir(&out_dir).unwrap();
+    let path = paths.next();
+    let name = path
+        .unwrap()?
+        .path()
+        .file_name()
+        .expect("Should be a file name")
+        .to_str()
+        .expect("Should be a str")
+        .to_owned();
+    let (parsed_id, _) = name.split_once('_').expect("File should contain _ !");
+    let write_id = parsed_id.to_owned();
+
+    // register each partition as well as the top level dir
+    ctx.register_parquet(
+        "part0",
+        &format!("{out_dir}/{write_id}_0.parquet"),
+        ParquetReadOptions::default(),
+    )
+    .await?;
+
+    ctx.register_parquet("allparts", &out_dir, ParquetReadOptions::default())
+        .await?;
+
+    let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
+    let allparts = ctx
+        .sql("SELECT c1, c2 FROM allparts")
+        .await?
+        .collect()
+        .await?;
+
+    let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();
+
+    assert_eq!(part0[0].schema(), allparts[0].schema());
+
+    assert_eq!(allparts_count, 40);
+
+    Ok(())
+}
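
A note on the layout the partitioned test relies on: with with_partition_by(vec!["c2"]), each distinct value of c2 becomes a c2=<value> subdirectory, and the column itself is omitted from the written parquet files, which is why reading {out_dir}/c2=123 alone yields only c1. Schematically (file names are generated at write time and shown here only as placeholders):

    out/
      c2=123/<generated>.parquet    contains only column c1
      c2=456/<generated>.parquet    contains only column c1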