You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/24 19:20:40 UTC

[arrow-rs] branch master updated: add an integration with pytest against pyspark (#3176)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 007fb4c56 add an integration with pytest against pyspark (#3176)
007fb4c56 is described below

commit 007fb4c56ffce7f8b6be7928196da79ee5eff75a
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Fri Nov 25 03:20:35 2022 +0800

    add an integration with pytest against pyspark (#3176)
---
 .github/workflows/parquet.yml                |  34 ++++++++-
 .gitignore                                   |   3 +
 parquet/README.md                            |   1 -
 parquet/pytest/pyspark_integration_test.py   |  65 +++++++++++++++++
 parquet/pytest/requirements.in               |  20 ++++++
 parquet/pytest/requirements.txt              | 102 +++++++++++++++++++++++++++
 parquet/src/bin/parquet-show-bloom-filter.rs |  19 ++++-
 7 files changed, 238 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml
index 5b0cc8744..c5c7aac05 100644
--- a/.github/workflows/parquet.yml
+++ b/.github/workflows/parquet.yml
@@ -19,7 +19,6 @@
 # tests for parquet crate
 name: "parquet"
 
-
 # trigger for all PRs that touch certain files and changes to master
 on:
   push:
@@ -58,7 +57,6 @@ jobs:
       - name: Test --all-features
         run: cargo test -p parquet --all-features
 
-
   # test compilation
   linux-features:
     name: Check Compilation
@@ -120,6 +118,38 @@ jobs:
       - name: Build wasm32-wasi
         run: cargo build -p parquet --no-default-features --features cli,snap,flate2,brotli --target wasm32-wasi
 
+  pyspark-integration-test:
+    name: PySpark Integration Test
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        rust: [stable]
+    steps:
+      - uses: actions/checkout@v3
+      - name: Setup Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: "3.10"
+          cache: "pip"
+      - name: Install Python dependencies
+        run: |
+          cd parquet/pytest
+          pip install -r requirements.txt
+      - name: Black check the test files
+        run: |
+          cd parquet/pytest
+          black --check *.py --verbose
+      - name: Setup Rust toolchain
+        run: |
+          rustup toolchain install ${{ matrix.rust }}
+          rustup default ${{ matrix.rust }}
+      - name: Install binary for checking
+        run: cargo install --path parquet --bin parquet-show-bloom-filter --features=arrow,cli
+      - name: Run pytest
+        run: |
+          cd parquet/pytest
+          pytest -v
+
   clippy:
     name: Clippy
     runs-on: ubuntu-latest
diff --git a/.gitignore b/.gitignore
index b8506ea06..52ad19cb0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -92,3 +92,6 @@ $RECYCLE.BIN/
 # Windows shortcuts
 *.lnk
 
+# Python virtual env in parquet crate
+parquet/pytest/venv/
+__pycache__/
diff --git a/parquet/README.md b/parquet/README.md
index c9245b082..d904fc64e 100644
--- a/parquet/README.md
+++ b/parquet/README.md
@@ -41,7 +41,6 @@ However, for historical reasons, this crate uses versions with major numbers gre
 The `parquet` crate provides the following features which may be enabled in your `Cargo.toml`:
 
 - `arrow` (default) - support for reading / writing [`arrow`](https://crates.io/crates/arrow) arrays to / from parquet
-- `bloom` (default) - support for [split block bloom filter](https://github.com/apache/parquet-format/blob/master/BloomFilter.md) for reading from / writing to parquet
 - `async` - support `async` APIs for reading parquet
 - `json` - support for reading / writing `json` data to / from parquet
 - `brotli` (default) - support for parquet using `brotli` compression
diff --git a/parquet/pytest/pyspark_integration_test.py b/parquet/pytest/pyspark_integration_test.py
new file mode 100755
index 000000000..0a0b881e3
--- /dev/null
+++ b/parquet/pytest/pyspark_integration_test.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pyspark.sql
+import tempfile
+import subprocess
+import pathlib
+
+
+def create_data_and_df():
+    spark = pyspark.sql.SparkSession.builder.getOrCreate()
+    spark.conf.set("parquet.bloom.filter.enabled", True)
+    spark.conf.set("parquet.bloom.filter.expected.ndv", 10)
+    spark.conf.set("parquet.bloom.filter.max.bytes", 32)
+    data = [(f"id-{i % 10}", f"name-{i%10}") for i in range(100)]
+    df = spark.createDataFrame(data, ["id", "name"]).repartition(1)
+    return data, df
+
+
+def get_expected_output(data):
+    expected = ["Row group #0", "=" * 80]
+    for v in data:
+        expected.append(f"Value {v[0]} is present in bloom filter")
+    for v in data:
+        expected.append(f"Value {v[1]} is absent in bloom filter")
+    expected = "\n".join(expected) + "\n"
+    return expected.encode("utf-8")
+
+
+def get_cli_output(output_dir, data, col_name="id"):
+    # take the first (and only) parquet file
+    parquet_file = sorted(pathlib.Path(output_dir).glob("*.parquet"))[0]
+    args = [
+        "parquet-show-bloom-filter",
+        "--file-name",
+        parquet_file,
+        "--column",
+        col_name,
+    ]
+    for v in data:
+        args.extend(["--values", v[0]])
+    for v in data:
+        args.extend(["--values", v[1]])
+    return subprocess.check_output(args)
+
+
+def test_pyspark_bloom_filter():
+    data, df = create_data_and_df()
+    with tempfile.TemporaryDirectory() as output_dir:
+        df.write.parquet(output_dir, mode="overwrite")
+        cli_output = get_cli_output(output_dir, data)
+        assert cli_output == get_expected_output(data)
diff --git a/parquet/pytest/requirements.in b/parquet/pytest/requirements.in
new file mode 100644
index 000000000..a0b30b867
--- /dev/null
+++ b/parquet/pytest/requirements.in
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+pytest
+pyspark
+black
+
diff --git a/parquet/pytest/requirements.txt b/parquet/pytest/requirements.txt
new file mode 100644
index 000000000..fb6f8fb6d
--- /dev/null
+++ b/parquet/pytest/requirements.txt
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# This file is autogenerated by pip-compile with python 3.10
+# To update, run:
+#
+#    pip-compile --generate-hashes --resolver=backtracking
+#
+attrs==22.1.0 \
+    --hash=sha256:29adc2665447e5191d0e7c568fde78b21f9672d344281d0c6e1ab085429b22b6 \
+    --hash=sha256:86efa402f67bf2df34f51a335487cf46b1ec130d02b8d39fd248abfd30da551c
+    # via pytest
+black==22.10.0 \
+    --hash=sha256:14ff67aec0a47c424bc99b71005202045dc09270da44a27848d534600ac64fc7 \
+    --hash=sha256:197df8509263b0b8614e1df1756b1dd41be6738eed2ba9e9769f3880c2b9d7b6 \
+    --hash=sha256:1e464456d24e23d11fced2bc8c47ef66d471f845c7b7a42f3bd77bf3d1789650 \
+    --hash=sha256:2039230db3c6c639bd84efe3292ec7b06e9214a2992cd9beb293d639c6402edb \
+    --hash=sha256:21199526696b8f09c3997e2b4db8d0b108d801a348414264d2eb8eb2532e540d \
+    --hash=sha256:2644b5d63633702bc2c5f3754b1b475378fbbfb481f62319388235d0cd104c2d \
+    --hash=sha256:432247333090c8c5366e69627ccb363bc58514ae3e63f7fc75c54b1ea80fa7de \
+    --hash=sha256:444ebfb4e441254e87bad00c661fe32df9969b2bf224373a448d8aca2132b395 \
+    --hash=sha256:5b9b29da4f564ba8787c119f37d174f2b69cdfdf9015b7d8c5c16121ddc054ae \
+    --hash=sha256:5cc42ca67989e9c3cf859e84c2bf014f6633db63d1cbdf8fdb666dcd9e77e3fa \
+    --hash=sha256:5d8f74030e67087b219b032aa33a919fae8806d49c867846bfacde57f43972ef \
+    --hash=sha256:72ef3925f30e12a184889aac03d77d031056860ccae8a1e519f6cbb742736383 \
+    --hash=sha256:819dc789f4498ecc91438a7de64427c73b45035e2e3680c92e18795a839ebb66 \
+    --hash=sha256:915ace4ff03fdfff953962fa672d44be269deb2eaf88499a0f8805221bc68c87 \
+    --hash=sha256:9311e99228ae10023300ecac05be5a296f60d2fd10fff31cf5c1fa4ca4b1988d \
+    --hash=sha256:974308c58d057a651d182208a484ce80a26dac0caef2895836a92dd6ebd725e0 \
+    --hash=sha256:b8b49776299fece66bffaafe357d929ca9451450f5466e997a7285ab0fe28e3b \
+    --hash=sha256:c957b2b4ea88587b46cf49d1dc17681c1e672864fd7af32fc1e9664d572b3458 \
+    --hash=sha256:e41a86c6c650bcecc6633ee3180d80a025db041a8e2398dcc059b3afa8382cd4 \
+    --hash=sha256:f513588da599943e0cde4e32cc9879e825d58720d6557062d1098c5ad80080e1 \
+    --hash=sha256:fba8a281e570adafb79f7755ac8721b6cf1bbf691186a287e990c7929c7692ff
+    # via -r requirements.in
+click==8.1.3 \
+    --hash=sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e \
+    --hash=sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48
+    # via black
+exceptiongroup==1.0.4 \
+    --hash=sha256:542adf9dea4055530d6e1279602fa5cb11dab2395fa650b8674eaec35fc4a828 \
+    --hash=sha256:bd14967b79cd9bdb54d97323216f8fdf533e278df937aa2a90089e7d6e06e5ec
+    # via pytest
+iniconfig==1.1.1 \
+    --hash=sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3 \
+    --hash=sha256:bc3af051d7d14b2ee5ef9969666def0cd1a000e121eaea580d4a313df4b37f32
+    # via pytest
+mypy-extensions==0.4.3 \
+    --hash=sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d \
+    --hash=sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8
+    # via black
+packaging==21.3 \
+    --hash=sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb \
+    --hash=sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522
+    # via pytest
+pathspec==0.10.2 \
+    --hash=sha256:88c2606f2c1e818b978540f73ecc908e13999c6c3a383daf3705652ae79807a5 \
+    --hash=sha256:8f6bf73e5758fd365ef5d58ce09ac7c27d2833a8d7da51712eac6e27e35141b0
+    # via black
+platformdirs==2.5.4 \
+    --hash=sha256:1006647646d80f16130f052404c6b901e80ee4ed6bef6792e1f238a8969106f7 \
+    --hash=sha256:af0276409f9a02373d540bf8480021a048711d572745aef4b7842dad245eba10
+    # via black
+pluggy==1.0.0 \
+    --hash=sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159 \
+    --hash=sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3
+    # via pytest
+py4j==0.10.9.5 \
+    --hash=sha256:276a4a3c5a2154df1860ef3303a927460e02e97b047dc0a47c1c3fb8cce34db6 \
+    --hash=sha256:52d171a6a2b031d8a5d1de6efe451cf4f5baff1a2819aabc3741c8406539ba04
+    # via pyspark
+pyparsing==3.0.9 \
+    --hash=sha256:2b020ecf7d21b687f219b71ecad3631f644a47f01403fa1d1036b0c6416d70fb \
+    --hash=sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc
+    # via packaging
+pyspark==3.3.1 \
+    --hash=sha256:e99fa7de92be406884bfd831c32b9306a3a99de44cfc39a2eefb6ed07445d5fa
+    # via -r requirements.in
+pytest==7.2.0 \
+    --hash=sha256:892f933d339f068883b6fd5a459f03d85bfcb355e4981e146d2c7616c21fef71 \
+    --hash=sha256:c4014eb40e10f11f355ad4e3c2fb2c6c6d1919c73f3b5a433de4708202cade59
+    # via -r requirements.in
+tomli==2.0.1 \
+    --hash=sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc \
+    --hash=sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f
+    # via
+    #   black
+    #   pytest
diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
index 55ecb2abf..f9462327f 100644
--- a/parquet/src/bin/parquet-show-bloom-filter.rs
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -34,7 +34,11 @@
 //! ```
 
 use clap::Parser;
-use parquet::file::reader::{FileReader, SerializedFileReader};
+use parquet::file::{
+    properties::ReaderProperties,
+    reader::{FileReader, SerializedFileReader},
+    serialized_reader::ReadOptionsBuilder,
+};
 use std::{fs::File, path::Path};
 
 #[derive(Debug, Parser)]
@@ -63,8 +67,17 @@ fn main() {
     let path = Path::new(&file_name);
     let file = File::open(path).expect("Unable to open file");
 
-    let file_reader =
-        SerializedFileReader::new(file).expect("Unable to open file as Parquet");
+    let file_reader = SerializedFileReader::new_with_options(
+        file,
+        ReadOptionsBuilder::new()
+            .with_reader_properties(
+                ReaderProperties::builder()
+                    .set_read_bloom_filter(true)
+                    .build(),
+            )
+            .build(),
+    )
+    .expect("Unable to open file as Parquet");
     let metadata = file_reader.metadata();
     for (ri, row_group) in metadata.row_groups().iter().enumerate() {
         println!("Row group #{}", ri);