You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/24 19:20:40 UTC
[arrow-rs] branch master updated: add an integration with pytest against pyspark (#3176)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 007fb4c56 add an integration with pytest against pyspark (#3176)
007fb4c56 is described below
commit 007fb4c56ffce7f8b6be7928196da79ee5eff75a
Author: Jiayu Liu <Ji...@users.noreply.github.com>
AuthorDate: Fri Nov 25 03:20:35 2022 +0800
add an integration with pytest against pyspark (#3176)
---
.github/workflows/parquet.yml | 34 ++++++++-
.gitignore | 3 +
parquet/README.md | 1 -
parquet/pytest/pyspark_integration_test.py | 65 +++++++++++++++++
parquet/pytest/requirements.in | 20 ++++++
parquet/pytest/requirements.txt | 102 +++++++++++++++++++++++++++
parquet/src/bin/parquet-show-bloom-filter.rs | 19 ++++-
7 files changed, 238 insertions(+), 6 deletions(-)
diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml
index 5b0cc8744..c5c7aac05 100644
--- a/.github/workflows/parquet.yml
+++ b/.github/workflows/parquet.yml
@@ -19,7 +19,6 @@
# tests for parquet crate
name: "parquet"
-
# trigger for all PRs that touch certain files and changes to master
on:
push:
@@ -58,7 +57,6 @@ jobs:
- name: Test --all-features
run: cargo test -p parquet --all-features
-
# test compilation
linux-features:
name: Check Compilation
@@ -120,6 +118,38 @@ jobs:
- name: Build wasm32-wasi
run: cargo build -p parquet --no-default-features --features cli,snap,flate2,brotli --target wasm32-wasi
+ pyspark-integration-test:
+ name: PySpark Integration Test
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ rust: [stable]
+ steps:
+ - uses: actions/checkout@v3
+ - name: Setup Python
+ uses: actions/setup-python@v4
+ with:
+ python-version: "3.10"
+ cache: "pip"
+ - name: Install Python dependencies
+ run: |
+ cd parquet/pytest
+ pip install -r requirements.txt
+ - name: Black check the test files
+ run: |
+ cd parquet/pytest
+ black --check *.py --verbose
+ - name: Setup Rust toolchain
+ run: |
+ rustup toolchain install ${{ matrix.rust }}
+ rustup default ${{ matrix.rust }}
+ - name: Install binary for checking
+ run: cargo install --path parquet --bin parquet-show-bloom-filter --features=arrow,cli
+ - name: Run pytest
+ run: |
+ cd parquet/pytest
+ pytest -v
+
clippy:
name: Clippy
runs-on: ubuntu-latest
diff --git a/.gitignore b/.gitignore
index b8506ea06..52ad19cb0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -92,3 +92,6 @@ $RECYCLE.BIN/
# Windows shortcuts
*.lnk
+# Python virtual env in parquet crate
+parquet/pytest/venv/
+__pycache__/
diff --git a/parquet/README.md b/parquet/README.md
index c9245b082..d904fc64e 100644
--- a/parquet/README.md
+++ b/parquet/README.md
@@ -41,7 +41,6 @@ However, for historical reasons, this crate uses versions with major numbers gre
The `parquet` crate provides the following features which may be enabled in your `Cargo.toml`:
- `arrow` (default) - support for reading / writing [`arrow`](https://crates.io/crates/arrow) arrays to / from parquet
-- `bloom` (default) - support for [split block bloom filter](https://github.com/apache/parquet-format/blob/master/BloomFilter.md) for reading from / writing to parquet
- `async` - support `async` APIs for reading parquet
- `json` - support for reading / writing `json` data to / from parquet
- `brotli` (default) - support for parquet using `brotli` compression
diff --git a/parquet/pytest/pyspark_integration_test.py b/parquet/pytest/pyspark_integration_test.py
new file mode 100755
index 000000000..0a0b881e3
--- /dev/null
+++ b/parquet/pytest/pyspark_integration_test.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pyspark.sql
+import tempfile
+import subprocess
+import pathlib
+
+
+def create_data_and_df():
+ spark = pyspark.sql.SparkSession.builder.getOrCreate()
+ spark.conf.set("parquet.bloom.filter.enabled", True)
+ spark.conf.set("parquet.bloom.filter.expected.ndv", 10)
+ spark.conf.set("parquet.bloom.filter.max.bytes", 32)
+ data = [(f"id-{i % 10}", f"name-{i%10}") for i in range(100)]
+ df = spark.createDataFrame(data, ["id", "name"]).repartition(1)
+ return data, df
+
+
+def get_expected_output(data):
+ expected = ["Row group #0", "=" * 80]
+ for v in data:
+ expected.append(f"Value {v[0]} is present in bloom filter")
+ for v in data:
+ expected.append(f"Value {v[1]} is absent in bloom filter")
+ expected = "\n".join(expected) + "\n"
+ return expected.encode("utf-8")
+
+
+def get_cli_output(output_dir, data, col_name="id"):
+ # take the first (and only) parquet file
+ parquet_file = sorted(pathlib.Path(output_dir).glob("*.parquet"))[0]
+ args = [
+ "parquet-show-bloom-filter",
+ "--file-name",
+ parquet_file,
+ "--column",
+ col_name,
+ ]
+ for v in data:
+ args.extend(["--values", v[0]])
+ for v in data:
+ args.extend(["--values", v[1]])
+ return subprocess.check_output(args)
+
+
+def test_pyspark_bloom_filter():
+ data, df = create_data_and_df()
+ with tempfile.TemporaryDirectory() as output_dir:
+ df.write.parquet(output_dir, mode="overwrite")
+ cli_output = get_cli_output(output_dir, data)
+ assert cli_output == get_expected_output(data)
diff --git a/parquet/pytest/requirements.in b/parquet/pytest/requirements.in
new file mode 100644
index 000000000..a0b30b867
--- /dev/null
+++ b/parquet/pytest/requirements.in
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+pytest
+pyspark
+black
+
diff --git a/parquet/pytest/requirements.txt b/parquet/pytest/requirements.txt
new file mode 100644
index 000000000..fb6f8fb6d
--- /dev/null
+++ b/parquet/pytest/requirements.txt
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# This file is autogenerated by pip-compile with python 3.10
+# To update, run:
+#
+# pip-compile --generate-hashes --resolver=backtracking
+#
+attrs==22.1.0 \
+ --hash=sha256:29adc2665447e5191d0e7c568fde78b21f9672d344281d0c6e1ab085429b22b6 \
+ --hash=sha256:86efa402f67bf2df34f51a335487cf46b1ec130d02b8d39fd248abfd30da551c
+ # via pytest
+black==22.10.0 \
+ --hash=sha256:14ff67aec0a47c424bc99b71005202045dc09270da44a27848d534600ac64fc7 \
+ --hash=sha256:197df8509263b0b8614e1df1756b1dd41be6738eed2ba9e9769f3880c2b9d7b6 \
+ --hash=sha256:1e464456d24e23d11fced2bc8c47ef66d471f845c7b7a42f3bd77bf3d1789650 \
+ --hash=sha256:2039230db3c6c639bd84efe3292ec7b06e9214a2992cd9beb293d639c6402edb \
+ --hash=sha256:21199526696b8f09c3997e2b4db8d0b108d801a348414264d2eb8eb2532e540d \
+ --hash=sha256:2644b5d63633702bc2c5f3754b1b475378fbbfb481f62319388235d0cd104c2d \
+ --hash=sha256:432247333090c8c5366e69627ccb363bc58514ae3e63f7fc75c54b1ea80fa7de \
+ --hash=sha256:444ebfb4e441254e87bad00c661fe32df9969b2bf224373a448d8aca2132b395 \
+ --hash=sha256:5b9b29da4f564ba8787c119f37d174f2b69cdfdf9015b7d8c5c16121ddc054ae \
+ --hash=sha256:5cc42ca67989e9c3cf859e84c2bf014f6633db63d1cbdf8fdb666dcd9e77e3fa \
+ --hash=sha256:5d8f74030e67087b219b032aa33a919fae8806d49c867846bfacde57f43972ef \
+ --hash=sha256:72ef3925f30e12a184889aac03d77d031056860ccae8a1e519f6cbb742736383 \
+ --hash=sha256:819dc789f4498ecc91438a7de64427c73b45035e2e3680c92e18795a839ebb66 \
+ --hash=sha256:915ace4ff03fdfff953962fa672d44be269deb2eaf88499a0f8805221bc68c87 \
+ --hash=sha256:9311e99228ae10023300ecac05be5a296f60d2fd10fff31cf5c1fa4ca4b1988d \
+ --hash=sha256:974308c58d057a651d182208a484ce80a26dac0caef2895836a92dd6ebd725e0 \
+ --hash=sha256:b8b49776299fece66bffaafe357d929ca9451450f5466e997a7285ab0fe28e3b \
+ --hash=sha256:c957b2b4ea88587b46cf49d1dc17681c1e672864fd7af32fc1e9664d572b3458 \
+ --hash=sha256:e41a86c6c650bcecc6633ee3180d80a025db041a8e2398dcc059b3afa8382cd4 \
+ --hash=sha256:f513588da599943e0cde4e32cc9879e825d58720d6557062d1098c5ad80080e1 \
+ --hash=sha256:fba8a281e570adafb79f7755ac8721b6cf1bbf691186a287e990c7929c7692ff
+ # via -r requirements.in
+click==8.1.3 \
+ --hash=sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e \
+ --hash=sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48
+ # via black
+exceptiongroup==1.0.4 \
+ --hash=sha256:542adf9dea4055530d6e1279602fa5cb11dab2395fa650b8674eaec35fc4a828 \
+ --hash=sha256:bd14967b79cd9bdb54d97323216f8fdf533e278df937aa2a90089e7d6e06e5ec
+ # via pytest
+iniconfig==1.1.1 \
+ --hash=sha256:011e24c64b7f47f6ebd835bb12a743f2fbe9a26d4cecaa7f53bc4f35ee9da8b3 \
+ --hash=sha256:bc3af051d7d14b2ee5ef9969666def0cd1a000e121eaea580d4a313df4b37f32
+ # via pytest
+mypy-extensions==0.4.3 \
+ --hash=sha256:090fedd75945a69ae91ce1303b5824f428daf5a028d2f6ab8a299250a846f15d \
+ --hash=sha256:2d82818f5bb3e369420cb3c4060a7970edba416647068eb4c5343488a6c604a8
+ # via black
+packaging==21.3 \
+ --hash=sha256:dd47c42927d89ab911e606518907cc2d3a1f38bbd026385970643f9c5b8ecfeb \
+ --hash=sha256:ef103e05f519cdc783ae24ea4e2e0f508a9c99b2d4969652eed6a2e1ea5bd522
+ # via pytest
+pathspec==0.10.2 \
+ --hash=sha256:88c2606f2c1e818b978540f73ecc908e13999c6c3a383daf3705652ae79807a5 \
+ --hash=sha256:8f6bf73e5758fd365ef5d58ce09ac7c27d2833a8d7da51712eac6e27e35141b0
+ # via black
+platformdirs==2.5.4 \
+ --hash=sha256:1006647646d80f16130f052404c6b901e80ee4ed6bef6792e1f238a8969106f7 \
+ --hash=sha256:af0276409f9a02373d540bf8480021a048711d572745aef4b7842dad245eba10
+ # via black
+pluggy==1.0.0 \
+ --hash=sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159 \
+ --hash=sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3
+ # via pytest
+py4j==0.10.9.5 \
+ --hash=sha256:276a4a3c5a2154df1860ef3303a927460e02e97b047dc0a47c1c3fb8cce34db6 \
+ --hash=sha256:52d171a6a2b031d8a5d1de6efe451cf4f5baff1a2819aabc3741c8406539ba04
+ # via pyspark
+pyparsing==3.0.9 \
+ --hash=sha256:2b020ecf7d21b687f219b71ecad3631f644a47f01403fa1d1036b0c6416d70fb \
+ --hash=sha256:5026bae9a10eeaefb61dab2f09052b9f4307d44aee4eda64b309723d8d206bbc
+ # via packaging
+pyspark==3.3.1 \
+ --hash=sha256:e99fa7de92be406884bfd831c32b9306a3a99de44cfc39a2eefb6ed07445d5fa
+ # via -r requirements.in
+pytest==7.2.0 \
+ --hash=sha256:892f933d339f068883b6fd5a459f03d85bfcb355e4981e146d2c7616c21fef71 \
+ --hash=sha256:c4014eb40e10f11f355ad4e3c2fb2c6c6d1919c73f3b5a433de4708202cade59
+ # via -r requirements.in
+tomli==2.0.1 \
+ --hash=sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc \
+ --hash=sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f
+ # via
+ # black
+ # pytest
diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
index 55ecb2abf..f9462327f 100644
--- a/parquet/src/bin/parquet-show-bloom-filter.rs
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -34,7 +34,11 @@
//! ```
use clap::Parser;
-use parquet::file::reader::{FileReader, SerializedFileReader};
+use parquet::file::{
+ properties::ReaderProperties,
+ reader::{FileReader, SerializedFileReader},
+ serialized_reader::ReadOptionsBuilder,
+};
use std::{fs::File, path::Path};
#[derive(Debug, Parser)]
@@ -63,8 +67,17 @@ fn main() {
let path = Path::new(&file_name);
let file = File::open(path).expect("Unable to open file");
- let file_reader =
- SerializedFileReader::new(file).expect("Unable to open file as Parquet");
+ let file_reader = SerializedFileReader::new_with_options(
+ file,
+ ReadOptionsBuilder::new()
+ .with_reader_properties(
+ ReaderProperties::builder()
+ .set_read_bloom_filter(true)
+ .build(),
+ )
+ .build(),
+ )
+ .expect("Unable to open file as Parquet");
let metadata = file_reader.metadata();
for (ri, row_group) in metadata.row_groups().iter().enumerate() {
println!("Row group #{}", ri);