Posted to issues@arrow.apache.org by "Jeroen (Jira)" <ji...@apache.org> on 2021/02/25 13:42:00 UTC
[jira] [Created] (ARROW-11781) Reading small amount of files from a partitioned dataset is unexpectedly slow
Jeroen created ARROW-11781:
------------------------------
Summary: Reading small amount of files from a partitioned dataset is unexpectedly slow
Key: ARROW-11781
URL: https://issues.apache.org/jira/browse/ARROW-11781
Project: Apache Arrow
Issue Type: Bug
Reporter: Jeroen
I posted this on StackOverflow and was told I should probably create an issue here.
I managed to create a relatively minimal example:
{code:java}
import random
import time

import pandas as pd
import pyarrow as pa
import pyarrow.dataset

# Assumes an active SparkSession `spark` and a pathlib.Path `data_dir`.
df = spark.createDataFrame(
    [
        (str(a), b, c, random.randint(0, 1000))
        for a in range(100)
        for b in range(10)
        for c in range(10000)
    ],
    ['a', 'b', 'c', 'd']
)

print("Writing the spark dataframe to the file system in partitioned folders.")
df.repartition('a').write.partitionBy('a', 'b').parquet(str(data_dir), compression='snappy', mode='overwrite')

def time_it(func, repetition=10):
    start = time.time()
    for _ in range(repetition):
        func()
    return (time.time() - start) / repetition

print("Loading the entire dataset")
print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow')))

print("Loading a single file using filters")
print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow', filters=[[('a', '=', '0'), ('b', '=', '0')]])))

print("Loading a single file using filters and a specified partitioning")
partitioning = pa.dataset.HivePartitioning(
    pa.schema([
        pa.field('a', pa.string()),
        pa.field('b', pa.string())
    ])
)
print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow', filters=[[('a', '=', '0'), ('b', '=', '0')]], partitioning=partitioning)))

print("Loading a single file by specifying the path")
print(time_it(lambda: pd.read_parquet(data_dir / 'a=0' / 'b=0', engine='pyarrow')))
{code}
Which gives me the following output:
{code:java}
Writing the spark dataframe to the file system in partitioned folders.
21/02/25 14:37:58 WARN TaskSetManager: Stage 0 contains a task of very large size (18240 KiB). The maximum recommended task size is 1000 KiB.
Loading the entire dataset
0.23926825523376466
Loading a single file using filters
0.04788286685943603
Loading a single file using filters and a specified partitioning
0.0323061466217041
Loading a single file by specifying the path
0.0017130613327026368
{code}
Loading the small number of files is about 20 times faster when the paths are addressed directly than when using the pyarrow filters.
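For comparison, the same filtered read can be timed through the {{pyarrow.dataset}} API directly, which separates dataset discovery (the directory walk and schema inference) from the filtered scan itself. This is only a sketch against a small stand-in dataset written with pyarrow rather than Spark; the hive layout and column names mirror the example above:
{code:java}
import os
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Small stand-in for the Spark output above: hive-partitioned on 'a' and 'b'.
tmp = tempfile.mkdtemp()
table = pa.table({
    "a": pa.array([str(i // 10) for i in range(100)]),
    "b": pa.array([i % 10 for i in range(100)], pa.int32()),
    "d": pa.array(list(range(100))),
})
part = ds.partitioning(
    pa.schema([("a", pa.string()), ("b", pa.int32())]), flavor="hive"
)
ds.write_dataset(table, tmp, format="parquet", partitioning=part)

# Discovery happens in ds.dataset(); the filtered scan is to_table().
dataset = ds.dataset(tmp, partitioning=part)
filtered = dataset.to_table(filter=(ds.field("a") == "0") & (ds.field("b") == 0))
print(filtered.num_rows)  # 1 matching row (i == 0)

# Reading the leaf directory directly skips discovery of the whole tree.
direct = pq.read_table(os.path.join(tmp, "a=0", "b=0"))
print(direct.num_rows)
{code}
Timing {{ds.dataset(...)}} and {{to_table(...)}} separately shows which of the two dominates the slow filtered reads.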
The question as I posted it on StackOverflow:
I am having problems with the speed of loading `.parquet` files, but I don't know what I am doing wrong.
*Problem*
I am trying to read a single `.parquet` file from my local filesystem, which is the partitioned output of a Spark job, so that the `.parquet` files sit in hierarchical directories named `a=x` and `b=y`.
To achieve this, I am using `pandas.read_parquet` (which uses `pyarrow.parquet.read_table`) for which I include the `filters` kwarg. The run time of using the `filters` is way longer than I would expect.
{code:java}
# The following runs for about 55 seconds
pd.read_parquet(<path_to_entire_dataset>, filters=[[('a', '=', 'x'), ('b', '=', 'y')]])
# The following runs for about 0.04 seconds
pd.read_parquet(<path_to_entire_dataset>/a=x/b=y/)
# The following runs for about 70 seconds
pd.read_parquet(<path_to_entire_dataset>)
{code}
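For reference, the first call above is roughly what pandas dispatches to: {{pyarrow.parquet.read_table}} accepts the same `filters` keyword. A sketch against a small hive-partitioned directory written with pyarrow ({{tmp}} here is a stand-in for the real dataset path; passing an explicit partitioning avoids type inference on the partition values):
{code:java}
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Stand-in hive-partitioned dataset, partitioned on 'a' (string) and 'b' (int32).
tmp = tempfile.mkdtemp()
table = pa.table({
    "a": pa.array([str(i // 10) for i in range(100)]),
    "b": pa.array([i % 10 for i in range(100)], pa.int32()),
    "d": pa.array(list(range(100))),
})
part = ds.partitioning(
    pa.schema([("a", pa.string()), ("b", pa.int32())]), flavor="hive"
)
ds.write_dataset(table, tmp, format="parquet", partitioning=part)

# Equivalent of pd.read_parquet(path, engine='pyarrow', filters=...).
result = pq.read_table(
    tmp,
    filters=[[("a", "=", "0"), ("b", "=", 0)]],
    partitioning=part,
)
print(result.num_rows)  # 1 matching row
{code}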
Reading a single parquet file by specifying filters is only slightly faster than loading the entire dataset, where I would expect a run time approximately linear in the number of files.
*What mistake do I make here?*
I realize that simply putting the filters in the path would work, but this will quickly become complex because what I want to filter on can change. Besides, I think `read_table` should be able to load this data efficiently.
PS: The entire dataset contains many millions of rows, the data I want to load is only a few thousand rows.
*Edit 1:*
As suggested by 0x26res, I manually defined the partitioning; this led to a significant speed-up, but still not as much as I would have expected. In this situation the run time was about 5 seconds.
{code:java}
import pyarrow as pa
from pyarrow.dataset import HivePartitioning

partitioning = HivePartitioning(
    pa.schema([
        pa.field('a', pa.string()),
        pa.field('b', pa.int32()),
    ])
)
pd.read_parquet(
    <path_to_entire_dataset>,
    engine='pyarrow',
    filters=[
        [
            ('a', '=', x),
            ('b', '=', y),
        ]
    ],
    partitioning=partitioning
)
{code}
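If the filter values change between queries, one way to avoid paying the discovery cost every time is to construct the dataset object once and filter it repeatedly; only the construction walks the directory tree. A sketch, again using a small pyarrow-written stand-in for the Spark output:
{code:java}
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds

# Stand-in partitioned dataset: 10 values of 'a', 10 of 'b', one row per (a, b, i).
tmp = tempfile.mkdtemp()
table = pa.table({
    "a": pa.array([str(i // 10) for i in range(100)]),
    "b": pa.array([i % 10 for i in range(100)], pa.int32()),
    "d": pa.array(list(range(100))),
})
part = ds.partitioning(
    pa.schema([("a", pa.string()), ("b", pa.int32())]), flavor="hive"
)
ds.write_dataset(table, tmp, format="parquet", partitioning=part)

# Pay discovery once, then issue many filtered reads against the same object.
dataset = ds.dataset(tmp, partitioning=part)
counts = {
    a_val: dataset.to_table(filter=ds.field("a") == a_val).num_rows
    for a_val in ["0", "3", "7"]
}
print(counts)  # each 'a' partition holds 10 rows
{code}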
--
This message was sent by Atlassian Jira
(v8.3.4#803005)