Posted to users@hudi.apache.org by vtygoss <vt...@126.com> on 2021/07/10 14:11:18 UTC

Fwd: flink hudi parquet file reader read unknown parquet header type, and throw NPE

Original message
From: vtygoss<vt...@126.com>
To: users-faq@hudi.apache.org; user@flink.apache.org<us...@flink.apache.org>
Sent: Friday, July 9, 2021, 16:57
Subject: flink hudi parquet file reader read unknown parquet header type, and throw NPE








Hi, community!


I am a new developer in hudi-flink, and I have run into a problem.
I created a Flink table with ‘connector’=‘hudi’; the Hudi files were written by hudi-spark-0.9.0-SNAPSHOT.


Some scenarios:
- When I execute the statement “select * from ibaehis_ogg_sync_pr_encounter”, the result is displayed normally.
- When I execute “select * from ibaehis_ogg_sync_pr_encounter limit 10”, “limit 100”, and so on, an NPE happens (NPE info below).
- When I execute “select _hoodie_commit_time from ibaehis_ogg_sync_pr_encounter”, with or without a limit, the NPE also happens (NPE info below). But when I project any other column, no NPE happens.




```
[create table statement]
create table if not exists ibaehis_ogg_sync_pr_encounter (
`_hoodie_commit_time` string,`_hoodie_commit_seqno` string,`_hoodie_record_key` string,`_hoodie_partition_path` string,`_hoodie_file_name` string,`facility_id` string,`encounter_id` string,`patient_id` string,`patient_class` string,`visit_adm_date_time` string,`visit_adm_type_facility_id` string,`visit_adm_type` string,`visit_adm_type_ind` string,`assign_care_locn_type` string,`assign_care_locn_code` string,`assign_room_type` string,`assign_room_num` string,`assign_bed_num` string,`attend_practitioner_id` string,`referral_id` string,`appt_case_yn` string,`appt_id` string,`invitation_no` string,`booking_case_yn` string,`booking_ref_no` string,`service_code` string,`subservice_code` string,`ancillary_yn` string,`specialty_code` string,`patient_type` string,`circumstance_of_injury_code` string,`ambulatory_status` string,`courtesy_code` string,`chief_complaint` string,`patient_condition` string,`new_op_episode_yn` string,`episode_id` string,`op_episode_visit_num` string,`admit_practitioner_id` string,`discharge_date_time` string,`disp_auth_practitioner_id` string,`disposition_type` string,`recall_yn` string,`recall_date` string,`disp_facility_id` string,`disp_referral_id` string,`mds_complete_yn` string,`fiscal_year` string,`fiscal_period` string,`shift_id` string,`backdated_yn` string,`visit_status` string,`visit_status_set_on_date` string,`visit_status_set_by_user` string,`visit_status_set_reason` string,`adt_status` string,`adt_status_set_on_date` string,`adt_status_set_by_user` string,`adt_status_set_reason` string,`leave_expiry_date_time` string,`oth_adt_status` string,`ip_leave_status` string,`contact_reason_code` string,`recall_reason` string,`recall_reason_code` string,`referred_yn` string,`prn_visit_yn` string,`prn_visit_before` string,`ae_episode_yn` string,`revise_reason_code` string,`cancel_reason_code` string,`team_id` string,`credit_auth_user_id` string,`credit_auth_remarks` string,`admission_no` string,`deceased_date_time` string,`security_level` string,`protection_ind` string,`assign_bed_class_code` string,`disch_practitioner_id` string,`mlc_yn` string,`pol_rep_no` string,`pol_stn_id` string,`pol_id` string,`informed_to` string,`informed_name` string,`informed_date_time` string,`post_mortem_req_yn` string,`pat_curr_locn_type` string,`pat_curr_locn_code` string,`pat_trn_time` string,`oscc_yn` string,`marked_by_id` string,`marked_date` string,`closed_by_id` string,`closed_date` string,`patient_priority_no` string,`assign_bed_type_code` string,`dc_unit_code` string,`bed_allocation_date_time` string,`mark_arrival_date_time` string,`priority_zone` string,`treatment_area_code` string,`brought_dead_yn` string,`disaster_yn` string,`disaster_town_code` string,`prev_assign_care_locn_code` string,`prev_assign_room_num` string,`prev_subservice_code` string,`prev_attend_practitioner_id` string,`prev_visit_adm_type` string,`prev_visit_adm_type_ind` string,`file_no` string,`movement_reason_code` string,`queue_id` string,`other_res_class` string,`other_resource_id` string,`pm_yn` string,`medical_yn` string,`surgical_yn` string,`body_release_date_time` string,`disaster_type_code` string,`high_risk_yn` string,`trauma_yn` string,`mech_injury_catg_code` string,`mech_injury_subcatg_code` string,`exp_discharge_date_time` string,`hosp_main` string,`hosp_sub` string,`card_id` string,`expiry_date` string,`privl_type_code` string,`room_tel_ext_num` string,`date_time_of_accident` string,`place_of_accident` string,`mv_accident_yn` string,`reserved_nurs_unit_code` string,`reserved_room_no` 
string,`reserved_bed_no` string,`cancel_disch_practitioner_id` string,`cancel_disch_date` string,`added_by_id` string,`added_date` string,`added_at_ws_no` string,`added_facility_id` string,`modified_by_id` string,`modified_date` string,`modified_at_ws_no` string,`modified_facility_id` string,`cancel_checkout_reason_code` string,`cancel_checkout_by_id` string,`cancel_checkout_remarks` string,`prev_visit_status` string,`prev_practitioner_id` string,`prev_brought_dead_yn` string,`prev_deceased_date_time` string,`prev_mlc_yn` string,`prev_pm_yn` string,`discharge_status_code` string,`complaint_code` string,`enc_modified_yn` string,`admission_remarks` string,`late_disc_reason_code` string,`other_late_disc_reason` string,`injury_date` string,`trmt_start_date` string,`trmt_end_date` string,`did_yn` string,`prev_did_yn` string,`weight_on_admission` string,`weight_on_admission_unit` string,`vehicle_involved1_code` string,`vehicle_involved2_code` string,`pat_position_code` string,`protective_device_code` string,`vehicle_reg_no1` string,`vehicle_reg_no2` string,`form60_yn` string,`referred_locn_code` string,`transport_mode` string,`arrival_code` string,`pre_dis_initiated_date_time` string,`accompanied_by_code` string,`pre_dis_initiated_user` string,`recalled_enc_id` string,`admission_form_codes` string,`o_and_g_yn` string,`non_emerg_yn` string,`other_reason_remarks` string,`revise_visit_remarks` string,`cancel_visit_remarks` string,`height_on_admission` string,`bmi` string,`discharge_to` string,`ts` bigint,PRIMARY KEY(`encounter_id`,`facility_id`) NOT ENFORCED) WITH (
'connector'='hudi','path'='hdfs://redected/user/data/db/hudi_bruneihealth_egncpr_mor//ibaehis_ogg_sync_pr_encounter','table.type'='MERGE_ON_READ',
'read.streaming.enabled'='false','read.streaming.check-interval'='5','write.tasks'='3','compaction.tasks'='3','hoodie.datasource.write.partitionpath.field'='_hoodie_partition_path');


[files in hdfs]
```




```
[NPE info in $FLINK_HOME/log/flink-work-standalonesession-1.log]


2021-07-09 16:22:54,634 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - split_reader -> SinkConversionToTuple2 (3/4) (6856a63df778a5e327426a3814a26dde) switched from CANCELING to CANCELED.
2021-07-09 16:22:54,653 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: streaming_source (1/1) (a99625774f4e2784a278174b0983c6a8) switched from CANCELING to CANCELED.
2021-07-09 16:22:54,654 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - split_reader -> SinkConversionToTuple2 (1/4) (0f9a16a495b0e23ea0798fafda0a923e) switched from CANCELING to CANCELED.
2021-07-09 16:22:54,654 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: SQL Client Stream Collect Sink (1/1) (20f3ef3f7613634eda9d3d5c6dbed865) switched from CANCELING to CANCELED.
2021-07-09 16:22:54,655 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job default: select _hoodie_commit_time from ibaehis_ogg_sync_pr_encounter (161dd04f5a690e1bb1e8522df7492560) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source) ~[?:?]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_74]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_74]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
Caused by: java.lang.NullPointerException
        at org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.readToVector(AbstractColumnReader.java:160) ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
        at org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.nextBatch(ParquetColumnarRowSplitReader.java:298) ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
        at org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.ensureBatch(ParquetColumnarRowSplitReader.java:274) ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
        at org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.reachedEnd(ParquetColumnarRowSplitReader.java:253) ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
        at org.apache.hudi.table.format.mor.MergeOnReadInputFormat$BaseFileOnlyFilteringIterator.reachedEnd(MergeOnReadInputFormat.java:426) ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
        at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.reachedEnd(MergeOnReadInputFormat.java:241) ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
        at org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:161) ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297) ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609) ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_74]
2021-07-09 16:22:54,657 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Stopping checkpoint coordinator for job 161dd04f5a690e1bb1e8522df7492560.
2021-07-09 16:22:54,658 INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Shutting down


```




I tried to track this exception; it seems the NPE is thrown before the RuntimeException would be thrown.
[org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.java]






[org.apache.parquet.hadoop.ColumnChunkPageReader.java]


when "compressedPages" is empty, method readPage() will return null. And “compressedPages” is set in constructor.


“compressedPages” is set in constructor. then i tried to find what "List<DataPage> compressedPages” is.




It seems to be that when pageHeader.type is not DATA_PAGE  or DATA_PAGE_V2 ,  then the “ compressedPages” is empty list when construct ColumnChunkPageReader.   
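
To make the suspected failure path concrete, here is a minimal, self-contained Java model of it (the class and method names are illustrative only, not the actual parquet/Flink classes):

```
import java.util.ArrayDeque;
import java.util.Deque;

public class EmptyPageListNpeDemo {

    // Stands in loosely for ColumnChunkPageReader: returns null once no pages were collected.
    static final class PageReader {
        private final Deque<String> pages;

        PageReader(Deque<String> pages) {
            this.pages = pages;
        }

        String readPage() {
            return pages.poll(); // null when the page list is empty
        }
    }

    public static void main(String[] args) {
        // readAllPages() only adds DATA_PAGE / DATA_PAGE_V2 pages to the list; any other
        // header type is skipped, so a chunk with only unknown page types yields an empty list.
        PageReader reader = new PageReader(new ArrayDeque<>());

        String page = reader.readPage();
        // A caller that dereferences the page without a null check fails exactly like the
        // readToVector() frame in the stack trace above.
        System.out.println(page.length()); // throws NullPointerException here
    }
}
```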


I cannot track this any further; I don't know how to read or write the parquet page headers, and I don't know what is wrong.
Please offer some advice. Thank you very much!


```
[org.apache.parquet.hadoop.ParquetFileReader.java]
public ColumnChunkPageReader readAllPages() throws IOException {
  List<DataPage> pagesInChunk = new ArrayList<DataPage>();
  DictionaryPage dictionaryPage = null;
  PrimitiveType type = getFileMetaData().getSchema()
      .getType(descriptor.col.getPath()).asPrimitiveType();
  long valuesCountReadSoFar = 0;
  int dataPageCountReadSoFar = 0;
  while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
    PageHeader pageHeader = readPageHeader();
    int uncompressedPageSize = pageHeader.getUncompressed_page_size();
    int compressedPageSize = pageHeader.getCompressed_page_size();
    final BytesInput pageBytes;
    switch (pageHeader.type) {
      case DICTIONARY_PAGE:
        // there is only one dictionary page per column chunk
        if (dictionaryPage != null) {
          throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
        }
        pageBytes = this.readAsBytesInput(compressedPageSize);
        if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
          verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
            "could not verify dictionary page integrity, CRC checksum verification failed");
        }
        DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
        dictionaryPage =
            new DictionaryPage(
                pageBytes,
                uncompressedPageSize,
                dicHeader.getNum_values(),
                converter.getEncoding(dicHeader.getEncoding())
                );
        // Copy crc to new page, used for testing
        if (pageHeader.isSetCrc()) {
          dictionaryPage.setCrc(pageHeader.getCrc());
        }
        break;
      case DATA_PAGE:
        DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
        pageBytes = this.readAsBytesInput(compressedPageSize);
        if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
          verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
            "could not verify page integrity, CRC checksum verification failed");
        }
        DataPageV1 dataPageV1 = new DataPageV1(
          pageBytes,
          dataHeaderV1.getNum_values(),
          uncompressedPageSize,
          converter.fromParquetStatistics(
            getFileMetaData().getCreatedBy(),
            dataHeaderV1.getStatistics(),
            type),
          converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
          converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
          converter.getEncoding(dataHeaderV1.getEncoding()));
        // Copy crc to new page, used for testing
        if (pageHeader.isSetCrc()) {
          dataPageV1.setCrc(pageHeader.getCrc());
        }
        pagesInChunk.add(dataPageV1);
        valuesCountReadSoFar += dataHeaderV1.getNum_values();
        ++dataPageCountReadSoFar;
        break;
      case DATA_PAGE_V2:
        DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
        int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
        pagesInChunk.add(
            new DataPageV2(
                dataHeaderV2.getNum_rows(),
                dataHeaderV2.getNum_nulls(),
                dataHeaderV2.getNum_values(),
                this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
                this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
                converter.getEncoding(dataHeaderV2.getEncoding()),
                this.readAsBytesInput(dataSize),
                uncompressedPageSize,
                converter.fromParquetStatistics(
                    getFileMetaData().getCreatedBy(),
                    dataHeaderV2.getStatistics(),
                    type),
                dataHeaderV2.isIs_compressed()
                ));
        valuesCountReadSoFar += dataHeaderV2.getNum_values();
        ++dataPageCountReadSoFar;
        break;
      default:
        LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
        stream.skipFully(compressedPageSize);
        break;
    }
  }
  if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
    // Would be nice to have a CorruptParquetFileException or something as a subclass?
    throw new IOException(
        "Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " +
        getPath() + " offset " + descriptor.metadata.getFirstDataPageOffset() +
        " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
        + " pages ending at file offset " + (descriptor.fileOffset + stream.position()));
  }
  BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
  return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage, offsetIndex,
      blocks.get(currentBlock).getRowCount());
}
```
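
If it helps to confirm which page types, encodings, and value counts the problematic base file actually contains, the following sketch uses the parquet-mr footer API (the path is a placeholder, not one of the real files above):

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

// Sketch: dump per-column-chunk metadata (codec, encodings, value count) of a parquet file,
// e.g. to compare the _hoodie_commit_time chunk against the other columns.
public class InspectColumnChunks {
  public static void main(String[] args) throws Exception {
    Path path = new Path("hdfs://.../some-base-file.parquet"); // placeholder path
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        for (ColumnChunkMetaData column : block.getColumns()) {
          System.out.printf("%s codec=%s encodings=%s values=%d%n",
              column.getPath(), column.getCodec(), column.getEncodings(), column.getValueCount());
        }
      }
    }
  }
}
```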


Best Regards!

Re: flink hudi parquet file reader read unknown parquet header type, and throw NPE

Posted by Danny Chan <da...@apache.org>.
Dear vtygoss ~
Thanks for your very detailed information about the trace. I tried to
reproduce the problem with the SQL statements:

select _hoodie_commit_time from xxx;
select * from xxx limit 2;

But I had no luck and the SQL works as expected; one difference from your
example is that I also wrote the table using Flink.
BTW, what parquet version did you use when writing the table with Spark?
Flink writer uses parquet 1.11.1 now.
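
For reference, one way to read the writer string recorded in a parquet file footer (a sketch against the parquet-mr API; the path is a placeholder):

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

// Sketch: print the "created_by" string (writer name and parquet version) from a file footer.
public class PrintParquetCreatedBy {
  public static void main(String[] args) throws Exception {
    Path path = new Path("hdfs://.../some-base-file.parquet"); // placeholder path
    try (ParquetFileReader reader =
             ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
      System.out.println(reader.getFooter().getFileMetaData().getCreatedBy());
    }
  }
}
```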

Best,
Danny Chan

Danny Chan <da...@apache.org> wrote on Monday, July 12, 2021, 8:50 AM:

> Thanks for the feedback, I will take a look ~
>
> Best,
> Danny Chan
>
> vtygoss <vt...@126.com> wrote on Saturday, July 10, 2021, 10:11 PM:

>> Shutting down
>>
>> ```
>>
>>
>>
>> I tried to track this exception; the NullPointerException seems to be
>> thrown before the RuntimeException, i.e. it is the root cause.
>>
>> [org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.java]
>>
>>
>>
>>
>> [org.apache.parquet.hadoop.ColumnChunkPageReader.java]
>>
>>
>> when "compressedPages" is empty, method readPage() will return null. And
>> “compressedPages” is set in constructor.
>>
>>
>> “compressedPages” is set in constructor. then i tried to find what
>> "List<DataPage> compressedPages” is.
>>
>>
>>
>> It seems that when pageHeader.type is neither DATA_PAGE nor DATA_PAGE_V2,
>> the page is skipped, so "compressedPages" is an empty list when the
>> ColumnChunkPageReader is constructed.
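>>
>> If that is the case, any caller that dereferences the result of readPage()
>> without a null check would fail exactly like the stack trace above. A
>> minimal sketch of that failure mode (the class and method names below are
>> placeholders for illustration, not the real Flink reader code):
>>
>> import org.apache.parquet.column.page.DataPage;
>> import org.apache.parquet.column.page.PageReader;
>> import org.apache.parquet.io.ParquetDecodingException;
>>
>> final class PageGuard {
>>   // readPage() returns null when the page list built by readAllPages()
>>   // is empty, e.g. when every page header in the chunk had an unknown
>>   // type and was skipped, so the caller has to guard against null.
>>   static DataPage nextPageOrFail(PageReader pageReader) {
>>     DataPage page = pageReader.readPage();
>>     if (page == null) {
>>       throw new ParquetDecodingException(
>>           "no readable data pages in this column chunk");
>>     }
>>     return page;
>>   }
>> }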
>>
>>
>> I cannot track it any further from here: I don't know how to read or how
>> to set the parquet page headers, and I don't know what is wrong.
>>
>> Please offer some advice. Thank you very much!
>>
>>
>> [org.apache.parquet.hadoop.ParquetFileReader.java]
>>
>> public ColumnChunkPageReader readAllPages() throws IOException {
>>   List<DataPage> pagesInChunk = new ArrayList<DataPage>();
>>   DictionaryPage dictionaryPage = null;
>>   PrimitiveType type = getFileMetaData().getSchema()
>>       .getType(descriptor.col.getPath()).asPrimitiveType();
>>   long valuesCountReadSoFar = 0;
>>   int dataPageCountReadSoFar = 0;
>>   while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
>>     PageHeader pageHeader = readPageHeader();
>>     int uncompressedPageSize = pageHeader.getUncompressed_page_size();
>>     int compressedPageSize = pageHeader.getCompressed_page_size();
>>     final BytesInput pageBytes;
>>     switch (pageHeader.type) {
>>       case DICTIONARY_PAGE:
>>         // there is only one dictionary page per column chunk
>>         if (dictionaryPage != null) {
>>           throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
>>         }
>>         pageBytes = this.readAsBytesInput(compressedPageSize);
>>         if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
>>           verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
>>             "could not verify dictionary page integrity, CRC checksum verification failed");
>>         }
>>         DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
>>         dictionaryPage =
>>             new DictionaryPage(
>>                 pageBytes,
>>                 uncompressedPageSize,
>>                 dicHeader.getNum_values(),
>>                 converter.getEncoding(dicHeader.getEncoding())
>>                 );
>>         // Copy crc to new page, used for testing
>>         if (pageHeader.isSetCrc()) {
>>           dictionaryPage.setCrc(pageHeader.getCrc());
>>         }
>>         break;
>>       case DATA_PAGE:
>>         DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
>>         pageBytes = this.readAsBytesInput(compressedPageSize);
>>         if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
>>           verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
>>             "could not verify page integrity, CRC checksum verification failed");
>>         }
>>         DataPageV1 dataPageV1 = new DataPageV1(
>>           pageBytes,
>>           dataHeaderV1.getNum_values(),
>>           uncompressedPageSize,
>>           converter.fromParquetStatistics(
>>             getFileMetaData().getCreatedBy(),
>>             dataHeaderV1.getStatistics(),
>>             type),
>>           converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
>>           converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
>>           converter.getEncoding(dataHeaderV1.getEncoding()));
>>         // Copy crc to new page, used for testing
>>         if (pageHeader.isSetCrc()) {
>>           dataPageV1.setCrc(pageHeader.getCrc());
>>         }
>>         pagesInChunk.add(dataPageV1);
>>         valuesCountReadSoFar += dataHeaderV1.getNum_values();
>>         ++dataPageCountReadSoFar;
>>         break;
>>       case DATA_PAGE_V2:
>>         DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
>>         int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
>>         pagesInChunk.add(
>>             new DataPageV2(
>>                 dataHeaderV2.getNum_rows(),
>>                 dataHeaderV2.getNum_nulls(),
>>                 dataHeaderV2.getNum_values(),
>>                 this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
>>                 this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
>>                 converter.getEncoding(dataHeaderV2.getEncoding()),
>>                 this.readAsBytesInput(dataSize),
>>                 uncompressedPageSize,
>>                 converter.fromParquetStatistics(
>>                     getFileMetaData().getCreatedBy(),
>>                     dataHeaderV2.getStatistics(),
>>                     type),
>>                 dataHeaderV2.isIs_compressed()
>>                 ));
>>         valuesCountReadSoFar += dataHeaderV2.getNum_values();
>>         ++dataPageCountReadSoFar;
>>         break;
>>       default:
>>         LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
>>         stream.skipFully(compressedPageSize);
>>         break;
>>     }
>>   }
>>   if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
>>     // Would be nice to have a CorruptParquetFileException or something as a subclass?
>>     throw new IOException(
>>         "Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " +
>>         getPath() + " offset " + descriptor.metadata.getFirstDataPageOffset() +
>>         " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
>>         + " pages ending at file offset " + (descriptor.fileOffset + stream.position()));
>>   }
>>   BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
>>   return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage, offsetIndex,
>>       blocks.get(currentBlock).getRowCount());
>> }
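>>
>> To double-check whether the base parquet files themselves are readable
>> with this parquet-mr version, I could iterate the row groups through the
>> public ParquetFileReader API (sketch below; the path is just a placeholder
>> pointing at one of the hudi base files in HDFS):
>>
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.parquet.column.page.PageReadStore;
>> import org.apache.parquet.hadoop.ParquetFileReader;
>> import org.apache.parquet.hadoop.util.HadoopInputFile;
>>
>> public class InspectParquet {
>>   public static void main(String[] args) throws Exception {
>>     Path path = new Path(args[0]);            // e.g. a *.parquet base file
>>     Configuration conf = new Configuration(); // picks up HDFS settings
>>     try (ParquetFileReader reader =
>>              ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
>>       System.out.println("schema: " + reader.getFileMetaData().getSchema());
>>       PageReadStore rowGroup;
>>       while ((rowGroup = reader.readNextRowGroup()) != null) {
>>         // Each successful call means the pages of this row group were
>>         // located and read through the same readAllPages() path above.
>>         System.out.println("row group rows = " + rowGroup.getRowCount());
>>       }
>>     }
>>   }
>> }
>>
>> If such a check can read every row group, the file itself is probably
>> fine and the problem is more likely in how the flink/hudi reader consumes
>> the pages.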
>>
>>
>> Best Regards!
>>
>

Re: flink hudi parquet file reader read unknown parquet header type, and throw NPE

Posted by Danny Chan <da...@apache.org>.
Thanks for the feedback, I will take a look ~

Best,
Danny Chan

vtygoss <vt...@126.com> wrote on Saturday, July 10, 2021 at 10:11 PM:

>
>
>  原始邮件
> *发件人:* vtygoss<vt...@126.com>
> *收件人:* users-faq@hudi.apache.org; user@flink.apache.org<
> user@flink.apache.org>
> *发送时间:* 2021年7月9日(周五) 16:57
> *主题:* flink hudi parquet file reader read unknown parquet header type,
> and throw NPE
>
>
>
>
> Hi, comunity!
>
>
> I am new developer in hudi-flink, and i met a problem.
>
> I create a flink table with ‘connector’=‘hudi’, and the hudi format files
> were written by hudi-spark-0.9.0-SNAPSHOT.
>
>
> some scenario
>
> - When I execute statement “select * from ibaehis_ogg_sync_pr_encounter”,
> the result is displayed normally.
>
> - When I execute statement “select * from ibaehis_ogg_sync_pr_encounter *limit
> 10*” or “limit 100” and so on, NPE happend. NPE info below.
>
> - When I execute statement “select _hoodie_commit_time from
> ibaehis_ogg_sync_pr_encounter” whether or not  limit number, NPE also
> happend, NPE info below. But when i project any other column, not NPE
> happened.
>
>
>
> ```
>
> [create table statment]
>
> create table if not exists ibaehis_ogg_sync_pr_encounter (
>
> `_hoodie_commit_time` string,`_hoodie_commit_seqno`
> string,`_hoodie_record_key` string,`_hoodie_partition_path`
> string,`_hoodie_file_name` string,`facility_id` string,`encounter_id`
> string,`patient_id` string,`patient_class` string,`visit_adm_date_time`
> string,`visit_adm_type_facility_id` string,`visit_adm_type`
> string,`visit_adm_type_ind` string,`assign_care_locn_type`
> string,`assign_care_locn_code` string,`assign_room_type`
> string,`assign_room_num` string,`assign_bed_num`
> string,`attend_practitioner_id` string,`referral_id` string,`appt_case_yn`
> string,`appt_id` string,`invitation_no` string,`booking_case_yn`
> string,`booking_ref_no` string,`service_code` string,`subservice_code`
> string,`ancillary_yn` string,`specialty_code` string,`patient_type`
> string,`circumstance_of_injury_code` string,`ambulatory_status`
> string,`courtesy_code` string,`chief_complaint` string,`patient_condition`
> string,`new_op_episode_yn` string,`episode_id`
> string,`op_episode_visit_num` string,`admit_practitioner_id`
> string,`discharge_date_time` string,`disp_auth_practitioner_id`
> string,`disposition_type` string,`recall_yn` string,`recall_date`
> string,`disp_facility_id` string,`disp_referral_id`
> string,`mds_complete_yn` string,`fiscal_year` string,`fiscal_period`
> string,`shift_id` string,`backdated_yn` string,`visit_status`
> string,`visit_status_set_on_date` string,`visit_status_set_by_user`
> string,`visit_status_set_reason` string,`adt_status`
> string,`adt_status_set_on_date` string,`adt_status_set_by_user`
> string,`adt_status_set_reason` string,`leave_expiry_date_time`
> string,`oth_adt_status` string,`ip_leave_status`
> string,`contact_reason_code` string,`recall_reason`
> string,`recall_reason_code` string,`referred_yn` string,`prn_visit_yn`
> string,`prn_visit_before` string,`ae_episode_yn`
> string,`revise_reason_code` string,`cancel_reason_code` string,`team_id`
> string,`credit_auth_user_id` string,`credit_auth_remarks`
> string,`admission_no` string,`deceased_date_time` string,`security_level`
> string,`protection_ind` string,`assign_bed_class_code`
> string,`disch_practitioner_id` string,`mlc_yn` string,`pol_rep_no`
> string,`pol_stn_id` string,`pol_id` string,`informed_to`
> string,`informed_name` string,`informed_date_time`
> string,`post_mortem_req_yn` string,`pat_curr_locn_type`
> string,`pat_curr_locn_code` string,`pat_trn_time` string,`oscc_yn`
> string,`marked_by_id` string,`marked_date` string,`closed_by_id`
> string,`closed_date` string,`patient_priority_no`
> string,`assign_bed_type_code` string,`dc_unit_code`
> string,`bed_allocation_date_time` string,`mark_arrival_date_time`
> string,`priority_zone` string,`treatment_area_code`
> string,`brought_dead_yn` string,`disaster_yn` string,`disaster_town_code`
> string,`prev_assign_care_locn_code` string,`prev_assign_room_num`
> string,`prev_subservice_code` string,`prev_attend_practitioner_id`
> string,`prev_visit_adm_type` string,`prev_visit_adm_type_ind`
> string,`file_no` string,`movement_reason_code` string,`queue_id`
> string,`other_res_class` string,`other_resource_id` string,`pm_yn`
> string,`medical_yn` string,`surgical_yn` string,`body_release_date_time`
> string,`disaster_type_code` string,`high_risk_yn` string,`trauma_yn`
> string,`mech_injury_catg_code` string,`mech_injury_subcatg_code`
> string,`exp_discharge_date_time` string,`hosp_main` string,`hosp_sub`
> string,`card_id` string,`expiry_date` string,`privl_type_code`
> string,`room_tel_ext_num` string,`date_time_of_accident`
> string,`place_of_accident` string,`mv_accident_yn`
> string,`reserved_nurs_unit_code` string,`reserved_room_no`
> string,`reserved_bed_no` string,`cancel_disch_practitioner_id`
> string,`cancel_disch_date` string,`added_by_id` string,`added_date`
> string,`added_at_ws_no` string,`added_facility_id` string,`modified_by_id`
> string,`modified_date` string,`modified_at_ws_no`
> string,`modified_facility_id` string,`cancel_checkout_reason_code`
> string,`cancel_checkout_by_id` string,`cancel_checkout_remarks`
> string,`prev_visit_status` string,`prev_practitioner_id`
> string,`prev_brought_dead_yn` string,`prev_deceased_date_time`
> string,`prev_mlc_yn` string,`prev_pm_yn` string,`discharge_status_code`
> string,`complaint_code` string,`enc_modified_yn` string,`admission_remarks`
> string,`late_disc_reason_code` string,`other_late_disc_reason`
> string,`injury_date` string,`trmt_start_date` string,`trmt_end_date`
> string,`did_yn` string,`prev_did_yn` string,`weight_on_admission`
> string,`weight_on_admission_unit` string,`vehicle_involved1_code`
> string,`vehicle_involved2_code` string,`pat_position_code`
> string,`protective_device_code` string,`vehicle_reg_no1`
> string,`vehicle_reg_no2` string,`form60_yn` string,`referred_locn_code`
> string,`transport_mode` string,`arrival_code`
> string,`pre_dis_initiated_date_time` string,`accompanied_by_code`
> string,`pre_dis_initiated_user` string,`recalled_enc_id`
> string,`admission_form_codes` string,`o_and_g_yn` string,`non_emerg_yn`
> string,`other_reason_remarks` string,`revise_visit_remarks`
> string,`cancel_visit_remarks` string,`height_on_admission` string,`bmi`
> string,`discharge_to` string,`ts` bigint,PRIMARY
> KEY(`encounter_id`,`facility_id`) NOT ENFORCED) WITH (
>
>
> 'connector'='hudi','path'='hdfs://redected/user/data/db/hudi_bruneihealth_egncpr_mor//ibaehis_ogg_sync_pr_encounter',
> 'table.type'='MERGE_ON_READ',
>
>
> 'read.streaming.enabled'='false','read.streaming.check-interval'='5','write.tasks'='3','compaction.tasks'='3','hoodie.datasource.write.partitionpath.field'='_hoodie_partition_path');
>
>
> [files in hdfs]
>
> ```
>
>
>
> ```
>
> [NPE info in $FLINK_HOME/log/flink-work-standalonesession-1.log]
>
>
> 021-07-09 16:22:54,634 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
> split_reader -> SinkConversionToTuple2 (3/4)
> (6856a63df778a5e327426a3814a26dde) switched from CANCELING to CANCELED.
>
> 2021-07-09 16:22:54,653 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source:
> streaming_source (1/1) (a99625774f4e2784a278174b0983c6a8) switched from
> CANCELING to CANCELED.
>
> 2021-07-09 16:22:54,654 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] -
> split_reader -> SinkConversionToTuple2 (1/4)
> (0f9a16a495b0e23ea0798fafda0a923e) switched from CANCELING to CANCELED.
>
> 2021-07-09 16:22:54,654 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: SQL
> Client Stream Collect Sink (1/1) (20f3ef3f7613634eda9d3d5c6dbed865)
> switched from CANCELING to CANCELED.
>
> 2021-07-09 16:22:54,655 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job
> default: select _hoodie_commit_time from ibaehis_ogg_sync_pr_encounter
> (161dd04f5a690e1bb1e8522df7492560) switched from state FAILING to FAILED.
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
>
>         at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
> ~[?:?]
>
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_74]
>
>         at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_74]
>
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> Caused by: java.lang.NullPointerException
>
>         at
> org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.readToVector(AbstractColumnReader.java:160)
> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>
>         at
> org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.nextBatch(ParquetColumnarRowSplitReader.java:298)
> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>
>         at
> org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.ensureBatch(ParquetColumnarRowSplitReader.java:274)
> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>
>         at
> org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.reachedEnd(ParquetColumnarRowSplitReader.java:253)
> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>
>         at
> org.apache.hudi.table.format.mor.MergeOnReadInputFormat$BaseFileOnlyFilteringIterator.reachedEnd(MergeOnReadInputFormat.java:426)
> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>
>         at
> org.apache.hudi.table.format.mor.MergeOnReadInputFormat.reachedEnd(MergeOnReadInputFormat.java:241)
> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>
>         at
> org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:161)
> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297)
> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
>         at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_74]
>
> 2021-07-09 16:22:54,657 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Stopping
> checkpoint coordinator for job 161dd04f5a690e1bb1e8522df7492560.
>
> 2021-07-09 16:22:54,658 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
> Shutting down
>
> ```
>
>
>
> and i tried to track this exception, it seems like throwing NPE before
> throwing RuntimeException.
>
> [org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.java]
>
>
>
>
> [org.apache.parquet.hadoop.ColumnChunkPageReader.java]
>
>
> when "compressedPages" is empty, method readPage() will return null. And
> “compressedPages” is set in constructor.
>
>
> “compressedPages” is set in constructor. then i tried to find what
> "List<DataPage> compressedPages” is.
>
>
>
> It seems to be that when pageHeader.type is not DATA_PAGE  or DATA_PAGE_V2 ,
>  then the “ compressedPages” is empty list when construct
> ColumnChunkPageReader.
>
>
> I cannot go on tracking so far, i don’t known how to read parquet file
> head,how to set parquet file head and i don’t known what is wrong.
>
> Please help to offer some advices. thank you very much!
>
>
> [org.apache.parquet.hadoop.ParquetFileReader.java]
>
> public ColumnChunkPageReader readAllPages() throws IOException {
>   List<DataPage> pagesInChunk = new ArrayList<DataPage>();
>   DictionaryPage dictionaryPage = null;
>   PrimitiveType type = getFileMetaData().getSchema()
>       .getType(descriptor.col.getPath()).asPrimitiveType();
>   long valuesCountReadSoFar = 0;
>   int dataPageCountReadSoFar = 0;
>   while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
>     PageHeader pageHeader = readPageHeader();
>     int uncompressedPageSize = pageHeader.getUncompressed_page_size();
>     int compressedPageSize = pageHeader.getCompressed_page_size();
>     final BytesInput pageBytes;
>     switch (pageHeader.type) {
>       case DICTIONARY_PAGE:
>         // there is only one dictionary page per column chunk
>         if (dictionaryPage != null) {
>           throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
>         }
>         pageBytes = this.readAsBytesInput(compressedPageSize);
>         if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
>           verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
>             "could not verify dictionary page integrity, CRC checksum verification failed");
>         }
>         DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
>         dictionaryPage =
>             new DictionaryPage(
>                 pageBytes,
>                 uncompressedPageSize,
>                 dicHeader.getNum_values(),
>                 converter.getEncoding(dicHeader.getEncoding())
>                 );
>         // Copy crc to new page, used for testing
>         if (pageHeader.isSetCrc()) {
>           dictionaryPage.setCrc(pageHeader.getCrc());
>         }
>         break;
>       case DATA_PAGE:
>         DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
>         pageBytes = this.readAsBytesInput(compressedPageSize);
>         if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
>           verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
>             "could not verify page integrity, CRC checksum verification failed");
>         }
>         DataPageV1 dataPageV1 = new DataPageV1(
>           pageBytes,
>           dataHeaderV1.getNum_values(),
>           uncompressedPageSize,
>           converter.fromParquetStatistics(
>             getFileMetaData().getCreatedBy(),
>             dataHeaderV1.getStatistics(),
>             type),
>           converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
>           converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
>           converter.getEncoding(dataHeaderV1.getEncoding()));
>         // Copy crc to new page, used for testing
>         if (pageHeader.isSetCrc()) {
>           dataPageV1.setCrc(pageHeader.getCrc());
>         }
>         pagesInChunk.add(dataPageV1);
>         valuesCountReadSoFar += dataHeaderV1.getNum_values();
>         ++dataPageCountReadSoFar;
>         break;
>       case DATA_PAGE_V2:
>         DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
>         int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
>         pagesInChunk.add(
>             new DataPageV2(
>                 dataHeaderV2.getNum_rows(),
>                 dataHeaderV2.getNum_nulls(),
>                 dataHeaderV2.getNum_values(),
>                 this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
>                 this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
>                 converter.getEncoding(dataHeaderV2.getEncoding()),
>                 this.readAsBytesInput(dataSize),
>                 uncompressedPageSize,
>                 converter.fromParquetStatistics(
>                     getFileMetaData().getCreatedBy(),
>                     dataHeaderV2.getStatistics(),
>                     type),
>                 dataHeaderV2.isIs_compressed()
>                 ));
>         valuesCountReadSoFar += dataHeaderV2.getNum_values();
>         ++dataPageCountReadSoFar;
>         break;
>       default:
>         LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
>         stream.skipFully(compressedPageSize);
>         break;
>     }
>   }
>   if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
>     // Would be nice to have a CorruptParquetFileException or something as a subclass?
>     throw new IOException(
>         "Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " +
>         getPath() + " offset " + descriptor.metadata.getFirstDataPageOffset() +
>         " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
>         + " pages ending at file offset " + (descriptor.fileOffset + stream.position()));
>   }
>   BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
>   return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage, offsetIndex,
>       blocks.get(currentBlock).getRowCount());
> }
>
>
> Best Regards!
>