You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2020/05/16 12:19:00 UTC

[jira] [Commented] (HUDI-739) HoodieIOException: Could not delete in-flight instant

    [ https://issues.apache.org/jira/browse/HUDI-739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17109033#comment-17109033 ] 

sivabalan narayanan commented on HUDI-739:
------------------------------------------

[~guoyihua]: would you be interested in taking a l look at this? Try to see if you can give it a shot. If not, we can re-assign to someone else. 

> HoodieIOException: Could not delete in-flight instant
> -----------------------------------------------------
>
>                 Key: HUDI-739
>                 URL: https://issues.apache.org/jira/browse/HUDI-739
>             Project: Apache Hudi (incubating)
>          Issue Type: Bug
>          Components: Common Core
>    Affects Versions: 0.5.0
>            Reporter: Catalin Alexandru Zamfir
>            Priority: Blocker
>              Labels: AWS, S3, bug-bash-0.6.0
>
> We are evaluating Hudi to use for our near real-time ingestion needs, compared to other solutions (Delta/Iceberg). We've picked Hudi because pre-installed with Amazon EMR by AWS. However, adopting it is blocking on this issue with concurrent small batch (of 256 files) write jobs (to the same S3 path).
> Using Livy we're triggering Spark jobs writing Hudi tables over S3, on EMR with EMRFS active. Paths are using the "s3://" prefix and EMRFS is active. We're writing Spark SQL datasets promoted up from RDDs. The "hoodie.consistency.check.enabled" is set to true. Spark serializer is Kryo. Hoodie version is 0.5.0-incubating.
> Both on COW and MOR tables some of the submitted jobs are failing with the below exception:
> {code:java}
> org.apache.hudi.exception.HoodieIOException: Could not delete in-flight instant [==>20200326175252__deltacommit__INFLIGHT]
> 	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInstantFile(HoodieActiveTimeline.java:239)
> 	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.deleteInflight(HoodieActiveTimeline.java:222)
> 	at org.apache.hudi.table.HoodieCopyOnWriteTable.deleteInflightInstant(HoodieCopyOnWriteTable.java:380)
> 	at org.apache.hudi.table.HoodieMergeOnReadTable.rollback(HoodieMergeOnReadTable.java:327)
> 	at org.apache.hudi.HoodieWriteClient.doRollbackAndGetStats(HoodieWriteClient.java:834)
> 	at org.apache.hudi.HoodieWriteClient.rollbackInternal(HoodieWriteClient.java:907)
> 	at org.apache.hudi.HoodieWriteClient.rollback(HoodieWriteClient.java:733)
> 	at org.apache.hudi.HoodieWriteClient.rollbackInflightCommits(HoodieWriteClient.java:1121)
> 	at org.apache.hudi.HoodieWriteClient.startCommitWithTime(HoodieWriteClient.java:994)
> 	at org.apache.hudi.HoodieWriteClient.startCommit(HoodieWriteClient.java:987)
> 	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:141)
> 	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
> 	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
> 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> 	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
> 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
> 	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> 	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> {code}
> The jobs are sent in concurrent batches of 256 files, over the same S3 path, in total some 8k files for 6 hours of our data.
> Writing happens with the following code (basePath is an S3 bucket):
> {code:java}
> // Configs (edited)
> String databaseName = "nrt";
> String assumeYmdPartitions = "false";
> String extractorClass = MultiPartKeysValueExtractor.class.getName ();
> String tableType = DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL ();
> String tableOperation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL ();
> String hiveJdbcUri = "jdbc:hive2://ip-x-y-z-q.eu-west-1.compute.internal:10000";
> String basePath = "s3://some_path_to_hudi"; // or "s3a://" does not seem to matter, same exception
> String avroSchemaAsString = avroSchema.toString ();
> String tableName = avroSchema.getName ().toLowerCase ().replace ("avro", "");
> eventsDataset.write ()
>     .format ("org.apache.hudi")
>     .option (HoodieWriteConfig.TABLE_NAME, tableName)
>     .option (DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY (), tableType)
>     .option (DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY (), "id")
>     .option (DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY (), "partition_path")
>     .option (DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY (), "timestamp")
>     .option (DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY (), "true")
>     .option (DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY (), databaseName)
>     .option (DataSourceWriteOptions.HIVE_TABLE_OPT_KEY (), tableName)
>     .option (DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY (), "tenant,year,month,day")
>     .option (DataSourceWriteOptions.HIVE_URL_OPT_KEY (), hiveJdbcUri)
>     .option (DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY (), assumeYmdPartitions)
>     .option (DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY (), extractorClass)
>     .option (DataSourceWriteOptions.OPERATION_OPT_KEY (), tableOperation)
> .mode (SaveMode.Append)
> .save (String.format ("%s/%s", basePath, tableName));
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)