Posted to commits@hudi.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/04/01 10:54:00 UTC

[jira] [Updated] (HUDI-3748) Hudi fails to insert into a partitioned table when the partition column is dropped from the parquet schema

     [ https://issues.apache.org/jira/browse/HUDI-3748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated HUDI-3748:
---------------------------------
    Labels: pull-request-available  (was: )

> Hudi fails to insert into a partitioned table when the partition column is dropped from the parquet schema
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-3748
>                 URL: https://issues.apache.org/jira/browse/HUDI-3748
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Vinoth Govindarajan
>            Assignee: Yann Byron
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.11.0
>
>
> When you set hoodie.datasource.write.drop.partition.columns = 'true' to drop the partition column from the parquet schema (required for BigQuery support), Hudi fails to insert with the following error.
>  
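> For reference, the same option can also be passed through the Spark DataFrame writer. The following is a hypothetical sketch (not part of the original repro; the path, record key, and precombine field are assumptions chosen to mirror the SQL repro below) showing where the config sits:
> {code:java}
> // Hypothetical Scala sketch: writing the same table via the DataFrame API with the
> // partition column dropped from the parquet files. Only
> // hoodie.datasource.write.drop.partition.columns is the config under discussion;
> // the other values are assumptions mirroring the SQL repro.
> import org.apache.spark.sql.{SaveMode, SparkSession}
>
> val spark = SparkSession.builder().appName("hudi-drop-partition-col").getOrCreate()
> import spark.implicits._
>
> val df = Seq((1L, "a1", 10.0, 1640390400L, "2021-12-25"))
>   .toDF("id", "name", "price", "ts", "dt")
>
> df.write.format("hudi")
>   .option("hoodie.table.name", "bq_demo_partitioned_cow")
>   .option("hoodie.datasource.write.recordkey.field", "id")
>   .option("hoodie.datasource.write.precombine.field", "ts")
>   .option("hoodie.datasource.write.partitionpath.field", "dt")
>   .option("hoodie.datasource.write.drop.partition.columns", "true") // drops dt from the parquet schema
>   .mode(SaveMode.Append)
>   .save("/tmp/bq_demo_partitioned_cow")
> {code}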
> Steps to reproduce:
> Start hudi docker:
> {code:java}
> cd hudi/docker
> ./setup_demo.sh{code}
> {code:java}
> docker exec -it adhoc-2 /bin/bash{code}
> {code:java}
> # Log into spark-sql and execute the following commands:
> spark-sql  --jars $HUDI_SPARK_BUNDLE \
>   --master local[2] \
>   --driver-class-path $HADOOP_CONF_DIR \
>   --conf spark.sql.hive.convertMetastoreParquet=false \
>   --deploy-mode client \
>   --driver-memory 1G \
>   --executor-memory 3G \
>   --num-executors 1 \
>   --packages org.apache.spark:spark-avro_2.11:2.4.4 \
>   --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
>   --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' {code}
>  
> {code:java}
> create table bq_demo_partitioned_cow (
>                                            id bigint,
>                                            name string,
>                                            price double,
>                                            ts bigint,
>                                            dt string
> ) using hudi
>     partitioned by (dt)
>     tblproperties (
>                 type = 'cow',
>                 primaryKey = 'id',
>                 preCombineField = 'ts',
>                 hoodie.datasource.write.drop.partition.columns = 'true'
>             );
> insert into bq_demo_partitioned_cow partition(dt='2021-12-25') values(1, 'a1', 10, current_timestamp());
> insert into bq_demo_partitioned_cow partition(dt='2021-12-25') values(2, 'a2', 20, current_timestamp()); {code}
> Error:
> {code:java}
> 22/03/29 20:58:02 INFO spark.SparkContext: Starting job: collect at HoodieSparkEngineContext.java:100
> 22/03/29 20:58:02 INFO scheduler.DAGScheduler: Got job 63 (collect at HoodieSparkEngineContext.java:100) with 1 output partitions
> 22/03/29 20:58:02 INFO scheduler.DAGScheduler: Final stage: ResultStage 131 (collect at HoodieSparkEngineContext.java:100)
> 22/03/29 20:58:02 INFO scheduler.DAGScheduler: Parents of final stage: List()
> 22/03/29 20:58:02 INFO scheduler.DAGScheduler: Missing parents: List()
> 22/03/29 20:58:02 INFO scheduler.DAGScheduler: Submitting ResultStage 131 (MapPartitionsRDD[235] at map at HoodieSparkEngineContext.java:100), which has no missing parents
> 22/03/29 20:58:02 INFO memory.MemoryStore: Block broadcast_89 stored as values in memory (estimated size 71.9 KB, free 364.0 MB)
> 22/03/29 20:58:02 INFO memory.MemoryStore: Block broadcast_89_piece0 stored as bytes in memory (estimated size 26.3 KB, free 364.0 MB)
> 22/03/29 20:58:02 INFO storage.BlockManagerInfo: Added broadcast_89_piece0 in memory on adhoc-1:38703 (size: 26.3 KB, free: 365.7 MB)
> 22/03/29 20:58:02 INFO spark.SparkContext: Created broadcast 89 from broadcast at DAGScheduler.scala:1161
> 22/03/29 20:58:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 131 (MapPartitionsRDD[235] at map at HoodieSparkEngineContext.java:100) (first 15 tasks are for partitions Vector(0))
> 22/03/29 20:58:02 INFO scheduler.TaskSchedulerImpl: Adding task set 131.0 with 1 tasks
> 22/03/29 20:58:02 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 131.0 (TID 4081, localhost, executor driver, partition 0, PROCESS_LOCAL, 7803 bytes)
> 22/03/29 20:58:02 INFO executor.Executor: Running task 0.0 in stage 131.0 (TID 4081)
> 22/03/29 20:58:02 INFO executor.Executor: Finished task 0.0 in stage 131.0 (TID 4081). 1167 bytes result sent to driver
> 22/03/29 20:58:02 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 131.0 (TID 4081) in 17 ms on localhost (executor driver) (1/1)
> 22/03/29 20:58:02 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 131.0, whose tasks have all completed, from pool
> 22/03/29 20:58:02 INFO scheduler.DAGScheduler: ResultStage 131 (collect at HoodieSparkEngineContext.java:100) finished in 0.030 s
> 22/03/29 20:58:02 INFO scheduler.DAGScheduler: Job 63 finished: collect at HoodieSparkEngineContext.java:100, took 0.032364 s
> 22/03/29 20:58:02 INFO timeline.HoodieActiveTimeline: Loaded instants upto : Option{val=[20220329205734338__commit__COMPLETED]}
> 22/03/29 20:58:02 INFO view.AbstractTableFileSystemView: Took 0 ms to read  0 instants, 0 replaced file groups
> 22/03/29 20:58:02 INFO util.ClusteringUtils: Found 0 files in pending clustering operations
> 22/03/29 20:58:02 INFO view.AbstractTableFileSystemView: addFilesToView: NumFiles=1, NumFileGroups=1, FileGroupsCreationTime=0, StoreTimeTaken=0
> 22/03/29 20:58:02 INFO hudi.HoodieFileIndex: Refresh table bq_demo_partitioned_cow, spend: 124 ms
> 22/03/29 20:58:02 INFO table.HoodieTableMetaClient: Loading HoodieTableMetaClient from hdfs://namenode:8020/user/hive/warehouse/bq_demo_partitioned_cow
> 22/03/29 20:58:02 INFO table.HoodieTableConfig: Loading table properties from hdfs://namenode:8020/user/hive/warehouse/bq_demo_partitioned_cow/.hoodie/hoodie.properties
> 22/03/29 20:58:02 INFO table.HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from hdfs://namenode:8020/user/hive/warehouse/bq_demo_partitioned_cow
> 22/03/29 20:58:02 INFO timeline.HoodieActiveTimeline: Loaded instants upto : Option{val=[20220329205734338__commit__COMPLETED]}
> 22/03/29 20:58:02 INFO command.InsertIntoHoodieTableCommand: insert statement use write operation type: upsert, payloadClass: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
> 22/03/29 20:58:02 ERROR thriftserver.SparkSQLDriver: Failed in [insert into bq_demo_partitioned_cow partition(dt='2021-12-25') values(2, 'a2', 20, current_timestamp())]
> java.lang.AssertionError: assertion failed: Required partition columns is: {"type":"struct","fields":[]}, Current static partitions is: dt -> 2021-12-25
>     at scala.Predef$.assert(Predef.scala:170)
>     at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.alignOutputFields(InsertIntoHoodieTableCommand.scala:130)
>     at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:94)
>     at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:55)
>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>     at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
>     at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
>     at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
>     at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
>     at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>     at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
>     at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
>     at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
>     at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
>     at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
>     at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:62)
>     at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:371)
>     at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
>     at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:274)
>     at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>     at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
>     at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
>     at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
>     at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>     at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 22/03/29 21:00:38 INFO hfile.LruBlockCache: totalSize=383.75 KB, freeSize=363.83 MB, max=364.20 MB, blockCount=0, accesses=4, hits=0, hitRatio=0, cachingAccesses=0, cachingHits=0, cachingHitsRatio=0, evictions=59, evicted=0, evictedPerRun=0.0 {code}
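>
> A minimal sketch of the failing check (an assumed reconstruction based on the error message, not Hudi's actual source): because the partition column is dropped from the resolved table schema, the partition schema seen by InsertIntoHoodieTableCommand.alignOutputFields is empty while the statement still supplies one static partition, so the size comparison in the assertion cannot hold.
> {code:java}
> // Hypothetical Scala sketch of the condition behind the AssertionError above;
> // the variable names and the exact check are assumptions inferred from the message.
> import org.apache.spark.sql.types.{StructField, StructType}
>
> val partitionSchema = StructType(Seq.empty[StructField])   // dt was dropped from the table schema
> val staticPartitions = Map("dt" -> "2021-12-25")           // from PARTITION (dt='2021-12-25')
>
> assert(
>   staticPartitions.isEmpty || staticPartitions.size == partitionSchema.fields.length,
>   s"Required partition columns is: ${partitionSchema.json}, " +
>     s"Current static partitions is: ${staticPartitions.map { case (k, v) => s"$k -> $v" }.mkString(",")}"
> )
> {code}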


