You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "luyongbiao (via GitHub)" <gi...@apache.org> on 2023/04/10 06:57:09 UTC

[GitHub] [hudi] luyongbiao opened a new issue, #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

luyongbiao opened a new issue, #8416:
URL: https://github.com/apache/hudi/issues/8416

   **Describe the problem you faced**
   
   I have a MOR table, consists of 10 base files and 4 log files. There are 377 fields and 1403708 records in the table.  
   my application read the table and wirte data to another COW table through spark sql "select * from my_mor_table". 
   I found that my COW table only has 1403704 records and 4 records are missing. 
   
   By debugging, I found that data is lost after createRdd method in HoodieSparkUtils.scala, which converts Row to GenericRecord.
   But the strange thing is, I use another spark sql "select fields1,feids2,...fieds100 from my_mor_table", in the select statement the number of fields is less than or equal to 100, my mor_table has 1403708 records, no data loss.  When the number of select statement fields exceeds 100, data will be lost. 
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. create a COW table(table1)which the number of fields exceeds 100.
   2.  insert 10 mock records with INSERT action. 
   3. change table1's table type to MOR. 
   4. update 2 records of table1 with UPSERT action.
   5. create a COW table(table2)
   6.  Dataset<Row> dataset = spark.sql(select * from table1);
        dataset.count();
   result->10
   7. dataset.write()
                  .format("org.apache.hudi")
                  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "field1")
                  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "")
                  .option(HoodieWriteConfig.TABLE_NAME, table2)
                  .options(xxx)
                  .mode(SaveMode.Append)
                  .save(table2Path);
       Dataset<Row> dataset2 = spark.sql(select * from table2);
       dataset2.count();
   result -> 9 (1 record loss)
   
   
   **Expected behavior**
   
   A clear and concise description of what you expected to happen.
   
   **Environment Description**
   
   * Hudi version : 0.12.1
   
   * Spark version : 3.1.1
   
   * Hive version : 1.2.1
   
   * Hadoop version : 3.2.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : yes
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1505689883

   @nsivabalan 
   data was also lost after writing to the parquet.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1503161125

   can you r


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] ad1happy2go commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1501703946

   @luyongbiao As this issue is coming for number of fields more than 100, can you try setting spark.sql.codegen.maxFields more than number of fields in your table. 
   
   The default value for spark.sql.codegen.maxFields is 100.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1505056917

   > It is possible that source table has duplicates.
   
   My Mor table's source is parquet files in Aws S3 which generated by Aws Dms. it's not possible that source table has duplicates. becasuse I would combine the records if the record key was duplicated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1505075449

   @codepe
   Data loss will occur in special scenarios, such as setting "spark.sql.codegen.wholeStage" to "false", the size of the Java bytecode generated by spark Catalyst exceeds 64KB, or the number of My Mor table's fields is more than the setting "spark.sql.codegen .maxFields".


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1505063101

   @codepe


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao closed issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao closed issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala
URL: https://github.com/apache/hudi/issues/8416


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1502232116

   @ad1happy2go thanks, your solution fixed my problem. 
   But I used `dataset.withColumn(newColumnName, functions.expr(expression))` with long expression before writing, The data is lost again.  
   the write action executed successfully.And Stacktrace report a issue after createRdd method in HoodieSparkUtils.scala。 
   ```
   org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass": Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1" grows beyond 64 KB
       at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:382) ~[janino-3.0.9.jar:na]
       at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) ~[janino-3.0.9.jar:na]
       at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) ~[janino-3.0.9.jar:na]
       at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313) ~[janino-3.0.9.jar:na]
       at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235) ~[janino-3.0.9.jar:na]
       at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) ~[janino-3.0.9.jar:na]
       at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) ~[commons-compiler-3.0.9.jar:na]
       at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1403) [spark-catalyst_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1500) [spark-catalyst_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1497) [spark-catalyst_2.12-3.1.1.jar:3.1.1]
       at org.sparkproject.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) [spark-network-common_2.12-3.1.1.jar:3.1.1]
       at org.sparkproject.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) [spark-network-common_2.12-3.1.1.jar:3.1.1]
       at org.sparkproject.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) [spark-network-common_2.12-3.1.1.jar:3.1.1]
       at org.sparkproject.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) [spark-network-common_2.12-3.1.1.jar:3.1.1]
       at org.sparkproject.guava.cache.LocalCache.get(LocalCache.java:4000) [spark-network-common_2.12-3.1.1.jar:3.1.1]
       at org.sparkproject.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) [spark-network-common_2.12-3.1.1.jar:3.1.1]
       at org.sparkproject.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) [spark-network-common_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1351) [spark-catalyst_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.WholeStageCodegenExec.liftedTree1$1(WholeStageCodegenExec.scala:721) [spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720) [spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) [spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) [spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) ~[spark-core_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) [spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) [spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:101) ~[hudi-spark3.1-bundle_2.12-0.12.2.jar:0.12.2]
       at org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:79) ~[hudi-spark3.1-bundle_2.12-0.12.2.jar:0.12.2]
       at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:292) ~[hudi-spark3.1-bundle_2.12-0.12.2.jar:0.12.2]
       at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145) ~[hudi-spark3.1-bundle_2.12-0.12.2.jar:0.12.2]
       at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) [spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) [spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) ~[spark-core_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) [spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) [spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
       at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293) ~[spark-sql_2.12-3.1.1.jar:3.1.1]
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] ad1happy2go commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1502707341

   The reason for above error is when Java programs generated using Catalyst from programs using DataFrame and Dataset are compiled into Java bytecode, the size of byte code of one method must not be 64 KB or more.
   
   You can try setting - spark.sql.codegen.wholeStage= "false"
   
   Also, I have seen similar unusual behaviour with spark handling large number of fields with other file formats too in past. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] nsivabalan commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1505435645

   we could do something here: instead of directly writing to COW, can you write to parquet after consuming from source MOR table and check if you see any data loss? 
   that way, we can rule out whether basic reading from MOR is having issues. or the toRDD call while trying to write to hudi is causing issues. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] ad1happy2go commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1506741632

   I was able to reproduce the bug with spark.sql.codegen.wholeStage as false with hudi 0.12.2.
   But the issue doesn't exists in future releases of hudi, it works good with 0.13.0. Can you please confirm.
   
   Can you also try the issue with more than 100 fields using hudi 0.13.0.
   
   Pasting scala shell code -
   
   Works with
   ```
   ~/spark/spark-3.2.3-bin-hadoop3.2/bin/spark-shell \
     --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.13.0 \
     --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
     --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
     --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.sql.codegen.wholeStage=false'
   ```
   
   ```
   
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.hudi.common.model.HoodieRecord
   
   val tableAPath = "file:///tmp/hudi_trips_mor_issue_8416_try_user3"
   val tableName = "hudi_trips_mor_issue_8416_try_user3"
   
   var dataset = spark.sql("select 1 id, 'mock' field1, 'mock' field2"
   + "\nunion all select 2 id, 'mock' field1, 'mock' field2"
   + "\nunion all select 3 id, 'mock' field1, 'mock' field2"
   + "\nunion all select 4 id, 'mock' field1, 'mock' field2"
   + "\nunion all select 5 id, 'mock' field1, 'mock' field2");
   
   dataset = dataset.withColumn("lake_update_date", current_timestamp());
   
   (dataset.write.format("org.apache.hudi")
   .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
   .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
   .option(RECORDKEY_FIELD_OPT_KEY, "id")
   .option(PARTITIONPATH_FIELD_OPT_KEY, "")
   .option(PRECOMBINE_FIELD_PROP, "lake_update_date")
   .option(TABLE_NAME, "tableA")
   .mode(Append)
   .save(tableAPath))
   print(dataset.count())
   
   var dataset2 = (spark.read.format("org.apache.hudi")
   .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_SNAPSHOT_OPT_VAL)
   .load(tableAPath));
   
   dataset2 = dataset2.filter(
   "id = 1 or id = 2").withColumn(
   "field2", lit("mock2")).withColumn(
   "lake_update_date", current_timestamp);
   
   (dataset2.write.format("org.apache.hudi")
   .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
   .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
   .option(RECORDKEY_FIELD_OPT_KEY, "id")
   .option(PARTITIONPATH_FIELD_OPT_KEY, "")
   .option(PRECOMBINE_FIELD_PROP, "lake_update_date")
   .option(TABLE_NAME, "tableA")
   .mode(Append)
   .save(tableAPath))
   
   spark.read.format("org.apache.hudi").option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_SNAPSHOT_OPT_VAL).load(tableAPath).count()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1502727733

   > The reason for above error is when Java programs generated using Catalyst from programs using DataFrame and Dataset are compiled into Java bytecode, the size of byte code of one method must not be 64 KB or more.
   > 
   > You can try setting - spark.sql.codegen.wholeStage= "false"
   > 
   > Also, I have seen similar unusual behaviour with spark handling large number of fields with other file formats too in past.
   
   This setting  just worked for  the error displays, data was still lost.
   Do you have Any suggestion for me to fix this issue?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1503146775

   I found data must be lost if not run with spark whole


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1505059930

   > @luyongbiao How are you consuming from source to create the MOR table? Did you do record key based validation on MOR table that it has all the unique records. It is possible that source table has duplicates.
   
   My Mor table was created by reading parquet files in Aws S3 which generated by Aws DMS. it's not possible that source table has duplicates. Becasuse i would precombine the records if the record key was duplicated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1505094872

   The number of data missing is usually equal to the number of filegroups that have at least one log file.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1503105055

   @ad1happy2go 
   if setting spark.sql.codegen.wholeStage= "false", data must lose  while read MOR table(at least one log file) .
   reproduceble code (Java):
   1、run application, create sparkSession with setting - spark.sql.codegen.wholeStage= "false".
   ```
           SparkConf sparkConf = new SparkConf(false).setMaster("local[*]");
           SparkSession spark = SparkSession.builder()
                   .config(sparkConf)
                   .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                   .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
                   .config("spark.sql.codegen.maxFields", "1000")
                   .config("spark.sql.codegen.wholeStage", "false")
                   .getOrCreate();
   ```
   2、 Create tableA , And insert 5 records
   ```
           Dataset<Row> dataset = spark.sql("select 1 id, 'mock' field1, 'mock' field2"
                   + "\nunion all select 2 id, 'mock' field1, 'mock' field2"
                   + "\nunion all select 3 id, 'mock' field1, 'mock' field2"
                   + "\nunion all select 4 id, 'mock' field1, 'mock' field2"
                   + "\nunion all select 5 id, 'mock' field1, 'mock' field2");
           String tableAPath = "file:/D:/test";
           dataset = dataset.withColumn("lake_update_date", functions.current_timestamp());
           dataset.write()
                   .format("org.apache.hudi")
                   .option(DataSourceWriteOptions.TABLE_TYPE().key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL())
                   .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
                   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "id")
                   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "")
                   .option(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, "lake_update_date")
                   .option(HoodieWriteConfig.TABLE_NAME, "tableA")
                   .mode(SaveMode.Append)
                   .save(tableAPath);
           System.out.println(dataset.count());
   ```
   print -> 5 records
   3、update 2 records
   ```
           Dataset<Row> dataset2 = spark.read().format("org.apache.hudi")
                   .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(),       DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
                   .load(tableAPath);
           dataset2 = dataset2.filter("id = 1 or id = 2").withColumn("field2", functions.lit("mock2"));
           dataset2.write()
                   .format("org.apache.hudi")
                   .option(DataSourceWriteOptions.TABLE_TYPE().key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL())
                   .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
                   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "id")
                   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "")
                   .option(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, "lake_update_date")
                   .option(HoodieWriteConfig.TABLE_NAME, "tableA")
                   .option("hoodie.archival.rollback.instants.hours.retained", "10")
                   .mode(SaveMode.Append)
                   .save(tableAPath);
   ```
   4、read tableA agian, And show the table count
   ```
   Dataset<Row> dataset3 = spark.read().format("org.apache.hudi")
                   .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(),       DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
                   .load(tableAPath);
           System.out.println(dataset3.count());
   print -> 4 records (1 record missing)
   ```
   5、  rerun application with a new sparkSeesion and read tableA again which don't have setting - spark.sql.codegen.wholeStage= "false"
   ```
           SparkSession spark = SparkSession.builder()
                   .config(sparkConf)
                   .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                   .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
                   .config("spark.sql.codegen.maxFields", "1000")
                   .getOrCreate();
             Dataset<Row> dataset = spark.read().format("org.apache.hudi")
                     .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(),       
             DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
                     .load(tableAPath);
             System.out.println(dataset.count());
   ```
   print -> 5 records (no record missing)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] ad1happy2go commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1502894958

   @luyongbiao So are you saying with both of the above settings also, you see the data loss.
   
   If yes, Can you share complete reproducible code with your data sets. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1503112028

   data loss too without setting spark.sql.codegen.wholeStage= "false". But at this case, data was lost in genericRecord after createRdd method, instead of dataset count.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] ad1happy2go commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1507871268

   https://github.com/apache/hudi/pull/7334 fixed the issue in 0.13.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1508122579

   thank you all very much


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] ad1happy2go commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "ad1happy2go (via GitHub)" <gi...@apache.org>.
ad1happy2go commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1506677225

   @luyongbiao I was able to reproduce the bug with spark.sql.codegen.wholeStage as false with spark 3.1 and 3.2.
   But when I tried with later spark version i.e. 3.3.2 with same hudi version, this issue no longer exists. So looks like some code fix has been done on spark side with the latest version. Can you please confirm by upgrading the spark version. Below is same Scala version of your code.
   
   Pasting scala shell code - 
   
   Add conf while opening spark shell : 
   --conf 'spark.sql.codegen.wholeStage=false'
   
   import org.apache.hudi.QuickstartUtils._
   import scala.collection.JavaConversions._
   import org.apache.spark.sql.SaveMode._
   import org.apache.hudi.DataSourceReadOptions._
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig._
   import org.apache.hudi.common.model.HoodieRecord
   
   
   val tableAPath = "file:///tmp/hudi_trips_mor_issue_8416_try_user3"
   val tableName = "hudi_trips_mor_issue_8416_try_user3"
   
   var dataset = spark.sql("select 1 id, 'mock' field1, 'mock' field2"
                                 + "\nunion all select 2 id, 'mock' field1, 'mock' field2"
                                 + "\nunion all select 3 id, 'mock' field1, 'mock' field2"
                                 + "\nunion all select 4 id, 'mock' field1, 'mock' field2"
                                 + "\nunion all select 5 id, 'mock' field1, 'mock' field2");
   
   dataset = dataset.withColumn("lake_update_date", current_timestamp());
   
   (dataset.write.format("org.apache.hudi")
                   .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
                   .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
                   .option(RECORDKEY_FIELD_OPT_KEY, "id")
                   .option(PARTITIONPATH_FIELD_OPT_KEY, "")
                   .option(PRECOMBINE_FIELD_PROP, "lake_update_date")
                   .option(TABLE_NAME, "tableA")
                   .mode(Append)
                   .save(tableAPath))
   print(dataset.count())
   
   var dataset2 = (spark.read.format("org.apache.hudi")
                   .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_SNAPSHOT_OPT_VAL)
                   .load(tableAPath));
   
   dataset2 = dataset2.filter(
                   "id = 1 or id = 2").withColumn(
                   "field2", lit("mock2")).withColumn(
                   "lake_update_date", current_timestamp);
   
   (dataset2.write.format("org.apache.hudi")
                   .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
                   .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
                   .option(RECORDKEY_FIELD_OPT_KEY, "id")
                   .option(PARTITIONPATH_FIELD_OPT_KEY, "")
                   .option(PRECOMBINE_FIELD_PROP, "lake_update_date")
                   .option(TABLE_NAME, "tableA")
                   .mode(Append)
                   .save(tableAPath))
   
   spark.read.format("org.apache.hudi").option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_SNAPSHOT_OPT_VAL).load(tableAPath).count()
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] codope commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "codope (via GitHub)" <gi...@apache.org>.
codope commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1505047037

   @luyongbiao How are you consuming from source to create the MOR table? Did you do record key based validation on MOR table that it has all the unique records. It is possible that source table has duplicates.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1502247270

    I wonder why not setting spark.sql.codegen.maxFields or others leads to data loss. I want the application to end abnormally instead of data loss.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1503140650

   data loss too without setting -- 
   - spark.sql.codegen.wholeStage= "false"                  
   - spark.sql.codegen.maxFileld= "xxx"
    if the number of  fields more than 100 after createRdd method


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao closed issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao closed issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala
URL: https://github.com/apache/hudi/issues/8416


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] luyongbiao commented on issue #8416: [SUPPORT] data loss after createRdd method in HoodieSparkUtils.scala

