Posted to commits@hudi.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/04/01 10:54:00 UTC
[jira] [Updated] (HUDI-3748) Hudi fails to insert into a partitioned table when the partition column is dropped from the parquet schema
[ https://issues.apache.org/jira/browse/HUDI-3748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated HUDI-3748:
---------------------------------
Labels: pull-request-available (was: )
> Hudi fails to insert into a partitioned table when the partition column is dropped from the parquet schema
> ----------------------------------------------------------------------------------------------------------
>
> Key: HUDI-3748
> URL: https://issues.apache.org/jira/browse/HUDI-3748
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Vinoth Govindarajan
> Assignee: Yann Byron
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 0.11.0
>
>
> When hoodie.datasource.write.drop.partition.columns = 'true' is set to drop the partition column from the parquet schema (needed for BigQuery support), Hudi fails to insert into the partitioned table with the error shown below.
>
> Steps to reproduce:
> Start hudi docker:
> {code:java}
> cd hudi/docker
> ./setup_demo.sh{code}
> {code:java}
> docker exec -it adhoc-2 /bin/bash{code}
> {code:java}
> # Log into spark-sql and execute the following commands:
> spark-sql --jars $HUDI_SPARK_BUNDLE \
> --master local[2] \
> --driver-class-path $HADOOP_CONF_DIR \
> --conf spark.sql.hive.convertMetastoreParquet=false \
> --deploy-mode client \
> --driver-memory 1G \
> --executor-memory 3G \
> --num-executors 1 \
> --packages org.apache.spark:spark-avro_2.11:2.4.4 \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
> --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' {code}
>
> {code:java}
> create table bq_demo_partitioned_cow (
> id bigint,
> name string,
> price double,
> ts bigint,
> dt string
> ) using hudi
> partitioned by (dt)
> tblproperties (
> type = 'cow',
> primaryKey = 'id',
> preCombineField = 'ts',
> hoodie.datasource.write.drop.partition.columns = 'true'
> );
> insert into bq_demo_partitioned_cow partition(dt='2021-12-25') values(1, 'a1', 10, current_timestamp());
> insert into bq_demo_partitioned_cow partition(dt='2021-12-25') values(2, 'a2', 20, current_timestamp()); {code}
> Error:
> {code:java}
> 22/03/29 20:58:02 INFO spark.SparkContext: Starting job: collect at HoodieSparkEngineContext.java:100
> 22/03/29 20:58:02 INFO scheduler.DAGScheduler: Got job 63 (collect at HoodieSparkEngineContext.java:100) with 1 output partitions
> 22/03/29 20:58:02 INFO scheduler.DAGScheduler: Final stage: ResultStage 131 (collect at HoodieSparkEngineContext.java:100)
> 22/03/29 20:58:02 INFO scheduler.DAGScheduler: Parents of final stage: List()
> 22/03/29 20:58:02 INFO scheduler.DAGScheduler: Missing parents: List()
> 22/03/29 20:58:02 INFO scheduler.DAGScheduler: Submitting ResultStage 131 (MapPartitionsRDD[235] at map at HoodieSparkEngineContext.java:100), which has no missing parents
> 22/03/29 20:58:02 INFO memory.MemoryStore: Block broadcast_89 stored as values in memory (estimated size 71.9 KB, free 364.0 MB)
> 22/03/29 20:58:02 INFO memory.MemoryStore: Block broadcast_89_piece0 stored as bytes in memory (estimated size 26.3 KB, free 364.0 MB)
> 22/03/29 20:58:02 INFO storage.BlockManagerInfo: Added broadcast_89_piece0 in memory on adhoc-1:38703 (size: 26.3 KB, free: 365.7 MB)
> 22/03/29 20:58:02 INFO spark.SparkContext: Created broadcast 89 from broadcast at DAGScheduler.scala:1161
> 22/03/29 20:58:02 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 131 (MapPartitionsRDD[235] at map at HoodieSparkEngineContext.java:100) (first 15 tasks are for partitions Vector(0))
> 22/03/29 20:58:02 INFO scheduler.TaskSchedulerImpl: Adding task set 131.0 with 1 tasks
> 22/03/29 20:58:02 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 131.0 (TID 4081, localhost, executor driver, partition 0, PROCESS_LOCAL, 7803 bytes)
> 22/03/29 20:58:02 INFO executor.Executor: Running task 0.0 in stage 131.0 (TID 4081)
> 22/03/29 20:58:02 INFO executor.Executor: Finished task 0.0 in stage 131.0 (TID 4081). 1167 bytes result sent to driver
> 22/03/29 20:58:02 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 131.0 (TID 4081) in 17 ms on localhost (executor driver) (1/1)
> 22/03/29 20:58:02 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 131.0, whose tasks have all completed, from pool
> 22/03/29 20:58:02 INFO scheduler.DAGScheduler: ResultStage 131 (collect at HoodieSparkEngineContext.java:100) finished in 0.030 s
> 22/03/29 20:58:02 INFO scheduler.DAGScheduler: Job 63 finished: collect at HoodieSparkEngineContext.java:100, took 0.032364 s
> 22/03/29 20:58:02 INFO timeline.HoodieActiveTimeline: Loaded instants upto : Option{val=[20220329205734338__commit__COMPLETED]}
> 22/03/29 20:58:02 INFO view.AbstractTableFileSystemView: Took 0 ms to read 0 instants, 0 replaced file groups
> 22/03/29 20:58:02 INFO util.ClusteringUtils: Found 0 files in pending clustering operations
> 22/03/29 20:58:02 INFO view.AbstractTableFileSystemView: addFilesToView: NumFiles=1, NumFileGroups=1, FileGroupsCreationTime=0, StoreTimeTaken=0
> 22/03/29 20:58:02 INFO hudi.HoodieFileIndex: Refresh table bq_demo_partitioned_cow, spend: 124 ms
> 22/03/29 20:58:02 INFO table.HoodieTableMetaClient: Loading HoodieTableMetaClient from hdfs://namenode:8020/user/hive/warehouse/bq_demo_partitioned_cow
> 22/03/29 20:58:02 INFO table.HoodieTableConfig: Loading table properties from hdfs://namenode:8020/user/hive/warehouse/bq_demo_partitioned_cow/.hoodie/hoodie.properties
> 22/03/29 20:58:02 INFO table.HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from hdfs://namenode:8020/user/hive/warehouse/bq_demo_partitioned_cow
> 22/03/29 20:58:02 INFO timeline.HoodieActiveTimeline: Loaded instants upto : Option{val=[20220329205734338__commit__COMPLETED]}
> 22/03/29 20:58:02 INFO command.InsertIntoHoodieTableCommand: insert statement use write operation type: upsert, payloadClass: org.apache.hudi.common.model.OverwriteWithLatestAvroPayload
> 22/03/29 20:58:02 ERROR thriftserver.SparkSQLDriver: Failed in [insert into bq_demo_partitioned_cow partition(dt='2021-12-25') values(2, 'a2', 20, current_timestamp())]
> java.lang.AssertionError: assertion failed: Required partition columns is: {"type":"struct","fields":[]}, Current static partitions is: dt -> 2021-12-25
> at scala.Predef$.assert(Predef.scala:170)
> at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.alignOutputFields(InsertIntoHoodieTableCommand.scala:130)
> at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand$.run(InsertIntoHoodieTableCommand.scala:94)
> at org.apache.spark.sql.hudi.command.InsertIntoHoodieTableCommand.run(InsertIntoHoodieTableCommand.scala:55)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
> at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
> at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:194)
> at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
> at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:194)
> at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
> at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
> at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
> at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:62)
> at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:371)
> at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
> at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:274)
> at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
> at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> spark-sql> 22/03/29 21:00:38 INFO hfile.LruBlockCache: totalSize=383.75 KB, freeSize=363.83 MB, max=364.20 MB, blockCount=0, accesses=4, hits=0, hitRatio=0, cachingAccesses=0, cachingHits=0, cachingHitsRatio=0,evictions=59, evicted=0, evictedPerRun=0.0 {code}
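The assertion message reports an empty required partition schema alongside one static partition (dt). The following is a minimal Python sketch of a check that would produce the same message. It is inferred only from the error text above and is not Hudi's actual code: the function name, its parameters, and the exact condition (static partition count must equal partition schema size) are assumptions made for illustration.

```python
import json


def check_static_partitions(partition_columns, static_partitions):
    """Hypothetical model of the failing check; not Hudi's implementation.

    partition_columns: names of the partition columns the table reports.
    static_partitions: column -> value pairs from the PARTITION(...) clause.
    """
    # Render the partition schema in the struct-JSON shape seen in the log.
    schema = {
        "type": "struct",
        "fields": [{"name": name, "type": "string"} for name in partition_columns],
    }
    schema_json = json.dumps(schema, separators=(",", ":"))
    current = ",".join(f"{k} -> {v}" for k, v in static_partitions.items())

    # Assumed condition: either no static partitions are given, or there is
    # exactly one static value per partition column.
    if static_partitions and len(static_partitions) != len(partition_columns):
        raise AssertionError(
            "assertion failed: "
            f"Required partition columns is: {schema_json}, "
            f"Current static partitions is: {current}"
        )


# Partition schema still lists dt: the check passes silently.
check_static_partitions(["dt"], {"dt": "2021-12-25"})

# Partition schema resolved as empty, as in the report: the check fails.
try:
    check_static_partitions([], {"dt": "2021-12-25"})
except AssertionError as err:
    print(err)
```

Under these assumptions, the failure comes from the table reporting no partition columns while the statement still supplies partition(dt='2021-12-25'). The report attributes that mismatch to the drop-partition-columns setting.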
--
This message was sent by Atlassian Jira
(v8.20.1#820001)