You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "Rajkumar Gunasekaran (Jira)" <ji...@apache.org> on 2021/08/09 17:19:00 UTC
[jira] [Updated] (HUDI-2287) Partition pruning not working on Hudi
dataset
[ https://issues.apache.org/jira/browse/HUDI-2287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rajkumar Gunasekaran updated HUDI-2287:
---------------------------------------
Description:
Hi, we have created a Hudi dataset which has two level partition like this
{code:java}
s3://somes3bucket/partition1=value/partition2=value
{code}
where _partition1_ and _partition2_ is of type string
When running a simple count query using Hudi format in spark-shell, it takes almost 3 minutes to complete
{code:scala}
spark.read.format("hudi").load("s3://somes3bucket").
where("partition1 = 'somevalue' and partition2 = 'somevalue'").
count()
res1: Long = ####
attempt 1: 3.2 minutes
attempt 2: 2.5 minutes
{code}
In the Spark UI ~9000 tasks (which is approximately equivalent to the total no of files in the ENTIRE dataset s3://somes3bucket) are used for computation. Seems like spark is reading the entire dataset instead of *partition pruning.*...and then filtering the dataset based on the where clause
Whereas, if I use the parquet format to read the dataset, the query only takes ~30 seconds (vis-a-vis 3 minutes with Hudi format)
{code:scala}
spark.read.parquet("s3://somes3bucket").
where("partition1 = 'somevalue' and partition2 = 'somevalue'").
count()
res2: Long = ####
~ 30 seconds
{code}
In the spark UI, only 1361 (ie 1361 tasks) files are scanned (vis-a-vis ~9000 files in Hudi) and takes only 15 seconds
Any idea why partition pruning is not working when using Hudi format? Wondering if I am missing any configuration during the creation of the dataset?
PS: I ran this query in emr-6.3.0 which has Hudi version 0.7.0 and here is the configuration I have used for creating the dataset
{code:scala}
df.writeStream
.trigger(Trigger.ProcessingTime(s"${param.triggerTimeInSeconds} seconds"))
.partitionBy("partition1","partition2")
.format("org.apache.hudi")
.option(HoodieWriteConfig.TABLE_NAME, param.hiveNHudiTableName.get)
//--
.option(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC, "snappy")
.option(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES, param.expectedFileSizeInBytes)
.option(HoodieStorageConfig.PARQUET_BLOCK_SIZE_BYTES, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES)
//--
.option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT_BYTES, (param.expectedFileSizeInBytes / 100) * 80)
.option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true")
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, param.runCompactionAfterNDeltaCommits.get)
//--
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "record_key_id")
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, classOf[CustomKeyGenerator].getName)
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition1:SIMPLE,partition2:SIMPLE")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, hudiTablePrecombineKey)
.option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
//.option(DataSourceWriteOptions.HIVE_USE_JDBC_OPT_KEY, "false")
.option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "partition1,partition2")
.option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, param.hiveDb.get)
.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, param.hiveNHudiTableName.get)
.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
.outputMode(OutputMode.Append())
.queryName(s"${param.hiveDb}_${param.hiveNHudiTableName}_query"){code}
was:
Hi, we have created a Hudi dataset which has two level partition like this
{code:java}
s3://somes3bucket/partition1=value/partition2=value
{code}
where _partition1_ and _partition2_ is of type string
When running a simple count query using Hudi format in spark-shell, it takes almost 3 minutes to complete
{code:scala}
spark.read.format("hudi").load("s3://somes3bucket").
where("partition1 = 'somevalue' and partition2 = 'somevalue'").
count()
res1: Long = ####
attempt 1: 3.2 minutes
attempt 2: 2.5 minutes
{code}
Here is also the metrics in Spark UI where ~9000 tasks (which is approximately equivalent to the total no of files in the ENTIRE dataset s3://somes3bucket) are used for computation. Seems like spark is reading the entire dataset instead of *partition pruning.*...and then filtering the dataset based on the where clause
!Screen Shot 2021-08-09 at 12.03.00 PM.png!
Whereas, if I use the parquet format to read the dataset, the query only takes ~30 seconds (vis-a-vis 3 minutes with Hudi format)
{code:scala}
spark.read.parquet("s3://somes3bucket").
where("partition1 = 'somevalue' and partition2 = 'somevalue'").
count()
res2: Long = ####
~ 30 seconds
{code}
Here is the spark UI, where only 1361 files are scanned (vis-a-vis ~9000 files in Hudi) and takes only 15 seconds
!Screen Shot 2021-08-09 at 12.03.00 PM.png!
Any idea why partition pruning is not working when using Hudi format? Wondering if I am missing any configuration during the creation of the dataset?
PS: I ran this query in emr-6.3.0 which has Hudi version 0.7.0 and here is the configuration I have used for creating the dataset
{code:scala}
df.writeStream
.trigger(Trigger.ProcessingTime(s"${param.triggerTimeInSeconds} seconds"))
.partitionBy("partition1","partition2")
.format("org.apache.hudi")
.option(HoodieWriteConfig.TABLE_NAME, param.hiveNHudiTableName.get)
//--
.option(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC, "snappy")
.option(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES, param.expectedFileSizeInBytes)
.option(HoodieStorageConfig.PARQUET_BLOCK_SIZE_BYTES, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES)
//--
.option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT_BYTES, (param.expectedFileSizeInBytes / 100) * 80)
.option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true")
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, param.runCompactionAfterNDeltaCommits.get)
//--
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "record_key_id")
.option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, classOf[CustomKeyGenerator].getName)
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition1:SIMPLE,partition2:SIMPLE")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, hudiTablePrecombineKey)
.option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
//.option(DataSourceWriteOptions.HIVE_USE_JDBC_OPT_KEY, "false")
.option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "partition1,partition2")
.option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, param.hiveDb.get)
.option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, param.hiveNHudiTableName.get)
.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
.outputMode(OutputMode.Append())
.queryName(s"${param.hiveDb}_${param.hiveNHudiTableName}_query"){code}
> Partition pruning not working on Hudi dataset
> ---------------------------------------------
>
> Key: HUDI-2287
> URL: https://issues.apache.org/jira/browse/HUDI-2287
> Project: Apache Hudi
> Issue Type: Bug
> Components: Performance
> Reporter: Rajkumar Gunasekaran
> Priority: Major
> Attachments: Screen Shot 2021-08-09 at 12.03.00 PM.png, Screen Shot 2021-08-09 at 12.04.41 PM.png
>
>
> Hi, we have created a Hudi dataset which has two level partition like this
> {code:java}
> s3://somes3bucket/partition1=value/partition2=value
> {code}
> where _partition1_ and _partition2_ is of type string
> When running a simple count query using Hudi format in spark-shell, it takes almost 3 minutes to complete
>
> {code:scala}
> spark.read.format("hudi").load("s3://somes3bucket").
> where("partition1 = 'somevalue' and partition2 = 'somevalue'").
> count()
>
> res1: Long = ####
> attempt 1: 3.2 minutes
> attempt 2: 2.5 minutes
> {code}
> In the Spark UI ~9000 tasks (which is approximately equivalent to the total no of files in the ENTIRE dataset s3://somes3bucket) are used for computation. Seems like spark is reading the entire dataset instead of *partition pruning.*...and then filtering the dataset based on the where clause
> Whereas, if I use the parquet format to read the dataset, the query only takes ~30 seconds (vis-a-vis 3 minutes with Hudi format)
> {code:scala}
> spark.read.parquet("s3://somes3bucket").
> where("partition1 = 'somevalue' and partition2 = 'somevalue'").
> count()
> res2: Long = ####
> ~ 30 seconds
> {code}
> In the spark UI, only 1361 (ie 1361 tasks) files are scanned (vis-a-vis ~9000 files in Hudi) and takes only 15 seconds
> Any idea why partition pruning is not working when using Hudi format? Wondering if I am missing any configuration during the creation of the dataset?
> PS: I ran this query in emr-6.3.0 which has Hudi version 0.7.0 and here is the configuration I have used for creating the dataset
> {code:scala}
> df.writeStream
> .trigger(Trigger.ProcessingTime(s"${param.triggerTimeInSeconds} seconds"))
> .partitionBy("partition1","partition2")
> .format("org.apache.hudi")
> .option(HoodieWriteConfig.TABLE_NAME, param.hiveNHudiTableName.get)
> //--
> .option(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC, "snappy")
> .option(HoodieStorageConfig.PARQUET_FILE_MAX_BYTES, param.expectedFileSizeInBytes)
> .option(HoodieStorageConfig.PARQUET_BLOCK_SIZE_BYTES, HoodieStorageConfig.DEFAULT_PARQUET_BLOCK_SIZE_BYTES)
> //--
> .option(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT_BYTES, (param.expectedFileSizeInBytes / 100) * 80)
> .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true")
> .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, param.runCompactionAfterNDeltaCommits.get)
> //--
> .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
> .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "record_key_id")
> .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, classOf[CustomKeyGenerator].getName)
> .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition1:SIMPLE,partition2:SIMPLE")
> .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
> .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, hudiTablePrecombineKey)
> .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
> //.option(DataSourceWriteOptions.HIVE_USE_JDBC_OPT_KEY, "false")
> .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
> .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "partition1,partition2")
> .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, param.hiveDb.get)
> .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, param.hiveNHudiTableName.get)
> .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
> .outputMode(OutputMode.Append())
> .queryName(s"${param.hiveDb}_${param.hiveNHudiTableName}_query"){code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)