Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2023/01/18 07:57:21 UTC
[GitHub] [hudi] ranjani1993 opened a new issue, #7693: [SUPPORT] HUDI file cleanup - Not working as expected
ranjani1993 opened a new issue, #7693:
URL: https://github.com/apache/hudi/issues/7693
**Describe the problem you faced**
Hudi file cleanup does not work as expected when it runs as part of data ingestion.
**Config used:**
df.write.format("hudi").
option("hoodie.datasource.write.table.type","COPY_ON_WRITE").
option("hoodie.datasource.write.recordkey.field","item_nbr").
option("hoodie.datasource.write.precombine.field","ts_created").
option("hoodie.table.name","stg.td_item_hudi").
option("hoodie.datasource.write.operation","upsert").
option("hoodie.datasource.write.hive_style_partitioning","true").
option("hoodie.datasource.write.partitionpath.field","ts_created").
option("hoodie.clean.automatic","true").
option("hoodie.clean.async","true").
option("hoodie.cleaner.policy","KEEP_LATEST_FILE_VERSIONS").
option("hoodie.cleaner.fileversions.retained","0").
mode(Append).
save(basePath)
If we set ("hoodie.cleaner.fileversions.retained","1"), 2 or 3 versions of the file are kept.
If we set ("hoodie.cleaner.fileversions.retained","0"), we get a FileNotFoundException.
**Note**: Cleanup works fine if we run it as an independent process.
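Below is a minimal sketch of how the retained-versions setting could select files. It assumes that KEEP_LATEST_FILE_VERSIONS keeps the newest `hoodie.cleaner.fileversions.retained` file slices of each file group and marks every older slice for deletion. The object and method names are hypothetical, not Hudi APIs. Under that assumption, a value of 0 selects every slice, including the latest base file that the next upsert has to merge with.

```scala
// Hypothetical, simplified model (not Hudi's clean planner).
// A file group is represented by the instant times of its file slices.
object KeepLatestFileVersionsModel {
  /** Instants whose base files would be deleted when the newest `retained`
    * slices of one file group are kept (assumed policy semantics). */
  def filesToClean(sliceInstants: Seq[String], retained: Int): Seq[String] = {
    require(retained >= 0, "retained must be non-negative")
    // Instant times are fixed-width timestamps, so string order is time order.
    val newestFirst = sliceInstants.sorted(Ordering[String].reverse)
    // Everything after the first `retained` entries is selected for deletion.
    newestFirst.drop(retained)
  }
}
```

With three slices and `retained = 1`, only the newest instant survives. With `retained = 0`, the single remaining slice is selected as well.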
Steps to reproduce the behavior:
Scenario 1) ("hoodie.cleaner.fileversions.retained","0")
Step 1) Create HUDI external table
beeline
add jar gs://xxx/hudi-hadoop-mr-bundle-0.11.1.jar;
set hive.input.format=org.apache.hudi.hadoop.HoodieParquetInputFormat;
CREATE EXTERNAL TABLE `stg.td_item_hudi`(
`item_nbr` int,
`dept_nbr` int,
`fineline_nbr` int,
`upc_nbr` int,
`product_nbr` int,
`consumer_item_nbr` int,
`item_status_code` char(1),
`assortment_type_cd` int)
partitioned by(ts_created string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'com.uber.hoodie.hadoop.HoodieInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'gs://xxx/stg.db/td_item_hudi';
Step 2) Upsert data
spark-shell \
--packages org.apache.hudi:hudi-spark-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:2.4.7 \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--conf "spark.sql.parquet.writeLegacyFormat=true" \
--conf "spark.sql.parquet.enableVectorizedReader=false"
import org.apache.spark.sql.DataFrame
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.DataSourceWriteOptions
val tableName = "stg.td_item_hudi"
val basePath = "gs://xxx/stg.db/td_item_hudi"
val df: DataFrame = Seq(
(1, 1000, 100, 111111, 1, 1, "A", 0,"2022-01-01"),
(2, 1000, 100, 111111, 1, 1, "A", 1,"2022-01-01"),
(3, 1000, 100, 111111, 1, 1, "A", 2,"2022-01-01"),
(4, 1000, 100, 111111, 1, 1, "A", 0,"2022-01-01"),
(5, 1000, 100, 111111, 1, 1, "A", 1,"2022-01-01")
).toDF("item_nbr", "dept_nbr", "fineline_nbr", "upc_nbr", "product_nbr",
"consumer_item_nbr", "item_status_code", "assortment_type_cd", "ts_created")
df.write.format("hudi").
option("hoodie.datasource.write.table.type","COPY_ON_WRITE").
option("hoodie.datasource.write.recordkey.field","item_nbr").
option("hoodie.datasource.write.precombine.field","ts_created").
option("hoodie.table.name","stg.td_item_hudi").
option("hoodie.datasource.write.operation","upsert").
option("hoodie.datasource.write.hive_style_partitioning","true").
option("hoodie.datasource.write.partitionpath.field","ts_created").
option("hoodie.clean.automatic","true").
option("hoodie.clean.async","true").
option("hoodie.cleaner.policy","KEEP_LATEST_FILE_VERSIONS").
option("hoodie.cleaner.fileversions.retained","0").
mode(Append).
save(basePath)
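Every write in this reproduction repeats the same option chain; only `hoodie.cleaner.fileversions.retained` changes between scenarios. The sketch below keeps those settings in one place. The `HudiWriteOptions` object is hypothetical; the keys and values are copied from the issue.

```scala
// Hypothetical helper: one immutable map holding the write options used in this issue,
// so every upsert is guaranteed to use identical settings.
object HudiWriteOptions {
  val base: Map[String, String] = Map(
    "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
    "hoodie.datasource.write.recordkey.field" -> "item_nbr",
    "hoodie.datasource.write.precombine.field" -> "ts_created",
    "hoodie.table.name" -> "stg.td_item_hudi",
    "hoodie.datasource.write.operation" -> "upsert",
    "hoodie.datasource.write.hive_style_partitioning" -> "true",
    "hoodie.datasource.write.partitionpath.field" -> "ts_created",
    "hoodie.clean.automatic" -> "true",
    "hoodie.clean.async" -> "true",
    "hoodie.cleaner.policy" -> "KEEP_LATEST_FILE_VERSIONS"
  )

  /** Adds the retained-versions setting, passed as a string as in the issue. */
  def withRetained(versions: Int): Map[String, String] =
    base + ("hoodie.cleaner.fileversions.retained" -> versions.toString)
}
```

In spark-shell the map could be passed through Spark's `DataFrameWriter.options`, for example `df.write.format("hudi").options(HudiWriteOptions.withRetained(0)).mode(Append).save(basePath)`.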
val df: DataFrame = Seq(
(1, 1000, 100, 111111, 11, 1, "A", 0,"2022-01-01")
).toDF("item_nbr", "dept_nbr", "fineline_nbr", "upc_nbr", "product_nbr",
"consumer_item_nbr", "item_status_code", "assortment_type_cd", "ts_created")
df.write.format("hudi").
option("hoodie.datasource.write.table.type","COPY_ON_WRITE").
option("hoodie.datasource.write.recordkey.field","item_nbr").
option("hoodie.datasource.write.precombine.field","ts_created").
option("hoodie.table.name","stg.td_item_hudi").
option("hoodie.datasource.write.operation","upsert").
option("hoodie.datasource.write.hive_style_partitioning","true").
option("hoodie.datasource.write.partitionpath.field","ts_created").
option("hoodie.clean.automatic","true").
option("hoodie.clean.async","true").
option("hoodie.cleaner.policy","KEEP_LATEST_FILE_VERSIONS").
option("hoodie.cleaner.fileversions.retained","0").
mode(Append).
save(basePath)
23/01/18 07:34:56 ERROR org.apache.spark.scheduler.TaskSetManager: Task 0 in stage 63.0 failed 10 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 63.0 failed 10 times, most recent failure: Lost task 0.9 in stage 63.0 (TID 2504, fd-persistent-sw-jbnh.c.wmt-bfdms-mdsefdprd.internal, executor 2): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:875)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:875)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:359)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1182)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:414)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: java.io.FileNotFoundException: File not found: gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/b9d8357c-b096-4910-ac44-461314d4b6b6-0_0-23-1262_20230118073254752.parquet
at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
... 28 more
Caused by: java.io.FileNotFoundException: File not found: gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/b9d8357c-b096-4910-ac44-461314d4b6b6-0_0-23-1262_20230118073254752.parquet
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getFileStatus(GoogleHadoopFileSystemBase.java:1082)
at org.apache.parquet.hadoop.ParquetReader$Builder.build(ParquetReader.java:300)
at org.apache.hudi.io.storage.HoodieParquetReader.getRecordIterator(HoodieParquetReader.java:70)
at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:134)
... 31 more
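One plausible reading of this trace, offered as an assumption rather than a confirmed diagnosis, starts with the missing base file. It carries instant 20230118073254752, so it appears to be the file written by the first upsert. Suppose the async cleaner plans its deletions from the commits completed before the second upsert starts, and that upsert must rewrite the latest base file of the file group. In that case `retained = 0` schedules the very file the merge reads. The hypothetical model below captures that interaction for a single file group; its names are illustrative, not Hudi classes.

```scala
// Hypothetical model of one file group with an async cleaner (not Hudi code).
// Assumptions: the clean plan is built from commits completed before the upsert
// starts, and a copy-on-write upsert reads the latest base file of the group.
object AsyncCleanRaceModel {
  final case class Outcome(deleted: Seq[String], mergeInput: String) {
    /** True if the base file the upsert must read is also scheduled for deletion. */
    def mergeInputDeleted: Boolean = deleted.contains(mergeInput)
  }

  def simulate(completedInstants: Seq[String], retained: Int): Outcome = {
    require(completedInstants.nonEmpty, "the file group needs at least one base file")
    require(retained >= 0, "retained must be non-negative")
    val newestFirst = completedInstants.sorted(Ordering[String].reverse)
    // Assumed KEEP_LATEST_FILE_VERSIONS selection: keep the newest `retained` slices.
    val deleted = newestFirst.drop(retained)
    // The concurrent upsert merges its records into the newest base file.
    Outcome(deleted, newestFirst.head)
  }
}
```

With the single instant from the trace, `retained = 0` marks the merge input for deletion, while `retained = 1` leaves it in place.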
Scenario 2)
With option("hoodie.cleaner.fileversions.retained","1"), we see 2 or 3 file versions kept.
Steps to reproduce:
spark-shell \
--packages org.apache.hudi:hudi-spark-bundle_2.12:0.11.1,org.apache.spark:spark-avro_2.12:2.4.7 \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--conf "spark.sql.parquet.writeLegacyFormat=true" \
--conf "spark.sql.parquet.enableVectorizedReader=false"
import org.apache.spark.sql.DataFrame
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.DataSourceWriteOptions
val tableName = "stg.td_item_hudi"
val basePath = "gs://xxx/stg.db/td_item_hudi"
val df: DataFrame = Seq(
(1, 1000, 100, 111111, 1, 1, "A", 0,"2022-01-01"),
(2, 1000, 100, 111111, 1, 1, "A", 1,"2022-01-01"),
(3, 1000, 100, 111111, 1, 1, "A", 2,"2022-01-01"),
(4, 1000, 100, 111111, 1, 1, "A", 0,"2022-01-01"),
(5, 1000, 100, 111111, 1, 1, "A", 1,"2022-01-01")
).toDF("item_nbr", "dept_nbr", "fineline_nbr", "upc_nbr", "product_nbr",
"consumer_item_nbr", "item_status_code", "assortment_type_cd", "ts_created")
df.write.format("hudi").
option("hoodie.datasource.write.table.type","COPY_ON_WRITE").
option("hoodie.datasource.write.recordkey.field","item_nbr").
option("hoodie.datasource.write.precombine.field","ts_created").
option("hoodie.table.name","stg.td_item_hudi").
option("hoodie.datasource.write.operation","upsert").
option("hoodie.datasource.write.hive_style_partitioning","true").
option("hoodie.datasource.write.partitionpath.field","ts_created").
option("hoodie.clean.automatic","true").
option("hoodie.clean.async","true").
option("hoodie.cleaner.policy","KEEP_LATEST_FILE_VERSIONS").
option("hoodie.cleaner.fileversions.retained","1").
mode(Append).
save(basePath)
hdfs dfs -ls gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01
Found 2 items
-rwxrwxrwx 3 svcmdsefddat svcmdsefddat 96 2023-01-18 06:41 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/.hoodie_partition_metadata
-rwxrwxrwx 3 svcmdsefddat svcmdsefddat 436612 2023-01-18 06:42 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-27-12
val df: DataFrame = Seq(
(1, 1000, 100, 111111, 11, 1, "A", 0,"2022-01-01")
).toDF("item_nbr", "dept_nbr", "fineline_nbr", "upc_nbr", "product_nbr",
"consumer_item_nbr", "item_status_code", "assortment_type_cd", "ts_created")
df.write.format("hudi").
option("hoodie.datasource.write.table.type","COPY_ON_WRITE").
option("hoodie.datasource.write.recordkey.field","item_nbr").
option("hoodie.datasource.write.precombine.field","ts_created").
option("hoodie.table.name","stg.td_item_hudi").
option("hoodie.datasource.write.operation","upsert").
option("hoodie.datasource.write.hive_style_partitioning","true").
option("hoodie.datasource.write.partitionpath.field","ts_created").
option("hoodie.clean.automatic","true").
option("hoodie.clean.async","true").
option("hoodie.cleaner.policy","KEEP_LATEST_FILE_VERSIONS").
option("hoodie.cleaner.fileversions.retained","1").
mode(Append).
save(basePath)
hdfs dfs -ls gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01
Found 3 items
-rwxrwxrwx 3 svcmdsefddat svcmdsefddat 96 2023-01-18 06:41 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/.hoodie_partition_metadata
-rwxrwxrwx 3 svcmdsefddat svcmdsefddat 436612 2023-01-18 06:42 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-27-1219_20230118064106358.parquet
-rwxrwxrwx 3 svcmdsefddat svcmdsefddat 436557 2023-01-18 06:49 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-62-2439_20230118064856978.parquet
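Both parquet files in this listing belong to the same file group; only the write token and the instant time at the end of the name differ. The hypothetical helper below counts how many versions of each file group are on storage. It assumes Hudi's base-file naming pattern `<fileId>_<writeToken>_<instantTime>.parquet`, which matches the names above.

```scala
// Hypothetical helper for reading listings like the one above (not a Hudi API).
// Assumed name pattern: <fileId>_<writeToken>_<instantTime>.parquet
object BaseFileNames {
  final case class BaseFile(fileId: String, writeToken: String, instantTime: String)

  def parse(name: String): Option[BaseFile] =
    name.stripSuffix(".parquet").split('_') match {
      case Array(fileId, writeToken, instant) if name.endsWith(".parquet") =>
        Some(BaseFile(fileId, writeToken, instant))
      case _ => None // e.g. .hoodie_partition_metadata or a truncated name
    }

  /** Number of base-file versions present for each file group. */
  def versionsPerFileGroup(names: Seq[String]): Map[String, Int] =
    names.flatMap(parse).groupBy(_.fileId).map { case (id, files) => id -> files.size }
}
```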
val df: DataFrame = Seq(
(1, 1000, 100, 111111, 111, 1, "A", 0,"2022-01-01")
).toDF("item_nbr", "dept_nbr", "fineline_nbr", "upc_nbr", "product_nbr",
"consumer_item_nbr", "item_status_code", "assortment_type_cd", "ts_created")
df.write.format("hudi").
option("hoodie.datasource.write.table.type","COPY_ON_WRITE").
option("hoodie.datasource.write.recordkey.field","item_nbr").
option("hoodie.datasource.write.precombine.field","ts_created").
option("hoodie.table.name","stg.td_item_hudi").
option("hoodie.datasource.write.operation","upsert").
option("hoodie.datasource.write.hive_style_partitioning","true").
option("hoodie.datasource.write.partitionpath.field","ts_created").
option("hoodie.clean.automatic","true").
option("hoodie.clean.async","true").
option("hoodie.cleaner.policy","KEEP_LATEST_FILE_VERSIONS").
option("hoodie.cleaner.fileversions.retained","1").
mode(Append).
save(basePath)
hdfs dfs -ls gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01
Found 3 items
-rwxrwxrwx 3 svcmdsefddat svcmdsefddat 96 2023-01-18 06:41 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/.hoodie_partition_metadata
-rwxrwxrwx 3 svcmdsefddat svcmdsefddat 436557 2023-01-18 06:49 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-62-2439_20230118064856978.parquet
-rwxrwxrwx 3 svcmdsefddat svcmdsefddat 436560 2023-01-18 06:52 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-99-3661_20230118065243463.parquet
val df: DataFrame = Seq(
(1, 1000, 100, 111111, 1111, 1, "A", 0,"2022-01-01")
).toDF("item_nbr", "dept_nbr", "fineline_nbr", "upc_nbr", "product_nbr",
"consumer_item_nbr", "item_status_code", "assortment_type_cd", "ts_created")
df.write.format("hudi").
option("hoodie.datasource.write.table.type","COPY_ON_WRITE").
option("hoodie.datasource.write.recordkey.field","item_nbr").
option("hoodie.datasource.write.precombine.field","ts_created").
option("hoodie.table.name","stg.td_item_hudi").
option("hoodie.datasource.write.operation","upsert").
option("hoodie.datasource.write.hive_style_partitioning","true").
option("hoodie.datasource.write.partitionpath.field","ts_created").
option("hoodie.clean.automatic","true").
option("hoodie.clean.async","true").
option("hoodie.cleaner.policy","KEEP_LATEST_FILE_VERSIONS").
option("hoodie.cleaner.fileversions.retained","1").
mode(Append).
save(basePath)
hdfs dfs -ls gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01
-rwxrwxrwx 3 svcmdsefddat svcmdsefddat 96 2023-01-18 06:41 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/.hoodie_partition_metadata
-rwxrwxrwx 3 svcmdsefddat svcmdsefddat 436571 2023-01-18 06:58 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-145-4890_20230118065755261.parquet
-rwxrwxrwx 3 svcmdsefddat svcmdsefddat 436557 2023-01-18 06:49 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-62-2439_20230118064856978.parquet
-rwxrwxrwx 3 svcmdsefddat svcmdsefddat 436560 2023-01-18 06:52 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-99-3661_20230118065243463.parquet
val df: DataFrame = Seq(
(1, 1000, 100, 111111, 11111, 1, "A", 0,"2022-01-01")
).toDF("item_nbr", "dept_nbr", "fineline_nbr", "upc_nbr", "product_nbr",
"consumer_item_nbr", "item_status_code", "assortment_type_cd", "ts_created")
df.write.format("hudi").
option("hoodie.datasource.write.table.type","COPY_ON_WRITE").
option("hoodie.datasource.write.recordkey.field","item_nbr").
option("hoodie.datasource.write.precombine.field","ts_created").
option("hoodie.table.name","stg.td_item_hudi").
option("hoodie.datasource.write.operation","upsert").
option("hoodie.datasource.write.hive_style_partitioning","true").
option("hoodie.datasource.write.partitionpath.field","ts_created").
option("hoodie.clean.automatic","true").
option("hoodie.clean.async","true").
option("hoodie.cleaner.policy","KEEP_LATEST_FILE_VERSIONS").
option("hoodie.cleaner.fileversions.retained","1").
mode(Append).
save(basePath)
hdfs dfs -ls gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01
Found 3 items
-rwxrwxrwx 3 svcmdsefddat svcmdsefddat 96 2023-01-18 06:41 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/.hoodie_partition_metadata
-rwxrwxrwx 3 svcmdsefddat svcmdsefddat 436571 2023-01-18 06:58 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-145-4890_20230118065755261.parquet
-rwxrwxrwx 3 svcmdsefddat svcmdsefddat 436570 2023-01-18 07:00 gs://xxx/stg.db/td_item_hudi/ts_created=2022-01-01/cdeab847-7ee2-4fd8-b9d4-dbc5aa433dc3-0_0-185-6115_20230118070030142.parquet
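The listings show the cleaner lagging roughly one commit behind. A rough timing model can illustrate this. It assumes, without confirmation here, that an async clean is planned from the commits completed before a write starts, while an inline clean is planned after the write commits. With `retained = 1`, the model predicts two base files on storage with async cleaning and one with inline cleaning. The model is hypothetical and does not account for the listing that showed three base files after the fourth write.

```scala
// Hypothetical timing model for one file group (not Hudi code).
// Each write i adds one base file; the clean keeps the newest `retained` files
// of the set it planned against.
object CleanTimingModel {
  def filesAfterEachWrite(writes: Int, retained: Int, async: Boolean): Seq[Int] =
    (1 to writes).scanLeft(Vector.empty[Int]) { (onStorage, i) =>
      // Assumed: async plans before write i lands; inline plans after it commits.
      val planned = if (async) onStorage else onStorage :+ i
      val toDelete = planned.sorted(Ordering[Int].reverse).drop(retained).toSet
      (onStorage :+ i).filterNot(toDelete)
    }.tail.map(_.size) // drop the initial empty state, report file counts
}
```

For five writes, `filesAfterEachWrite(5, retained = 1, async = true)` yields 1, 2, 2, 2, 2, and the inline variant yields 1 after every write.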
**Environment Description**
* Hudi version : 0.11.1
* Spark version : 2.4.8
* Hive version : 2.3.7
* Hadoop version : 2.10.1
* Storage (HDFS/S3/GCS..) : GCS
* Running on Docker? (yes/no) : no
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [hudi] ranjani1993 commented on issue #7693: [SUPPORT] HUDI file cleanup - Not working as expected
ranjani1993 commented on issue #7693:
URL: https://github.com/apache/hudi/issues/7693#issuecomment-1386759680
Removed these options:
option("hoodie.clean.automatic","true").
option("hoodie.clean.async","true").
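Removing these two options falls back to the defaults. As far as we know, for Hudi 0.11.x these are `hoodie.clean.automatic = true` and `hoodie.clean.async = false`. Cleaning therefore still runs automatically, but inline after each commit instead of concurrently with the write. The hypothetical resolver below spells out that fallback; the defaults are hard-coded assumptions, not values read from Hudi.

```scala
// Hypothetical resolver (not a Hudi API). Defaults below are assumed for 0.11.x:
// hoodie.clean.automatic = true, hoodie.clean.async = false.
object CleanMode {
  sealed trait Mode
  case object Disabled extends Mode
  case object Inline extends Mode
  case object Async extends Mode

  def resolve(opts: Map[String, String]): Mode = {
    val automatic = opts.getOrElse("hoodie.clean.automatic", "true").toBoolean
    val async = opts.getOrElse("hoodie.clean.async", "false").toBoolean
    // Assumed: async cleaning only applies when automatic cleaning is enabled.
    if (!automatic) Disabled else if (async) Async else Inline
  }
}
```

With the original options, `resolve` returns `Async`. With both keys removed, it returns `Inline`.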
[GitHub] [hudi] ranjani1993 closed issue #7693: [SUPPORT] HUDI file cleanup - Not working as expected
ranjani1993 closed issue #7693: [SUPPORT] HUDI file cleanup - Not working as expected
URL: https://github.com/apache/hudi/issues/7693