You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "Sagar Sumit (Jira)" <ji...@apache.org> on 2022/06/23 14:52:00 UTC

[jira] [Closed] (HUDI-4290) Hive connector in Presto returns duplicate records after clustering

     [ https://issues.apache.org/jira/browse/HUDI-4290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sagar Sumit closed HUDI-4290.
-----------------------------
    Resolution: Fixed

> Hive connector in Presto returns duplicate records after clustering
> -------------------------------------------------------------------
>
>                 Key: HUDI-4290
>                 URL: https://issues.apache.org/jira/browse/HUDI-4290
>             Project: Apache Hudi
>          Issue Type: Bug
>    Affects Versions: 0.11.0
>            Reporter: Ethan Guo
>            Assignee: Sagar Sumit
>            Priority: Blocker
>              Labels: Presto, pull-request-available
>             Fix For: 0.12.0
>
>
> When querying the Hudi table using Hive connector in Presto after a cluster action is complete in the table, the query result contains duplicate records.
> Environment: Presto 0.274-SNAPSHOT (latest), Hudi 0.11
> Steps to reproduce:
> Write Hudi table with clustering
> {code:java}
> ./bin/spark-shell  \
>      --master yarn \
>      --deploy-mode client \
>      --driver-memory 8g \
>      --executor-memory 8g \
>      --num-executors 20 \
>      --executor-cores 4 \
>      --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1 \
>      --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>      --conf spark.kryoserializer.buffer=256m \
>      --conf spark.kryoserializer.buffer.max=1024m \
>      --conf "spark.driver.defaultJavaOptions=-XX:+UseG1GC" \
>      --conf "spark.executor.defaultJavaOptions=-XX:+UseG1GC" \
>      --conf spark.ui.proxyBase="" \
>      --conf 'spark.eventLog.enabled=true' --conf 'spark.eventLog.dir=hdfs:///var/log/spark/apps' \
>      --conf "spark.sql.hive.convertMetastoreParquet=false" \
>      --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
>      --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'  {code}
>  
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.hudi.config.HoodieClusteringConfig
> import org.apache.hudi.HoodieDataSourceHelpers
> import org.apache.hudi.config.HoodieWriteConfig._
> import org.apache.spark.sql.SaveMode
> val srcPath = "s3a://amazon-reviews-pds/parquet/"
> val tableName = "amazon_reviews_clustered"
> val tablePath = <>
> val inputDF = spark.read.format("parquet").load(srcPath)
> inputDF.write.
>   format("hudi").
>   option(HoodieWriteConfig.TABLE_NAME, tableName).
>   option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
>   option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
>   option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id").
>   option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "product_category").
>   option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date").
>   option(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true").
>   option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "0").
>   option(HoodieClusteringConfig.CLUSTERING_TARGET_PARTITIONS, "43").
>   option(HoodieClusteringConfig.CLUSTERING_MAX_NUM_GROUPS, "100").
>   option(HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY, "star_rating,total_votes").
>   option("hoodie.metadata.index.column.stats.enable", "true").
>   option(BULK_INSERT_SORT_MODE.key(), "NONE").
>   mode(SaveMode.Overwrite).
>   save(tablePath) {code}
> Query the table using Hive connector in Presto:
> {code:java}
> /presto-cli --catalog hudi --server localhost:9090
> select count(review_id) from <table_name> where star_rating > 4 and total_votes > 10;{code}
> The result is different from a Hudi table without clustering like below:
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.hudi.HoodieDataSourceHelpers
> import org.apache.spark.sql.SaveMode
> import org.apache.hudi.config.HoodieWriteConfig._
> val srcPath = "s3a://amazon-reviews-pds/parquet/"
> val tableName = "amazon_reviews_no_clustering"
> val tablePath = <>
> val inputDF = spark.read.format("parquet").load(srcPath)inputDF.write.format("hudi").
>   option(HoodieWriteConfig.TABLE_NAME, tableName).
>   option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
>   option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
>   option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id").
>   option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "product_category").
>   option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date").
>   option("hoodie.metadata.index.column.stats.enable", "true").
>   option(BULK_INSERT_SORT_MODE.key(), "NONE").
>   mode(SaveMode.Overwrite).
>   save(tablePath) {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)