You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/11/03 14:51:13 UTC

[GitHub] [hudi] vortual opened a new issue #3919: [SUPPORT]Error obtaining data file/log file grouping when log don't have base file

vortual opened a new issue #3919:
URL: https://github.com/apache/hudi/issues/3919


   **Describe the problem you faced**
   hudi version: 0.9.0
   hudi版本:0.9.0
   
   table type:  merge on read
   表类型:merge on read
   
   when the log file haven't compact,the base file haven't generate yet. spark query the table have an error
   当log文件还没被压缩生成对应的basefile时,spark查询会报错
   
   Error log:
   `org.apache.hudi.exception.HoodieException: Error obtaining data file/log file grouping
     at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.getRealtimeSplits(HoodieRealtimeInputFormatUtils.java:156)
     at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getSplits(HoodieParquetRealtimeInputFormat.java:69)
     at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
     at scala.Option.getOrElse(Option.scala:121)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
     at scala.Option.getOrElse(Option.scala:121)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
     at scala.Option.getOrElse(Option.scala:121)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
     at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
     at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
     at scala.collection.immutable.List.foreach(List.scala:381)
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
     at scala.collection.immutable.List.map(List.scala:285)
     at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:84)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
     at scala.Option.getOrElse(Option.scala:121)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
     at scala.Option.getOrElse(Option.scala:121)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
     at scala.Option.getOrElse(Option.scala:121)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
     at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:314)
     at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
     at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861)
     at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
     at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
     at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
     at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
     at org.apache.spark.sql.Dataset.head(Dataset.scala:2150)
     at org.apache.spark.sql.Dataset.take(Dataset.scala:2363)
     at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
     at org.apache.spark.sql.Dataset.show(Dataset.scala:637)
     at org.apache.spark.sql.Dataset.show(Dataset.scala:596)
     at org.apache.spark.sql.Dataset.show(Dataset.scala:605)
     ... 48 elided
   Caused by: java.lang.NullPointerException
     at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.lambda$null$8(HoodieRealtimeInputFormatUtils.java:132)
     at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
     at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
     at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
     at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
     at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
     at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
     at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.lambda$getRealtimeSplits$9(HoodieRealtimeInputFormatUtils.java:129)
     at java.util.HashMap$KeySet.forEach(HashMap.java:933)
     at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.getRealtimeSplits(HoodieRealtimeInputFormatUtils.java:102)
     ... 100 more
   `
   
   **To Reproduce**
   1. create kafka source
   `CREATE TABLE kafka_source(
    user_id STRING,
    order_amount BIGINT,
    log_ts TIMESTAMP(3),
    part STRING
    )WITH(
    'connector' = 'kafka',
    'topic' = 'flink_on_hudi_zrm',
    'properties.bootstrap.servers' = 'node2:6667',
    'scan.startup.mode'='earliest-offset',
    'properties.group.id' = 'testGroup',
    'format' = 'json'
   );`
   2.sync to hudi from kafka
   `CREATE TABLE kafka_source_hudi(
     user_id VARCHAR(20),
     order_amount BIGINT,
     log_ts TIMESTAMP(3),
     `part` VARCHAR(20)
   )
   PARTITIONED BY (`part`)
   WITH (
     'connector' = 'hudi',
     'path' = 'hdfs:///apps/hive/warehouse/test.db/kafka_source_hudi',
     'table.type' = 'MERGE_ON_READ',
     'write.bucket_assign.tasks' = '2',
     'write.precombine.field' = 'log_ts',
     'write.tasks' = '2',
     'hive_sync.enable' = 'true',
     'hive_sync.mode' = 'hms',
     'hive_sync.metastore.uris' = 'thrift://node7:9083',
     'hoodie.datasource.write.recordkey.field' = 'user_id',
     'compaction.tasks' = '4',
     'compaction.delta_commits' = '3'
   );`
   `insert into kafka_source_hudi select  * from  kafka_source;`
   3. create external table
   `CREATE EXTERNAL TABLE test.kafka_source_hudi_spark(     
     `_hoodie_commit_time` string,                    
      `_hoodie_commit_seqno` string,                   
      `_hoodie_record_key` string,                     
      `_hoodie_partition_path` string,                 
      `_hoodie_file_name` string,                
      user_id string,
     order_amount BIGINT,
     log_ts bigint)                                     
    PARTITIONED BY (                                   
      `part` string)                              
    ROW FORMAT SERDE                                   
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  
    STORED AS INPUTFORMAT                              
      'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat' 
    OUTPUTFORMAT                                       
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' 
    LOCATION                                           
      'hdfs:///apps/hive/warehouse/test.db/kafka_source_hudi';
   
   alter table test.kafka_source_hudi_spark add if not exists partition(`part`='par1') location 'hdfs:///apps/hive/warehouse/test.db/kafka_source_hudi/par1';
   alter table test.kafka_source_hudi_spark add if not exists partition(`part`='par2') location 'hdfs:///apps/hive/warehouse/test.db/kafka_source_hudi/par2';`
   4.query the external table
   `select * from test.kafka_source_hudi_spark limit 10`
   
   **Expected behavior**
   
   A clear and concise description of what you expected to happen.
   
   **Environment Description**
   
   * Hudi version :0.9.0
   
   * Spark version :2.2.0
   
   * Hive version :1.2.1000
   
   * Hadoop version :2.7.3
   
   * Storage (HDFS/S3/GCS..) :HDFS
   
   * Running on Docker? (yes/no) :NO
   
   
   **Additional context**
   
   Add any other context about the problem here.
   
   **Stacktrace**
   
   ```Add the stacktrace of the error.```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vortual commented on issue #3919: [SUPPORT]Error obtaining data file/log file grouping when log don't have base file