Posted by "luyongbiao (via GitHub)" <gi...@apache.org>.
luyongbiao commented on issue #8416:
URL: https://github.com/apache/hudi/issues/8416#issuecomment-1503572355

   You can reproduce this issue by below code,
   
   ```
   package test;
   
   import org.apache.hudi.DataSourceReadOptions;
   import org.apache.hudi.DataSourceWriteOptions;
   import org.apache.hudi.config.HoodieWriteConfig;
   import org.apache.spark.SparkConf;
   import org.apache.spark.sql.*;
   import org.apache.spark.sql.types.DataTypes;
   
   import java.io.IOException;
   import java.nio.file.Files;
   
   public class Test {
       public static void main(String [] args) throws IOException {
           //1、create sparkSession, set "spark.sql.codegen.wholeStage" to "false".
           SparkConf sparkConf = new SparkConf(false).setMaster("local[*]");
           SparkSession spark = SparkSession.builder()
                   .config(sparkConf)
                   .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                   .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
                   .config("spark.sql.codegen.maxFields", "1000")
                   .config("spark.sql.codegen.wholeStage", "false")
                   .config("spark.driver.host", "localhost")
                   .getOrCreate();
   
           //2、 Create tableA , And insert 5 records, then 1 base file generate
           Dataset<Row> dataset = spark.sql("select 1 id, 'mock' field1, 'mock' field2"
                   + "\nunion all select 2 id, 'mock' field1, 'mock' field2"
                   + "\nunion all select 3 id, 'mock' field1, 'mock' field2"
                   + "\nunion all select 4 id, 'mock' field1, 'mock' field2"
                   + "\nunion all select 5 id, 'mock' field1, 'mock' field2");
           String tableAPath = Files.createTempDirectory("").toString();
           dataset = dataset.withColumn("lake_update_date", functions.current_timestamp());
           dataset.write()
                   .format("org.apache.hudi")
                   .option(DataSourceWriteOptions.TABLE_TYPE().key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL())
                   .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
                   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "id")
                   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "")
                   .option(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, "lake_update_date")
                   .option(HoodieWriteConfig.TABLE_NAME, "tableA")
                   .mode(SaveMode.Append)
                   .save(tableAPath);
           System.out.println(dataset.count());//should be 5
   
           //3、update 2 records in tableA, then 1 log file generate
           Dataset<Row> dataset2 = spark.read().format("org.apache.hudi")
                   .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
                   .load(tableAPath);
           dataset2 = dataset2.filter("id = 1 or id = 2").withColumn("field2", functions.lit("mock2").cast(DataTypes.StringType))
                   .withColumn("lake_update_date", functions.current_timestamp());
           dataset2.write()
                   .format("org.apache.hudi")
                   .option(DataSourceWriteOptions.TABLE_TYPE().key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL())
                   .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
                   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "id")
                   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "")
                   .option(HoodieWriteConfig.PRECOMBINE_FIELD_PROP, "lake_update_date")
                   .option(HoodieWriteConfig.TABLE_NAME, "tableA")
                   .mode(SaveMode.Append)
                   .save(tableAPath);
   
           //4、read tableA again, And show the table count
           Dataset<Row> dataset3 = spark.read().format("org.apache.hudi")
                   .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
                   .load(tableAPath);
           System.out.println(dataset3.count());//should be 4, 1 record was lost
   
           //5、 create a new sparkSession without setting "spark.sql.codegen.wholeStage", and read tableA again
           SparkSession newSessionWithoutCodegenDisabled = spark.cloneSession();
           newSessionWithoutCodegenDisabled.conf().unset("spark.sql.codegen.wholeStage");
           Dataset<Row> newDataset = newSessionWithoutCodegenDisabled.read().format("org.apache.hudi")
                   .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(),
                           DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
                   .load(tableAPath);
           System.out.println(newDataset.count());//should be 5, no missing
       }
   }
   ```
   You can check the comments in the code, and you can see when whole stage codegen is disabled, 1 record would be lost after writing into the MOR table and reading from it.
   And once whole stage codegen disable option is unset, the result is expected.
   
   We spent hours debugging this issue and found in [HoodieSparkUtils](https://github.com/apache/hudi/blob/release-0.12.1/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala) line 102, it calls "rows.isEmpty" method, if whole stage codegen is disabled or number of fields in the dataset is more than the value of "spark.sql.codegen.maxFields" option.
   Then the code will call "[HoodieMergeOnReadRDD](https://github.com/apache/hudi/blob/release-0.12.1/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala).RecordMergingFileIterator.hasNext" method, which is equals to "hasNextInternal" method,
   and in "hasNextInternal" method, it calls "baseFileIterator.next()" method, we think this method should be called in "next()" method based on the normal use of iterator.
   
   please help check and clarify if our suspicion is correct.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org