Posted to commits@hudi.apache.org by "Ethan Guo (Jira)" <ji...@apache.org> on 2022/06/21 08:38:00 UTC

[jira] [Created] (HUDI-4290) Hive connector in Presto returns duplicate records after clustering

Ethan Guo created HUDI-4290:
-------------------------------

             Summary: Hive connector in Presto returns duplicate records after clustering
                 Key: HUDI-4290
                 URL: https://issues.apache.org/jira/browse/HUDI-4290
             Project: Apache Hudi
          Issue Type: Task
            Reporter: Ethan Guo
             Fix For: 0.12.0


When querying a Hudi table through the Hive connector in Presto after a clustering action has completed on the table, the query result contains duplicate records.

Environment: Presto 0.274-SNAPSHOT (latest), Hudi 0.11

Steps to reproduce:

Write a Hudi table with inline clustering enabled:

{code:java}
./bin/spark-shell  \
     --master yarn \
     --deploy-mode client \
     --driver-memory 8g \
     --executor-memory 8g \
     --num-executors 20 \
     --executor-cores 4 \
     --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1 \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --conf spark.kryoserializer.buffer=256m \
     --conf spark.kryoserializer.buffer.max=1024m \
     --conf "spark.driver.defaultJavaOptions=-XX:+UseG1GC" \
     --conf "spark.executor.defaultJavaOptions=-XX:+UseG1GC" \
     --conf spark.ui.proxyBase="" \
     --conf 'spark.eventLog.enabled=true' --conf 'spark.eventLog.dir=hdfs:///var/log/spark/apps' \
     --conf "spark.sql.hive.convertMetastoreParquet=false" \
     --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
     --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'  {code}
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.hudi.HoodieDataSourceHelpers
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode

val srcPath = "s3a://amazon-reviews-pds/parquet/"
val tableName = "amazon_reviews_clustered"
val tablePath = <>

val inputDF = spark.read.format("parquet").load(srcPath)

inputDF.write.
  format("hudi").
  option(HoodieWriteConfig.TABLE_NAME, tableName).
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id").
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "product_category").
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date").
  option(HoodieClusteringConfig.INLINE_CLUSTERING_PROP, "true").
  option(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP, "0").
  option(HoodieClusteringConfig.CLUSTERING_TARGET_PARTITIONS, "43").
  option(HoodieClusteringConfig.CLUSTERING_MAX_NUM_GROUPS, "100").
  option(HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY, "star_rating,total_votes").
  option("hoodie.metadata.index.column.stats.enable", "true").
  option(BULK_INSERT_SORT_MODE.key(), "NONE").
  mode(SaveMode.Overwrite).
  save(tablePath) {code}
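To narrow down whether the duplicates exist in storage or are introduced on the Presto read path, the clustered table can be read back in the same spark-shell session and the total row count compared against the number of distinct record keys (a sketch, assuming the `tablePath` defined above):
{code:java}
// Sketch: read the clustered table back via the Hudi Spark datasource and
// compare total rows against distinct record keys. If the two counts match,
// the table itself has no duplicates and the extra rows come from the
// Presto Hive connector's read path.
val readBack = spark.read.format("hudi").load(tablePath)
val total = readBack.count()
val distinctKeys = readBack.select("review_id").distinct().count()
println(s"total=$total, distinctKeys=$distinctKeys")
{code}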
Query the table using the Hive connector in Presto:
{code:java}
./presto-cli --catalog hudi --server localhost:9090

select count(review_id) from <table_name> where star_rating > 4 and total_votes > 10;{code}
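If the connector is returning duplicates, they can also be surfaced directly from the Presto CLI with a grouped count on the record key (a sketch against the same `<table_name>`; a correct read should return no rows):
{code:java}
-- Sketch: list record keys that appear more than once through the
-- Hive connector, within the same filtered subset as the count query.
select review_id, count(*) as copies
from <table_name>
where star_rating > 4 and total_votes > 10
group by review_id
having count(*) > 1
limit 10;
{code}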
The result differs from that of a Hudi table written without clustering, created as follows:
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.HoodieDataSourceHelpers
import org.apache.spark.sql.SaveMode
import org.apache.hudi.config.HoodieWriteConfig._

val srcPath = "s3a://amazon-reviews-pds/parquet/"
val tableName = "amazon_reviews_no_clustering"
val tablePath = <>
val inputDF = spark.read.format("parquet").load(srcPath)

inputDF.write.format("hudi").
  option(HoodieWriteConfig.TABLE_NAME, tableName).
  option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL).
  option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL).
  option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "review_id").
  option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "product_category").
  option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "review_date").
  option("hoodie.metadata.index.column.stats.enable", "true").
  option(BULK_INSERT_SORT_MODE.key(), "NONE").
  mode(SaveMode.Overwrite).
  save(tablePath) {code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)