Posted by GitBox <gi...@apache.org>.
vortual commented on issue #3919:
URL: https://github.com/apache/hudi/issues/3919#issuecomment-959399779


   我找到了出问题代码的地方,在HoodieRealtimeInputFormatUtils类里面:
   List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
   这一行groupedInputSplits这里存的是每个file group对应的parquet文件,没有包括log文件
   fileSlice是新增的log,因为groupedInputSplits里面没有对应的filegroup,只有parquet没有log,所以得到的dataFileSplits 为空。之后再调用dataFileSplits.forEach(split -> {})时就会出现空指针异常。
   
   I have found the wrong code, inside the HoodieRealtimeInputFormatUtils class:
   List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
   The groupedInputSplits Map contains the Parquet files corresponding to each file group, and the log files are not included
   FileSlice is the new log, and the dataFileSplits obtained are empty because there is no corresponding Filegroup in the groupedInputSplits Map and only Parquet has no log file.A subsequent call to datafilesplits. forEach(split -> {}) will result in a null pointer exception.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vortual commented on issue #3919: [SUPPORT]Error obtaining data file/log file grouping when log don't have base file

Posted by GitBox <gi...@apache.org>.
vortual commented on issue #3919:
URL: https://github.com/apache/hudi/issues/3919#issuecomment-959399779






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vortual edited a comment on issue #3919: [SUPPORT]Error obtaining data file/log file grouping when spark query MOR hudi table

Posted by GitBox <gi...@apache.org>.
vortual edited a comment on issue #3919:
URL: https://github.com/apache/hudi/issues/3919#issuecomment-980631682


   I think #3203 has been resolved this issue


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vortual commented on issue #3919: [SUPPORT]Error obtaining data file/log file grouping when log don't have base file

Posted by GitBox <gi...@apache.org>.
vortual commented on issue #3919:
URL: https://github.com/apache/hudi/issues/3919#issuecomment-959399779


   我找到了出问题代码的地方,在HoodieRealtimeInputFormatUtils类里面:
   List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
   这一行groupedInputSplits这里存的是每个file group对应的parquet文件,没有包括log文件
   fileSlice是新增的log,因为groupedInputSplits里面没有对应的filegroup,只有parquet没有log,所以得到的dataFileSplits 为空。之后再调用dataFileSplits.forEach(split -> {})时就会出现空指针异常。
   
   I have found the wrong code, inside the HoodieRealtimeInputFormatUtils class:
   List<FileSplit> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
   The groupedInputSplits Map contains the Parquet files corresponding to each file group, and the log files are not included
   FileSlice is the new log, and the dataFileSplits obtained are empty because there is no corresponding Filegroup in the groupedInputSplits Map and only Parquet has no log file.A subsequent call to datafilesplits. forEach(split -> {}) will result in a null pointer exception.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vortual commented on issue #3919: [SUPPORT]Error obtaining data file/log file grouping when spark query MOR hudi table

Posted by GitBox <gi...@apache.org>.
vortual commented on issue #3919:
URL: https://github.com/apache/hudi/issues/3919#issuecomment-980631682


   I think #3203 solved my question


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vortual closed issue #3919: [SUPPORT]Error obtaining data file/log file grouping when spark query MOR hudi table

Posted by GitBox <gi...@apache.org>.
vortual closed issue #3919:
URL: https://github.com/apache/hudi/issues/3919


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vortual commented on issue #3919: [SUPPORT]Error obtaining data file/log file grouping when spark query MOR hudi table

Posted by GitBox <gi...@apache.org>.
vortual commented on issue #3919:
URL: https://github.com/apache/hudi/issues/3919#issuecomment-965035444


   I noticed that this problem didn't occur when I set write.bucket_assign.tasks =1.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org