Posted to user@spark.apache.org by Steve Lindemann <sr...@gmail.com> on 2015/07/05 00:45:39 UTC

calling HiveContext.table or running a query reads files unnecessarily in S3

Hi,

I'm just getting started with Spark, so apologies if I'm missing
something obvious. In the examples below, I'm using Spark 1.4.

I've created a partitioned table in S3 (call it 'dataset'), with basic
structure like so:

s3://bucket/dataset/pk=a
s3://bucket/dataset/pk=b
s3://bucket/dataset/pk=c
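
In each partition I wrote a parquet file with some data. The exact write
code isn't important here, but per partition it was roughly along these
lines (my_data is a DataFrame with columns k, v and pk):

# illustrative only: write one partition's rows directly under its prefix;
# the partition value lives in the path, so pk is dropped from the file
my_data.filter(my_data.pk == 'a').drop('pk') \
    .write.parquet('s3a://bucket/dataset/pk=a')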


I also created a table in the Hive metastore I'm running, using the
command

hiveContext.sql("CREATE EXTERNAL TABLE dataset(k string, v bigint)
PARTITIONED BY (pk string) STORED AS PARQUET LOCATION
's3a://bucket/dataset'")


I also added the partitions pk={a, b, c} using ALTER TABLE commands.
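
Those commands were roughly of the form (one per partition key):

# one ADD PARTITION per key (a, b, c)
hiveContext.sql("ALTER TABLE dataset ADD PARTITION (pk='a') "
                "LOCATION 's3a://bucket/dataset/pk=a'")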

In a different session, I create a hiveContext and call

dataset = hiveContext.table('dataset')


When I do this, I see logs indicating that all parquet files were
opened---why is this? E.g.,

15/07/04 21:52:54 INFO S3AFileSystem: Actually opening file
dataset/pk=a/part-r-00003.gz.parquet at pos 497
15/07/04 21:52:54 INFO S3AFileSystem: Reopening
dataset/pk=a/part-r-00002.gz.parquet to seek to new offset 483
15/07/04 21:52:54 INFO S3AFileSystem: Actually opening file
dataset/pk=b/part-r-00002.gz.parquet at pos 483
15/07/04 21:52:54 INFO S3AFileSystem: Reopening
dataset/pk=b/part-r-00004.gz.parquet to seek to new offset 483

...and so on.


This isn't too much trouble when I only have 3 partitions, but my real
dataset will have thousands (think partitioned by date for 20 years). This
then becomes super slow just to get a handle to the table. I would have
thought that the metastore would have sufficient schema data to create the
DataFrame---it's supposed to enable fast schema discovery, right? I feel
like I must be missing something.

I also tried a second approach, after dropping the table and removing data
from my first attempt. Before writing the partitions, I used the new
DataFrameWriter object to add the table to the metastore and add the path
in S3 (using an empty DataFrame with the correct schema):

my_data.filter('FALSE').write.partitionBy('pk').saveAsTable('dataset',
format='parquet', path='s3a://bucket/dataset')


I then used a DataFrameWriter to write each partition:

my_data.filter(my_data.pk ==
'a').write.partitionBy('pk').insertInto('dataset')
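
and so on; in the general case this is just a loop over the partition
keys, something like:

# same insertInto as above, once per partition key
for key in ['a', 'b', 'c']:
    my_data.filter(my_data.pk == key) \
        .write.partitionBy('pk').insertInto('dataset')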


(Obviously in this toy example I could write it all at once, but not in
the more general case.) Now, when I start a different session and get a
handle to the table, no files are touched! Hooray. But then I run into a
different but similar problem: when I run a query, all files are touched
even though the system recognizes they can be pruned:

df = dataset.filter(dataset.pk == 'b').toPandas()


15/07/04 21:52:54 INFO S3AFileSystem: List status for path:
s3a://bucket/dataset/pk=a
15/07/04 21:52:54 INFO S3AFileSystem: Getting path status for
s3a://bucket/dataset/pk=a (dataset/pk=a)
15/07/04 21:52:54 INFO S3AFileSystem: List status for path:
s3a://bucket/dataset/pk=b
15/07/04 21:52:54 INFO S3AFileSystem: Getting path status for
s3a://bucket/dataset/pk=b (dataset/pk=b)
15/07/04 21:52:54 INFO S3AFileSystem: List status for path:
s3a://bucket/dataset/pk=c
15/07/04 21:52:54 INFO S3AFileSystem: Getting path status for
s3a://bucket/dataset/pk=c (dataset/pk=c)
15/07/04 21:52:54 INFO DataSourceStrategy: Selected 1 partitions out of 3,
pruned -200.0% partitions.

...[so far so good, some other stuff happens here]...
15/07/04 21:52:54 INFO S3AFileSystem: Getting path status for
s3a://bucket/dataset/_common_metadata (dataset/_common_metadata)
15/07/04 21:52:54 INFO S3AFileSystem: Reopening
dataset/pk=a/part-r-00004.gz.parquet to seek to new offset 430
15/07/04 21:52:54 INFO S3AFileSystem: Actually opening file
dataset/pk=a/part-r-00004.gz.parquet at pos 430
15/07/04 21:52:54 INFO S3AFileSystem: Reopening
dataset/pk=b/part-r-00001.gz.parquet to seek to new offset 430
15/07/04 21:52:54 INFO S3AFileSystem: Actually opening file
dataset/pk=b/part-r-00001.gz.parquet at pos 430
15/07/04 21:52:54 INFO S3AFileSystem: Reopening
dataset/pk=c/part-r-00001.gz.parquet to seek to new offset 430
15/07/04 21:52:54 INFO S3AFileSystem: Actually opening file
dataset/pk=c/part-r-00001.gz.parquet at pos 430

So even after it recognized that it could prune all but one partition, it
went through and opened files in all the rest as well! This ruins the
performance of a query over a small number of partitions in a large dataset.

Can anyone clarify why this is happening and how I can avoid it? I would
like to be able to run queries on tables in the metastore without touching
more files than are actually required, based on the partition filter.

Thanks,
Steve