You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@hudi.apache.org by vtygoss <vt...@126.com> on 2021/07/10 14:11:18 UTC
转发:flink hudi parquet file reader read unknown parquet header type, and throw NPE
原始邮件
发件人: vtygoss<vt...@126.com>
收件人: users-faq@hudi.apache.org; user@flink.apache.org<us...@flink.apache.org>
发送时间: 2021年7月9日(周五) 16:57
主题: flink hudi parquet file reader read unknown parquet header type, and throw NPE
Hi, community!
I am a new developer in hudi-flink, and I ran into a problem.
I create a flink table with ‘connector’=‘hudi’, and the hudi format files were written by hudi-spark-0.9.0-SNAPSHOT.
some scenario
- When I execute statement “select * from ibaehis_ogg_sync_pr_encounter”, the result is displayed normally.
- When I execute statement “select * from ibaehis_ogg_sync_pr_encounter limit 10” or “limit 100” and so on, an NPE happened. NPE info below.
- When I execute statement “select _hoodie_commit_time from ibaehis_ogg_sync_pr_encounter”, with or without a limit number, the NPE also happened; NPE info below. But when I project any other column, no NPE happened.
```
[create table statement]
create table if not exists ibaehis_ogg_sync_pr_encounter (
`_hoodie_commit_time` string,`_hoodie_commit_seqno` string,`_hoodie_record_key` string,`_hoodie_partition_path` string,`_hoodie_file_name` string,`facility_id` string,`encounter_id` string,`patient_id` string,`patient_class` string,`visit_adm_date_time` string,`visit_adm_type_facility_id` string,`visit_adm_type` string,`visit_adm_type_ind` string,`assign_care_locn_type` string,`assign_care_locn_code` string,`assign_room_type` string,`assign_room_num` string,`assign_bed_num` string,`attend_practitioner_id` string,`referral_id` string,`appt_case_yn` string,`appt_id` string,`invitation_no` string,`booking_case_yn` string,`booking_ref_no` string,`service_code` string,`subservice_code` string,`ancillary_yn` string,`specialty_code` string,`patient_type` string,`circumstance_of_injury_code` string,`ambulatory_status` string,`courtesy_code` string,`chief_complaint` string,`patient_condition` string,`new_op_episode_yn` string,`episode_id` string,`op_episode_visit_num` string,`admit_practitioner_id` string,`discharge_date_time` string,`disp_auth_practitioner_id` string,`disposition_type` string,`recall_yn` string,`recall_date` string,`disp_facility_id` string,`disp_referral_id` string,`mds_complete_yn` string,`fiscal_year` string,`fiscal_period` string,`shift_id` string,`backdated_yn` string,`visit_status` string,`visit_status_set_on_date` string,`visit_status_set_by_user` string,`visit_status_set_reason` string,`adt_status` string,`adt_status_set_on_date` string,`adt_status_set_by_user` string,`adt_status_set_reason` string,`leave_expiry_date_time` string,`oth_adt_status` string,`ip_leave_status` string,`contact_reason_code` string,`recall_reason` string,`recall_reason_code` string,`referred_yn` string,`prn_visit_yn` string,`prn_visit_before` string,`ae_episode_yn` string,`revise_reason_code` string,`cancel_reason_code` string,`team_id` string,`credit_auth_user_id` string,`credit_auth_remarks` string,`admission_no` string,`deceased_date_time` string,`security_level` 
string,`protection_ind` string,`assign_bed_class_code` string,`disch_practitioner_id` string,`mlc_yn` string,`pol_rep_no` string,`pol_stn_id` string,`pol_id` string,`informed_to` string,`informed_name` string,`informed_date_time` string,`post_mortem_req_yn` string,`pat_curr_locn_type` string,`pat_curr_locn_code` string,`pat_trn_time` string,`oscc_yn` string,`marked_by_id` string,`marked_date` string,`closed_by_id` string,`closed_date` string,`patient_priority_no` string,`assign_bed_type_code` string,`dc_unit_code` string,`bed_allocation_date_time` string,`mark_arrival_date_time` string,`priority_zone` string,`treatment_area_code` string,`brought_dead_yn` string,`disaster_yn` string,`disaster_town_code` string,`prev_assign_care_locn_code` string,`prev_assign_room_num` string,`prev_subservice_code` string,`prev_attend_practitioner_id` string,`prev_visit_adm_type` string,`prev_visit_adm_type_ind` string,`file_no` string,`movement_reason_code` string,`queue_id` string,`other_res_class` string,`other_resource_id` string,`pm_yn` string,`medical_yn` string,`surgical_yn` string,`body_release_date_time` string,`disaster_type_code` string,`high_risk_yn` string,`trauma_yn` string,`mech_injury_catg_code` string,`mech_injury_subcatg_code` string,`exp_discharge_date_time` string,`hosp_main` string,`hosp_sub` string,`card_id` string,`expiry_date` string,`privl_type_code` string,`room_tel_ext_num` string,`date_time_of_accident` string,`place_of_accident` string,`mv_accident_yn` string,`reserved_nurs_unit_code` string,`reserved_room_no` string,`reserved_bed_no` string,`cancel_disch_practitioner_id` string,`cancel_disch_date` string,`added_by_id` string,`added_date` string,`added_at_ws_no` string,`added_facility_id` string,`modified_by_id` string,`modified_date` string,`modified_at_ws_no` string,`modified_facility_id` string,`cancel_checkout_reason_code` string,`cancel_checkout_by_id` string,`cancel_checkout_remarks` string,`prev_visit_status` string,`prev_practitioner_id` 
string,`prev_brought_dead_yn` string,`prev_deceased_date_time` string,`prev_mlc_yn` string,`prev_pm_yn` string,`discharge_status_code` string,`complaint_code` string,`enc_modified_yn` string,`admission_remarks` string,`late_disc_reason_code` string,`other_late_disc_reason` string,`injury_date` string,`trmt_start_date` string,`trmt_end_date` string,`did_yn` string,`prev_did_yn` string,`weight_on_admission` string,`weight_on_admission_unit` string,`vehicle_involved1_code` string,`vehicle_involved2_code` string,`pat_position_code` string,`protective_device_code` string,`vehicle_reg_no1` string,`vehicle_reg_no2` string,`form60_yn` string,`referred_locn_code` string,`transport_mode` string,`arrival_code` string,`pre_dis_initiated_date_time` string,`accompanied_by_code` string,`pre_dis_initiated_user` string,`recalled_enc_id` string,`admission_form_codes` string,`o_and_g_yn` string,`non_emerg_yn` string,`other_reason_remarks` string,`revise_visit_remarks` string,`cancel_visit_remarks` string,`height_on_admission` string,`bmi` string,`discharge_to` string,`ts` bigint,PRIMARY KEY(`encounter_id`,`facility_id`) NOT ENFORCED) WITH (
'connector'='hudi','path'='hdfs://redected/user/data/db/hudi_bruneihealth_egncpr_mor//ibaehis_ogg_sync_pr_encounter','table.type'='MERGE_ON_READ',
'read.streaming.enabled'='false','read.streaming.check-interval'='5','write.tasks'='3','compaction.tasks'='3','hoodie.datasource.write.partitionpath.field'='_hoodie_partition_path');
[files in hdfs]
```
```
[NPE info in $FLINK_HOME/log/flink-work-standalonesession-1.log]
2021-07-09 16:22:54,634 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - split_reader -> SinkConversionToTuple2 (3/4) (6856a63df778a5e327426a3814a26dde) switched from CANCELING to CANCELED.
2021-07-09 16:22:54,653 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: streaming_source (1/1) (a99625774f4e2784a278174b0983c6a8) switched from CANCELING to CANCELED.
2021-07-09 16:22:54,654 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - split_reader -> SinkConversionToTuple2 (1/4) (0f9a16a495b0e23ea0798fafda0a923e) switched from CANCELING to CANCELED.
2021-07-09 16:22:54,654 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: SQL Client Stream Collect Sink (1/1) (20f3ef3f7613634eda9d3d5c6dbed865) switched from CANCELING to CANCELED.
2021-07-09 16:22:54,655 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job default: select _hoodie_commit_time from ibaehis_ogg_sync_pr_encounter (161dd04f5a690e1bb1e8522df7492560) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source) ~[?:?]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_74]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_74]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
Caused by: java.lang.NullPointerException
at org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.readToVector(AbstractColumnReader.java:160) ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
at org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.nextBatch(ParquetColumnarRowSplitReader.java:298) ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
at org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.ensureBatch(ParquetColumnarRowSplitReader.java:274) ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
at org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.reachedEnd(ParquetColumnarRowSplitReader.java:253) ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
at org.apache.hudi.table.format.mor.MergeOnReadInputFormat$BaseFileOnlyFilteringIterator.reachedEnd(MergeOnReadInputFormat.java:426) ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.reachedEnd(MergeOnReadInputFormat.java:241) ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
at org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:161) ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297) ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189) ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609) ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_74]
2021-07-09 16:22:54,657 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping checkpoint coordinator for job 161dd04f5a690e1bb1e8522df7492560.
2021-07-09 16:22:54,658 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Shutting down
```
And I tried to trace this exception; it seems the NPE is thrown before the RuntimeException is thrown.
[org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.java]
[org.apache.parquet.hadoop.ColumnChunkPageReader.java]
When "compressedPages" is empty, the method readPage() will return null. "compressedPages" is set in the constructor, so I tried to find out what "List<DataPage> compressedPages" is.
It seems that when pageHeader.type is neither DATA_PAGE nor DATA_PAGE_V2, "compressedPages" is an empty list when the ColumnChunkPageReader is constructed.
I cannot continue tracing any further; I don't know how to read the parquet file header, how to set the parquet file header, or what is wrong.
Please offer some advice. Thank you very much!
[org.apache.parquet.hadoop.ParquetFileReader.java]
public ColumnChunkPageReader readAllPages() throws IOException {
List<DataPage> pagesInChunk = new ArrayList<DataPage>();
DictionaryPage dictionaryPage = null;
PrimitiveType type = getFileMetaData().getSchema()
.getType(descriptor.col.getPath()).asPrimitiveType();
long valuesCountReadSoFar = 0;
int dataPageCountReadSoFar = 0;
while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
PageHeader pageHeader = readPageHeader();
int uncompressedPageSize = pageHeader.getUncompressed_page_size();
int compressedPageSize = pageHeader.getCompressed_page_size();
final BytesInput pageBytes;
switch (pageHeader.type) {
case DICTIONARY_PAGE:
// there is only one dictionary page per column chunk
if (dictionaryPage != null) {
throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
}
pageBytes = this.readAsBytesInput(compressedPageSize);
if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
"could not verify dictionary page integrity, CRC checksum verification failed");
}
DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
dictionaryPage =
new DictionaryPage(
pageBytes,
uncompressedPageSize,
dicHeader.getNum_values(),
converter.getEncoding(dicHeader.getEncoding())
);
// Copy crc to new page, used for testing
if (pageHeader.isSetCrc()) {
dictionaryPage.setCrc(pageHeader.getCrc());
}
break;
case DATA_PAGE:
DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
pageBytes = this.readAsBytesInput(compressedPageSize);
if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
"could not verify page integrity, CRC checksum verification failed");
}
DataPageV1 dataPageV1 = new DataPageV1(
pageBytes,
dataHeaderV1.getNum_values(),
uncompressedPageSize,
converter.fromParquetStatistics(
getFileMetaData().getCreatedBy(),
dataHeaderV1.getStatistics(),
type),
converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
converter.getEncoding(dataHeaderV1.getEncoding()));
// Copy crc to new page, used for testing
if (pageHeader.isSetCrc()) {
dataPageV1.setCrc(pageHeader.getCrc());
}
pagesInChunk.add(dataPageV1);
valuesCountReadSoFar += dataHeaderV1.getNum_values();
++dataPageCountReadSoFar;
break;
case DATA_PAGE_V2:
DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
pagesInChunk.add(
new DataPageV2(
dataHeaderV2.getNum_rows(),
dataHeaderV2.getNum_nulls(),
dataHeaderV2.getNum_values(),
this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
converter.getEncoding(dataHeaderV2.getEncoding()),
this.readAsBytesInput(dataSize),
uncompressedPageSize,
converter.fromParquetStatistics(
getFileMetaData().getCreatedBy(),
dataHeaderV2.getStatistics(),
type),
dataHeaderV2.isIs_compressed()
));
valuesCountReadSoFar += dataHeaderV2.getNum_values();
++dataPageCountReadSoFar;
break;
default:
LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
stream.skipFully(compressedPageSize);
break;
}
}
if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
// Would be nice to have a CorruptParquetFileException or something as a subclass?
throw new IOException(
"Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " +
getPath() + " offset " + descriptor.metadata.getFirstDataPageOffset() +
" but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
+ " pages ending at file offset " + (descriptor.fileOffset + stream.position()));
}
BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage, offsetIndex,
blocks.get(currentBlock).getRowCount());
}
Best Regards!
Re: flink hudi parquet file reader read unknown parquet header type,
and throw NPE
Posted by Danny Chan <da...@apache.org>.
Dear vtygoss ~
Thanks for your very detailed information about the trace, i tried to
reproduce the problem with SQL statements:
select _hoodie_commit_time from xxx;
select * from xxx limit 2;
But i'm unlucky and the SQL works as expected, one difference with your
example is that i write the table also using Flink.
BTW, what parquet version did you use when you write the table using spark ?
Flink writer uses parquet 1.11.1 now.
Best,
Danny Chan
Danny Chan <da...@apache.org> 于2021年7月12日周一 上午8:50写道:
> Thanks for the feedback, I will take a look ~
>
> Best,
> Danny Chan
>
> vtygoss <vt...@126.com>于2021年7月10日 周六下午10:11写道:
>
>>
>>
>> 原始邮件
>> *发件人:* vtygoss<vt...@126.com>
>> *收件人:* users-faq@hudi.apache.org; user@flink.apache.org<
>> user@flink.apache.org>
>> *发送时间:* 2021年7月9日(周五) 16:57
>> *主题:* flink hudi parquet file reader read unknown parquet header type,
>> and throw NPE
>>
>>
>>
>>
>> Hi, comunity!
>>
>>
>> I am new developer in hudi-flink, and i met a problem.
>>
>> I create a flink table with ‘connector’=‘hudi’, and the hudi format files
>> were written by hudi-spark-0.9.0-SNAPSHOT.
>>
>>
>> some scenario
>>
>> - When I execute statement “select * from ibaehis_ogg_sync_pr_encounter”,
>> the result is displayed normally.
>>
>> - When I execute statement “select * from ibaehis_ogg_sync_pr_encounter *limit
>> 10*” or “limit 100” and so on, NPE happend. NPE info below.
>>
>> - When I execute statement “select _hoodie_commit_time from
>> ibaehis_ogg_sync_pr_encounter” whether or not limit number, NPE also
>> happend, NPE info below. But when i project any other column, not NPE
>> happened.
>>
>>
>>
>> ```
>>
>> [create table statment]
>>
>> create table if not exists ibaehis_ogg_sync_pr_encounter (
>>
>> `_hoodie_commit_time` string,`_hoodie_commit_seqno`
>> string,`_hoodie_record_key` string,`_hoodie_partition_path`
>> string,`_hoodie_file_name` string,`facility_id` string,`encounter_id`
>> string,`patient_id` string,`patient_class` string,`visit_adm_date_time`
>> string,`visit_adm_type_facility_id` string,`visit_adm_type`
>> string,`visit_adm_type_ind` string,`assign_care_locn_type`
>> string,`assign_care_locn_code` string,`assign_room_type`
>> string,`assign_room_num` string,`assign_bed_num`
>> string,`attend_practitioner_id` string,`referral_id` string,`appt_case_yn`
>> string,`appt_id` string,`invitation_no` string,`booking_case_yn`
>> string,`booking_ref_no` string,`service_code` string,`subservice_code`
>> string,`ancillary_yn` string,`specialty_code` string,`patient_type`
>> string,`circumstance_of_injury_code` string,`ambulatory_status`
>> string,`courtesy_code` string,`chief_complaint` string,`patient_condition`
>> string,`new_op_episode_yn` string,`episode_id`
>> string,`op_episode_visit_num` string,`admit_practitioner_id`
>> string,`discharge_date_time` string,`disp_auth_practitioner_id`
>> string,`disposition_type` string,`recall_yn` string,`recall_date`
>> string,`disp_facility_id` string,`disp_referral_id`
>> string,`mds_complete_yn` string,`fiscal_year` string,`fiscal_period`
>> string,`shift_id` string,`backdated_yn` string,`visit_status`
>> string,`visit_status_set_on_date` string,`visit_status_set_by_user`
>> string,`visit_status_set_reason` string,`adt_status`
>> string,`adt_status_set_on_date` string,`adt_status_set_by_user`
>> string,`adt_status_set_reason` string,`leave_expiry_date_time`
>> string,`oth_adt_status` string,`ip_leave_status`
>> string,`contact_reason_code` string,`recall_reason`
>> string,`recall_reason_code` string,`referred_yn` string,`prn_visit_yn`
>> string,`prn_visit_before` string,`ae_episode_yn`
>> string,`revise_reason_code` string,`cancel_reason_code` string,`team_id`
>> string,`credit_auth_user_id` string,`credit_auth_remarks`
>> string,`admission_no` string,`deceased_date_time` string,`security_level`
>> string,`protection_ind` string,`assign_bed_class_code`
>> string,`disch_practitioner_id` string,`mlc_yn` string,`pol_rep_no`
>> string,`pol_stn_id` string,`pol_id` string,`informed_to`
>> string,`informed_name` string,`informed_date_time`
>> string,`post_mortem_req_yn` string,`pat_curr_locn_type`
>> string,`pat_curr_locn_code` string,`pat_trn_time` string,`oscc_yn`
>> string,`marked_by_id` string,`marked_date` string,`closed_by_id`
>> string,`closed_date` string,`patient_priority_no`
>> string,`assign_bed_type_code` string,`dc_unit_code`
>> string,`bed_allocation_date_time` string,`mark_arrival_date_time`
>> string,`priority_zone` string,`treatment_area_code`
>> string,`brought_dead_yn` string,`disaster_yn` string,`disaster_town_code`
>> string,`prev_assign_care_locn_code` string,`prev_assign_room_num`
>> string,`prev_subservice_code` string,`prev_attend_practitioner_id`
>> string,`prev_visit_adm_type` string,`prev_visit_adm_type_ind`
>> string,`file_no` string,`movement_reason_code` string,`queue_id`
>> string,`other_res_class` string,`other_resource_id` string,`pm_yn`
>> string,`medical_yn` string,`surgical_yn` string,`body_release_date_time`
>> string,`disaster_type_code` string,`high_risk_yn` string,`trauma_yn`
>> string,`mech_injury_catg_code` string,`mech_injury_subcatg_code`
>> string,`exp_discharge_date_time` string,`hosp_main` string,`hosp_sub`
>> string,`card_id` string,`expiry_date` string,`privl_type_code`
>> string,`room_tel_ext_num` string,`date_time_of_accident`
>> string,`place_of_accident` string,`mv_accident_yn`
>> string,`reserved_nurs_unit_code` string,`reserved_room_no`
>> string,`reserved_bed_no` string,`cancel_disch_practitioner_id`
>> string,`cancel_disch_date` string,`added_by_id` string,`added_date`
>> string,`added_at_ws_no` string,`added_facility_id` string,`modified_by_id`
>> string,`modified_date` string,`modified_at_ws_no`
>> string,`modified_facility_id` string,`cancel_checkout_reason_code`
>> string,`cancel_checkout_by_id` string,`cancel_checkout_remarks`
>> string,`prev_visit_status` string,`prev_practitioner_id`
>> string,`prev_brought_dead_yn` string,`prev_deceased_date_time`
>> string,`prev_mlc_yn` string,`prev_pm_yn` string,`discharge_status_code`
>> string,`complaint_code` string,`enc_modified_yn` string,`admission_remarks`
>> string,`late_disc_reason_code` string,`other_late_disc_reason`
>> string,`injury_date` string,`trmt_start_date` string,`trmt_end_date`
>> string,`did_yn` string,`prev_did_yn` string,`weight_on_admission`
>> string,`weight_on_admission_unit` string,`vehicle_involved1_code`
>> string,`vehicle_involved2_code` string,`pat_position_code`
>> string,`protective_device_code` string,`vehicle_reg_no1`
>> string,`vehicle_reg_no2` string,`form60_yn` string,`referred_locn_code`
>> string,`transport_mode` string,`arrival_code`
>> string,`pre_dis_initiated_date_time` string,`accompanied_by_code`
>> string,`pre_dis_initiated_user` string,`recalled_enc_id`
>> string,`admission_form_codes` string,`o_and_g_yn` string,`non_emerg_yn`
>> string,`other_reason_remarks` string,`revise_visit_remarks`
>> string,`cancel_visit_remarks` string,`height_on_admission` string,`bmi`
>> string,`discharge_to` string,`ts` bigint,PRIMARY
>> KEY(`encounter_id`,`facility_id`) NOT ENFORCED) WITH (
>>
>>
>> 'connector'='hudi','path'='hdfs://redected/user/data/db/hudi_bruneihealth_egncpr_mor//ibaehis_ogg_sync_pr_encounter',
>> 'table.type'='MERGE_ON_READ',
>>
>>
>> 'read.streaming.enabled'='false','read.streaming.check-interval'='5','write.tasks'='3','compaction.tasks'='3','hoodie.datasource.write.partitionpath.field'='_hoodie_partition_path');
>>
>>
>> [files in hdfs]
>>
>> ```
>>
>>
>>
>> ```
>>
>> [NPE info in $FLINK_HOME/log/flink-work-standalonesession-1.log]
>>
>>
>> 021-07-09 16:22:54,634 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
>> split_reader -> SinkConversionToTuple2 (3/4)
>> (6856a63df778a5e327426a3814a26dde) switched from CANCELING to CANCELED.
>>
>> 2021-07-09 16:22:54,653 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
>> streaming_source (1/1) (a99625774f4e2784a278174b0983c6a8) switched from
>> CANCELING to CANCELED.
>>
>> 2021-07-09 16:22:54,654 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
>> split_reader -> SinkConversionToTuple2 (1/4)
>> (0f9a16a495b0e23ea0798fafda0a923e) switched from CANCELING to CANCELED.
>>
>> 2021-07-09 16:22:54,654 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: SQL
>> Client Stream Collect Sink (1/1) (20f3ef3f7613634eda9d3d5c6dbed865)
>> switched from CANCELING to CANCELED.
>>
>> 2021-07-09 16:22:54,655 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job
>> default: select _hoodie_commit_time from ibaehis_ogg_sync_pr_encounter
>> (161dd04f5a690e1bb1e8522df7492560) switched from state FAILING to FAILED.
>>
>> org.apache.flink.runtime.JobException: Recovery is suppressed by
>> NoRestartBackoffTimeStrategy
>>
>> at
>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
>> ~[?:?]
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:1.8.0_74]
>>
>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_74]
>>
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> Caused by: java.lang.NullPointerException
>>
>> at
>> org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.readToVector(AbstractColumnReader.java:160)
>> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>>
>> at
>> org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.nextBatch(ParquetColumnarRowSplitReader.java:298)
>> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>>
>> at
>> org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.ensureBatch(ParquetColumnarRowSplitReader.java:274)
>> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>>
>> at
>> org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.reachedEnd(ParquetColumnarRowSplitReader.java:253)
>> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>>
>> at
>> org.apache.hudi.table.format.mor.MergeOnReadInputFormat$BaseFileOnlyFilteringIterator.reachedEnd(MergeOnReadInputFormat.java:426)
>> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>>
>> at
>> org.apache.hudi.table.format.mor.MergeOnReadInputFormat.reachedEnd(MergeOnReadInputFormat.java:241)
>> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>>
>> at
>> org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:161)
>> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297)
>> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
>> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
>> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
>> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_74]
>>
>> 2021-07-09 16:22:54,657 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping
>> checkpoint coordinator for job 161dd04f5a690e1bb1e8522df7492560.
>>
>> 2021-07-09 16:22:54,658 INFO
>> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
>> Shutting down
>>
>> ```
>>
>>
>>
>> and I tried to track this exception; it seems the NPE is thrown before
>> throwing RuntimeException.
>>
>> [org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.java
>> ]
>>
>>
>>
>>
>> [org.apache.parquet.hadoop.ColumnChunkPageReader.java]
>>
>>
>> when "compressedPages" is empty, method readPage() will return null. And
>> “compressedPages” is set in constructor.
>>
>>
>> “compressedPages” is set in constructor. then i tried to find what
>> "List<DataPage> compressedPages” is.
>>
>>
>>
>> It seems to be that when pageHeader.type is not DATA_PAGE or
>> DATA_PAGE_V2 , then the “ compressedPages” is empty list when construct
>> ColumnChunkPageReader.
>>
>>
>> I cannot go on tracking any further; I don’t know how to read the parquet file
>> header, how to set the parquet file header, and I don’t know what is wrong.
>>
>> Please help to offer some advices. thank you very much!
>>
>>
>> [org.apache.parquet.hadoop.ParquetFileReader.java]
>>
>> public ColumnChunkPageReader readAllPages() throws IOException {
>> List<DataPage> pagesInChunk = new ArrayList<DataPage>();
>> DictionaryPage dictionaryPage = null;
>> PrimitiveType type = getFileMetaData().getSchema()
>> .getType(descriptor.col.getPath()).asPrimitiveType();
>> long valuesCountReadSoFar = 0;
>> int dataPageCountReadSoFar = 0;
>> while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
>> PageHeader pageHeader = readPageHeader();
>> int uncompressedPageSize = pageHeader.getUncompressed_page_size();
>> int compressedPageSize = pageHeader.getCompressed_page_size();
>> final BytesInput pageBytes;
>> switch (pageHeader.type) {
>> case DICTIONARY_PAGE:
>> // there is only one dictionary page per column chunk
>> if (dictionaryPage != null) {
>> throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
>> }
>> pageBytes = this.readAsBytesInput(compressedPageSize);
>> if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
>> verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
>> "could not verify dictionary page integrity, CRC checksum verification failed");
>> }
>> DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
>> dictionaryPage =
>> new DictionaryPage(
>> pageBytes,
>> uncompressedPageSize,
>> dicHeader.getNum_values(),
>> converter.getEncoding(dicHeader.getEncoding())
>> );
>> // Copy crc to new page, used for testing
>> if (pageHeader.isSetCrc()) {
>> dictionaryPage.setCrc(pageHeader.getCrc());
>> }
>> break;
>> case DATA_PAGE:
>> DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
>> pageBytes = this.readAsBytesInput(compressedPageSize);
>> if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
>> verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
>> "could not verify page integrity, CRC checksum verification failed");
>> }
>> DataPageV1 dataPageV1 = new DataPageV1(
>> pageBytes,
>> dataHeaderV1.getNum_values(),
>> uncompressedPageSize,
>> converter.fromParquetStatistics(
>> getFileMetaData().getCreatedBy(),
>> dataHeaderV1.getStatistics(),
>> type),
>> converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
>> converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
>> converter.getEncoding(dataHeaderV1.getEncoding()));
>> // Copy crc to new page, used for testing
>> if (pageHeader.isSetCrc()) {
>> dataPageV1.setCrc(pageHeader.getCrc());
>> }
>> pagesInChunk.add(dataPageV1);
>> valuesCountReadSoFar += dataHeaderV1.getNum_values();
>> ++dataPageCountReadSoFar;
>> break;
>> case DATA_PAGE_V2:
>> DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
>> int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
>> pagesInChunk.add(
>> new DataPageV2(
>> dataHeaderV2.getNum_rows(),
>> dataHeaderV2.getNum_nulls(),
>> dataHeaderV2.getNum_values(),
>> this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
>> this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
>> converter.getEncoding(dataHeaderV2.getEncoding()),
>> this.readAsBytesInput(dataSize),
>> uncompressedPageSize,
>> converter.fromParquetStatistics(
>> getFileMetaData().getCreatedBy(),
>> dataHeaderV2.getStatistics(),
>> type),
>> dataHeaderV2.isIs_compressed()
>> ));
>> valuesCountReadSoFar += dataHeaderV2.getNum_values();
>> ++dataPageCountReadSoFar;
>> break;
>> default:
>> LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
>> stream.skipFully(compressedPageSize);
>> break;
>> }
>> }
>> if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
>> // Would be nice to have a CorruptParquetFileException or something as a subclass?
>> throw new IOException(
>> "Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " +
>> getPath() + " offset " + descriptor.metadata.getFirstDataPageOffset() +
>> " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
>> + " pages ending at file offset " + (descriptor.fileOffset + stream.position()));
>> }
>> BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
>> return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage, offsetIndex,
>> blocks.get(currentBlock).getRowCount());
>> }
>>
>>
>> Best Regards!
>>
>
Re: flink hudi parquet file reader read unknown parquet header type,
and throw NPE
Posted by Danny Chan <da...@apache.org>.
Dear vtygoss ~
Thanks for your very detailed information about the trace, i tried to
reproduce the problem with SQL statements:
select _hoodie_commit_time from xxx;
select * from xxx limit 2;
But i'm unlucky and the SQL works as expected, one difference with your
example is that i write the table also using Flink.
BTW, what parquet version did you use when you write the table using spark ?
Flink writer uses parquet 1.11.1 now.
Best,
Danny Chan
Danny Chan <da...@apache.org> 于2021年7月12日周一 上午8:50写道:
> Thanks for the feedback, I will take a look ~
>
> Best,
> Danny Chan
>
> vtygoss <vt...@126.com>于2021年7月10日 周六下午10:11写道:
>
>>
>>
>> 原始邮件
>> *发件人:* vtygoss<vt...@126.com>
>> *收件人:* users-faq@hudi.apache.org; user@flink.apache.org<
>> user@flink.apache.org>
>> *发送时间:* 2021年7月9日(周五) 16:57
>> *主题:* flink hudi parquet file reader read unknown parquet header type,
>> and throw NPE
>>
>>
>>
>>
>> Hi, community!
>>
>>
>> I am a new developer in hudi-flink, and I met a problem.
>>
>> I create a flink table with ‘connector’=‘hudi’, and the hudi format files
>> were written by hudi-spark-0.9.0-SNAPSHOT.
>>
>>
>> some scenario
>>
>> - When I execute statement “select * from ibaehis_ogg_sync_pr_encounter”,
>> the result is displayed normally.
>>
>> - When I execute statement “select * from ibaehis_ogg_sync_pr_encounter *limit
>> 10*” or “limit 100” and so on, NPE happened. NPE info below.
>>
>> - When I execute statement “select _hoodie_commit_time from
>> ibaehis_ogg_sync_pr_encounter” whether or not limit number, NPE also
>> happened, NPE info below. But when I project any other column, no NPE
>> happened.
>>
>>
>>
>> ```
>>
>> [create table statement]
>>
>> create table if not exists ibaehis_ogg_sync_pr_encounter (
>>
>> `_hoodie_commit_time` string,`_hoodie_commit_seqno`
>> string,`_hoodie_record_key` string,`_hoodie_partition_path`
>> string,`_hoodie_file_name` string,`facility_id` string,`encounter_id`
>> string,`patient_id` string,`patient_class` string,`visit_adm_date_time`
>> string,`visit_adm_type_facility_id` string,`visit_adm_type`
>> string,`visit_adm_type_ind` string,`assign_care_locn_type`
>> string,`assign_care_locn_code` string,`assign_room_type`
>> string,`assign_room_num` string,`assign_bed_num`
>> string,`attend_practitioner_id` string,`referral_id` string,`appt_case_yn`
>> string,`appt_id` string,`invitation_no` string,`booking_case_yn`
>> string,`booking_ref_no` string,`service_code` string,`subservice_code`
>> string,`ancillary_yn` string,`specialty_code` string,`patient_type`
>> string,`circumstance_of_injury_code` string,`ambulatory_status`
>> string,`courtesy_code` string,`chief_complaint` string,`patient_condition`
>> string,`new_op_episode_yn` string,`episode_id`
>> string,`op_episode_visit_num` string,`admit_practitioner_id`
>> string,`discharge_date_time` string,`disp_auth_practitioner_id`
>> string,`disposition_type` string,`recall_yn` string,`recall_date`
>> string,`disp_facility_id` string,`disp_referral_id`
>> string,`mds_complete_yn` string,`fiscal_year` string,`fiscal_period`
>> string,`shift_id` string,`backdated_yn` string,`visit_status`
>> string,`visit_status_set_on_date` string,`visit_status_set_by_user`
>> string,`visit_status_set_reason` string,`adt_status`
>> string,`adt_status_set_on_date` string,`adt_status_set_by_user`
>> string,`adt_status_set_reason` string,`leave_expiry_date_time`
>> string,`oth_adt_status` string,`ip_leave_status`
>> string,`contact_reason_code` string,`recall_reason`
>> string,`recall_reason_code` string,`referred_yn` string,`prn_visit_yn`
>> string,`prn_visit_before` string,`ae_episode_yn`
>> string,`revise_reason_code` string,`cancel_reason_code` string,`team_id`
>> string,`credit_auth_user_id` string,`credit_auth_remarks`
>> string,`admission_no` string,`deceased_date_time` string,`security_level`
>> string,`protection_ind` string,`assign_bed_class_code`
>> string,`disch_practitioner_id` string,`mlc_yn` string,`pol_rep_no`
>> string,`pol_stn_id` string,`pol_id` string,`informed_to`
>> string,`informed_name` string,`informed_date_time`
>> string,`post_mortem_req_yn` string,`pat_curr_locn_type`
>> string,`pat_curr_locn_code` string,`pat_trn_time` string,`oscc_yn`
>> string,`marked_by_id` string,`marked_date` string,`closed_by_id`
>> string,`closed_date` string,`patient_priority_no`
>> string,`assign_bed_type_code` string,`dc_unit_code`
>> string,`bed_allocation_date_time` string,`mark_arrival_date_time`
>> string,`priority_zone` string,`treatment_area_code`
>> string,`brought_dead_yn` string,`disaster_yn` string,`disaster_town_code`
>> string,`prev_assign_care_locn_code` string,`prev_assign_room_num`
>> string,`prev_subservice_code` string,`prev_attend_practitioner_id`
>> string,`prev_visit_adm_type` string,`prev_visit_adm_type_ind`
>> string,`file_no` string,`movement_reason_code` string,`queue_id`
>> string,`other_res_class` string,`other_resource_id` string,`pm_yn`
>> string,`medical_yn` string,`surgical_yn` string,`body_release_date_time`
>> string,`disaster_type_code` string,`high_risk_yn` string,`trauma_yn`
>> string,`mech_injury_catg_code` string,`mech_injury_subcatg_code`
>> string,`exp_discharge_date_time` string,`hosp_main` string,`hosp_sub`
>> string,`card_id` string,`expiry_date` string,`privl_type_code`
>> string,`room_tel_ext_num` string,`date_time_of_accident`
>> string,`place_of_accident` string,`mv_accident_yn`
>> string,`reserved_nurs_unit_code` string,`reserved_room_no`
>> string,`reserved_bed_no` string,`cancel_disch_practitioner_id`
>> string,`cancel_disch_date` string,`added_by_id` string,`added_date`
>> string,`added_at_ws_no` string,`added_facility_id` string,`modified_by_id`
>> string,`modified_date` string,`modified_at_ws_no`
>> string,`modified_facility_id` string,`cancel_checkout_reason_code`
>> string,`cancel_checkout_by_id` string,`cancel_checkout_remarks`
>> string,`prev_visit_status` string,`prev_practitioner_id`
>> string,`prev_brought_dead_yn` string,`prev_deceased_date_time`
>> string,`prev_mlc_yn` string,`prev_pm_yn` string,`discharge_status_code`
>> string,`complaint_code` string,`enc_modified_yn` string,`admission_remarks`
>> string,`late_disc_reason_code` string,`other_late_disc_reason`
>> string,`injury_date` string,`trmt_start_date` string,`trmt_end_date`
>> string,`did_yn` string,`prev_did_yn` string,`weight_on_admission`
>> string,`weight_on_admission_unit` string,`vehicle_involved1_code`
>> string,`vehicle_involved2_code` string,`pat_position_code`
>> string,`protective_device_code` string,`vehicle_reg_no1`
>> string,`vehicle_reg_no2` string,`form60_yn` string,`referred_locn_code`
>> string,`transport_mode` string,`arrival_code`
>> string,`pre_dis_initiated_date_time` string,`accompanied_by_code`
>> string,`pre_dis_initiated_user` string,`recalled_enc_id`
>> string,`admission_form_codes` string,`o_and_g_yn` string,`non_emerg_yn`
>> string,`other_reason_remarks` string,`revise_visit_remarks`
>> string,`cancel_visit_remarks` string,`height_on_admission` string,`bmi`
>> string,`discharge_to` string,`ts` bigint,PRIMARY
>> KEY(`encounter_id`,`facility_id`) NOT ENFORCED) WITH (
>>
>>
>> 'connector'='hudi','path'='hdfs://redected/user/data/db/hudi_bruneihealth_egncpr_mor//ibaehis_ogg_sync_pr_encounter',
>> 'table.type'='MERGE_ON_READ',
>>
>>
>> 'read.streaming.enabled'='false','read.streaming.check-interval'='5','write.tasks'='3','compaction.tasks'='3','hoodie.datasource.write.partitionpath.field'='_hoodie_partition_path');
>>
>>
>> [files in hdfs]
>>
>> ```
>>
>>
>>
>> ```
>>
>> [NPE info in $FLINK_HOME/log/flink-work-standalonesession-1.log]
>>
>>
>> 2021-07-09 16:22:54,634 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
>> split_reader -> SinkConversionToTuple2 (3/4)
>> (6856a63df778a5e327426a3814a26dde) switched from CANCELING to CANCELED.
>>
>> 2021-07-09 16:22:54,653 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
>> streaming_source (1/1) (a99625774f4e2784a278174b0983c6a8) switched from
>> CANCELING to CANCELED.
>>
>> 2021-07-09 16:22:54,654 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
>> split_reader -> SinkConversionToTuple2 (1/4)
>> (0f9a16a495b0e23ea0798fafda0a923e) switched from CANCELING to CANCELED.
>>
>> 2021-07-09 16:22:54,654 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: SQL
>> Client Stream Collect Sink (1/1) (20f3ef3f7613634eda9d3d5c6dbed865)
>> switched from CANCELING to CANCELED.
>>
>> 2021-07-09 16:22:54,655 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job
>> default: select _hoodie_commit_time from ibaehis_ogg_sync_pr_encounter
>> (161dd04f5a690e1bb1e8522df7492560) switched from state FAILING to FAILED.
>>
>> org.apache.flink.runtime.JobException: Recovery is suppressed by
>> NoRestartBackoffTimeStrategy
>>
>> at
>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
>> ~[?:?]
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:1.8.0_74]
>>
>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_74]
>>
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> Caused by: java.lang.NullPointerException
>>
>> at
>> org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.readToVector(AbstractColumnReader.java:160)
>> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>>
>> at
>> org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.nextBatch(ParquetColumnarRowSplitReader.java:298)
>> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>>
>> at
>> org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.ensureBatch(ParquetColumnarRowSplitReader.java:274)
>> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>>
>> at
>> org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.reachedEnd(ParquetColumnarRowSplitReader.java:253)
>> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>>
>> at
>> org.apache.hudi.table.format.mor.MergeOnReadInputFormat$BaseFileOnlyFilteringIterator.reachedEnd(MergeOnReadInputFormat.java:426)
>> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>>
>> at
>> org.apache.hudi.table.format.mor.MergeOnReadInputFormat.reachedEnd(MergeOnReadInputFormat.java:241)
>> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>>
>> at
>> org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:161)
>> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297)
>> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
>> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
>> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
>> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>>
>> at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_74]
>>
>> 2021-07-09 16:22:54,657 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping
>> checkpoint coordinator for job 161dd04f5a690e1bb1e8522df7492560.
>>
>> 2021-07-09 16:22:54,658 INFO
>> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
>> Shutting down
>>
>> ```
>>
>>
>>
>> and I tried to track this exception; it seems the NPE is thrown before
>> throwing RuntimeException.
>>
>> [org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.java
>> ]
>>
>>
>>
>>
>> [org.apache.parquet.hadoop.ColumnChunkPageReader.java]
>>
>>
>> when "compressedPages" is empty, method readPage() will return null. And
>> “compressedPages” is set in constructor.
>>
>>
>> “compressedPages” is set in constructor. then i tried to find what
>> "List<DataPage> compressedPages” is.
>>
>>
>>
>> It seems to be that when pageHeader.type is not DATA_PAGE or
>> DATA_PAGE_V2 , then the “ compressedPages” is empty list when construct
>> ColumnChunkPageReader.
>>
>>
>> I cannot go on tracking any further; I don’t know how to read the parquet file
>> header, how to set the parquet file header, and I don’t know what is wrong.
>>
>> Please help to offer some advices. thank you very much!
>>
>>
>> [org.apache.parquet.hadoop.ParquetFileReader.java]
>>
>> public ColumnChunkPageReader readAllPages() throws IOException {
>> List<DataPage> pagesInChunk = new ArrayList<DataPage>();
>> DictionaryPage dictionaryPage = null;
>> PrimitiveType type = getFileMetaData().getSchema()
>> .getType(descriptor.col.getPath()).asPrimitiveType();
>> long valuesCountReadSoFar = 0;
>> int dataPageCountReadSoFar = 0;
>> while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
>> PageHeader pageHeader = readPageHeader();
>> int uncompressedPageSize = pageHeader.getUncompressed_page_size();
>> int compressedPageSize = pageHeader.getCompressed_page_size();
>> final BytesInput pageBytes;
>> switch (pageHeader.type) {
>> case DICTIONARY_PAGE:
>> // there is only one dictionary page per column chunk
>> if (dictionaryPage != null) {
>> throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
>> }
>> pageBytes = this.readAsBytesInput(compressedPageSize);
>> if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
>> verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
>> "could not verify dictionary page integrity, CRC checksum verification failed");
>> }
>> DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
>> dictionaryPage =
>> new DictionaryPage(
>> pageBytes,
>> uncompressedPageSize,
>> dicHeader.getNum_values(),
>> converter.getEncoding(dicHeader.getEncoding())
>> );
>> // Copy crc to new page, used for testing
>> if (pageHeader.isSetCrc()) {
>> dictionaryPage.setCrc(pageHeader.getCrc());
>> }
>> break;
>> case DATA_PAGE:
>> DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
>> pageBytes = this.readAsBytesInput(compressedPageSize);
>> if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
>> verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
>> "could not verify page integrity, CRC checksum verification failed");
>> }
>> DataPageV1 dataPageV1 = new DataPageV1(
>> pageBytes,
>> dataHeaderV1.getNum_values(),
>> uncompressedPageSize,
>> converter.fromParquetStatistics(
>> getFileMetaData().getCreatedBy(),
>> dataHeaderV1.getStatistics(),
>> type),
>> converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
>> converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
>> converter.getEncoding(dataHeaderV1.getEncoding()));
>> // Copy crc to new page, used for testing
>> if (pageHeader.isSetCrc()) {
>> dataPageV1.setCrc(pageHeader.getCrc());
>> }
>> pagesInChunk.add(dataPageV1);
>> valuesCountReadSoFar += dataHeaderV1.getNum_values();
>> ++dataPageCountReadSoFar;
>> break;
>> case DATA_PAGE_V2:
>> DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
>> int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
>> pagesInChunk.add(
>> new DataPageV2(
>> dataHeaderV2.getNum_rows(),
>> dataHeaderV2.getNum_nulls(),
>> dataHeaderV2.getNum_values(),
>> this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
>> this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
>> converter.getEncoding(dataHeaderV2.getEncoding()),
>> this.readAsBytesInput(dataSize),
>> uncompressedPageSize,
>> converter.fromParquetStatistics(
>> getFileMetaData().getCreatedBy(),
>> dataHeaderV2.getStatistics(),
>> type),
>> dataHeaderV2.isIs_compressed()
>> ));
>> valuesCountReadSoFar += dataHeaderV2.getNum_values();
>> ++dataPageCountReadSoFar;
>> break;
>> default:
>> LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
>> stream.skipFully(compressedPageSize);
>> break;
>> }
>> }
>> if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
>> // Would be nice to have a CorruptParquetFileException or something as a subclass?
>> throw new IOException(
>> "Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " +
>> getPath() + " offset " + descriptor.metadata.getFirstDataPageOffset() +
>> " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
>> + " pages ending at file offset " + (descriptor.fileOffset + stream.position()));
>> }
>> BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
>> return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage, offsetIndex,
>> blocks.get(currentBlock).getRowCount());
>> }
>>
>>
>> Best Regards!
>>
>
Re: flink hudi parquet file reader read unknown parquet header type,
and throw NPE
Posted by Danny Chan <da...@apache.org>.
Thanks for the feedback, I will take a look ~
Best,
Danny Chan
vtygoss <vt...@126.com>于2021年7月10日 周六下午10:11写道:
>
>
> 原始邮件
> *发件人:* vtygoss<vt...@126.com>
> *收件人:* users-faq@hudi.apache.org; user@flink.apache.org<
> user@flink.apache.org>
> *发送时间:* 2021年7月9日(周五) 16:57
> *主题:* flink hudi parquet file reader read unknown parquet header type,
> and throw NPE
>
>
>
>
> Hi, community!
>
>
> I am a new developer in hudi-flink, and I met a problem.
>
> I create a flink table with ‘connector’=‘hudi’, and the hudi format files
> were written by hudi-spark-0.9.0-SNAPSHOT.
>
>
> some scenario
>
> - When I execute statement “select * from ibaehis_ogg_sync_pr_encounter”,
> the result is displayed normally.
>
> - When I execute statement “select * from ibaehis_ogg_sync_pr_encounter *limit
> 10*” or “limit 100” and so on, an NPE happened. NPE info below.
>
> - When I execute statement “select _hoodie_commit_time from
> ibaehis_ogg_sync_pr_encounter”, with or without a limit number, the NPE also
> happened; NPE info below. But when I project any other column, no NPE
> happened.
>
>
>
> ```
>
> [create table statement]
>
> create table if not exists ibaehis_ogg_sync_pr_encounter (
>
> `_hoodie_commit_time` string,`_hoodie_commit_seqno`
> string,`_hoodie_record_key` string,`_hoodie_partition_path`
> string,`_hoodie_file_name` string,`facility_id` string,`encounter_id`
> string,`patient_id` string,`patient_class` string,`visit_adm_date_time`
> string,`visit_adm_type_facility_id` string,`visit_adm_type`
> string,`visit_adm_type_ind` string,`assign_care_locn_type`
> string,`assign_care_locn_code` string,`assign_room_type`
> string,`assign_room_num` string,`assign_bed_num`
> string,`attend_practitioner_id` string,`referral_id` string,`appt_case_yn`
> string,`appt_id` string,`invitation_no` string,`booking_case_yn`
> string,`booking_ref_no` string,`service_code` string,`subservice_code`
> string,`ancillary_yn` string,`specialty_code` string,`patient_type`
> string,`circumstance_of_injury_code` string,`ambulatory_status`
> string,`courtesy_code` string,`chief_complaint` string,`patient_condition`
> string,`new_op_episode_yn` string,`episode_id`
> string,`op_episode_visit_num` string,`admit_practitioner_id`
> string,`discharge_date_time` string,`disp_auth_practitioner_id`
> string,`disposition_type` string,`recall_yn` string,`recall_date`
> string,`disp_facility_id` string,`disp_referral_id`
> string,`mds_complete_yn` string,`fiscal_year` string,`fiscal_period`
> string,`shift_id` string,`backdated_yn` string,`visit_status`
> string,`visit_status_set_on_date` string,`visit_status_set_by_user`
> string,`visit_status_set_reason` string,`adt_status`
> string,`adt_status_set_on_date` string,`adt_status_set_by_user`
> string,`adt_status_set_reason` string,`leave_expiry_date_time`
> string,`oth_adt_status` string,`ip_leave_status`
> string,`contact_reason_code` string,`recall_reason`
> string,`recall_reason_code` string,`referred_yn` string,`prn_visit_yn`
> string,`prn_visit_before` string,`ae_episode_yn`
> string,`revise_reason_code` string,`cancel_reason_code` string,`team_id`
> string,`credit_auth_user_id` string,`credit_auth_remarks`
> string,`admission_no` string,`deceased_date_time` string,`security_level`
> string,`protection_ind` string,`assign_bed_class_code`
> string,`disch_practitioner_id` string,`mlc_yn` string,`pol_rep_no`
> string,`pol_stn_id` string,`pol_id` string,`informed_to`
> string,`informed_name` string,`informed_date_time`
> string,`post_mortem_req_yn` string,`pat_curr_locn_type`
> string,`pat_curr_locn_code` string,`pat_trn_time` string,`oscc_yn`
> string,`marked_by_id` string,`marked_date` string,`closed_by_id`
> string,`closed_date` string,`patient_priority_no`
> string,`assign_bed_type_code` string,`dc_unit_code`
> string,`bed_allocation_date_time` string,`mark_arrival_date_time`
> string,`priority_zone` string,`treatment_area_code`
> string,`brought_dead_yn` string,`disaster_yn` string,`disaster_town_code`
> string,`prev_assign_care_locn_code` string,`prev_assign_room_num`
> string,`prev_subservice_code` string,`prev_attend_practitioner_id`
> string,`prev_visit_adm_type` string,`prev_visit_adm_type_ind`
> string,`file_no` string,`movement_reason_code` string,`queue_id`
> string,`other_res_class` string,`other_resource_id` string,`pm_yn`
> string,`medical_yn` string,`surgical_yn` string,`body_release_date_time`
> string,`disaster_type_code` string,`high_risk_yn` string,`trauma_yn`
> string,`mech_injury_catg_code` string,`mech_injury_subcatg_code`
> string,`exp_discharge_date_time` string,`hosp_main` string,`hosp_sub`
> string,`card_id` string,`expiry_date` string,`privl_type_code`
> string,`room_tel_ext_num` string,`date_time_of_accident`
> string,`place_of_accident` string,`mv_accident_yn`
> string,`reserved_nurs_unit_code` string,`reserved_room_no`
> string,`reserved_bed_no` string,`cancel_disch_practitioner_id`
> string,`cancel_disch_date` string,`added_by_id` string,`added_date`
> string,`added_at_ws_no` string,`added_facility_id` string,`modified_by_id`
> string,`modified_date` string,`modified_at_ws_no`
> string,`modified_facility_id` string,`cancel_checkout_reason_code`
> string,`cancel_checkout_by_id` string,`cancel_checkout_remarks`
> string,`prev_visit_status` string,`prev_practitioner_id`
> string,`prev_brought_dead_yn` string,`prev_deceased_date_time`
> string,`prev_mlc_yn` string,`prev_pm_yn` string,`discharge_status_code`
> string,`complaint_code` string,`enc_modified_yn` string,`admission_remarks`
> string,`late_disc_reason_code` string,`other_late_disc_reason`
> string,`injury_date` string,`trmt_start_date` string,`trmt_end_date`
> string,`did_yn` string,`prev_did_yn` string,`weight_on_admission`
> string,`weight_on_admission_unit` string,`vehicle_involved1_code`
> string,`vehicle_involved2_code` string,`pat_position_code`
> string,`protective_device_code` string,`vehicle_reg_no1`
> string,`vehicle_reg_no2` string,`form60_yn` string,`referred_locn_code`
> string,`transport_mode` string,`arrival_code`
> string,`pre_dis_initiated_date_time` string,`accompanied_by_code`
> string,`pre_dis_initiated_user` string,`recalled_enc_id`
> string,`admission_form_codes` string,`o_and_g_yn` string,`non_emerg_yn`
> string,`other_reason_remarks` string,`revise_visit_remarks`
> string,`cancel_visit_remarks` string,`height_on_admission` string,`bmi`
> string,`discharge_to` string,`ts` bigint,PRIMARY
> KEY(`encounter_id`,`facility_id`) NOT ENFORCED) WITH (
>
>
> 'connector'='hudi','path'='hdfs://redected/user/data/db/hudi_bruneihealth_egncpr_mor//ibaehis_ogg_sync_pr_encounter',
> 'table.type'='MERGE_ON_READ',
>
>
> 'read.streaming.enabled'='false','read.streaming.check-interval'='5','write.tasks'='3','compaction.tasks'='3','hoodie.datasource.write.partitionpath.field'='_hoodie_partition_path');
>
>
> [files in hdfs]
>
> ```
>
>
>
> ```
>
> [NPE info in $FLINK_HOME/log/flink-work-standalonesession-1.log]
>
>
> 021-07-09 16:22:54,634 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
> split_reader -> SinkConversionToTuple2 (3/4)
> (6856a63df778a5e327426a3814a26dde) switched from CANCELING to CANCELED.
>
> 2021-07-09 16:22:54,653 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
> streaming_source (1/1) (a99625774f4e2784a278174b0983c6a8) switched from
> CANCELING to CANCELED.
>
> 2021-07-09 16:22:54,654 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] -
> split_reader -> SinkConversionToTuple2 (1/4)
> (0f9a16a495b0e23ea0798fafda0a923e) switched from CANCELING to CANCELED.
>
> 2021-07-09 16:22:54,654 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: SQL
> Client Stream Collect Sink (1/1) (20f3ef3f7613634eda9d3d5c6dbed865)
> switched from CANCELING to CANCELED.
>
> 2021-07-09 16:22:54,655 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job
> default: select _hoodie_commit_time from ibaehis_ogg_sync_pr_encounter
> (161dd04f5a690e1bb1e8522df7492560) switched from state FAILING to FAILED.
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
>
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
> ~[?:?]
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_74]
>
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_74]
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> Caused by: java.lang.NullPointerException
>
> at
> org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.readToVector(AbstractColumnReader.java:160)
> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>
> at
> org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.nextBatch(ParquetColumnarRowSplitReader.java:298)
> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>
> at
> org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.ensureBatch(ParquetColumnarRowSplitReader.java:274)
> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>
> at
> org.apache.hudi.table.format.cow.ParquetColumnarRowSplitReader.reachedEnd(ParquetColumnarRowSplitReader.java:253)
> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>
> at
> org.apache.hudi.table.format.mor.MergeOnReadInputFormat$BaseFileOnlyFilteringIterator.reachedEnd(MergeOnReadInputFormat.java:426)
> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>
> at
> org.apache.hudi.table.format.mor.MergeOnReadInputFormat.reachedEnd(MergeOnReadInputFormat.java:241)
> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>
> at
> org.apache.hudi.source.StreamReadOperator.processSplits(StreamReadOperator.java:161)
> ~[hudi-flink-bundle_2.11-0.9.0-SNAPSHOT.jar:0.9.0-SNAPSHOT]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:297)
> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
> ~[flink-streaming-java_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>
> at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_74]
>
> 2021-07-09 16:22:54,657 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping
> checkpoint coordinator for job 161dd04f5a690e1bb1e8522df7492560.
>
> 2021-07-09 16:22:54,658 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
> Shutting down
>
> ```
>
>
>
> and i tried to track this exception, it seems like throwing NPE before
> throwing RuntimeException.
>
> [org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader.java]
>
>
>
>
> [org.apache.parquet.hadoop.ColumnChunkPageReader.java]
>
>
> When "compressedPages" is empty, the method readPage() will return null. And
> “compressedPages” is set in the constructor.
>
>
> “compressedPages” is set in the constructor. Then I tried to find what
> "List<DataPage> compressedPages” is.
>
>
>
> It seems that when pageHeader.type is not DATA_PAGE or DATA_PAGE_V2,
> the “compressedPages” is an empty list when constructing
> ColumnChunkPageReader.
>
>
> I cannot go on tracking any further; I don’t know how to read the parquet file
> header, how to set the parquet file header, and I don’t know what is wrong.
>
> Please help by offering some advice. Thank you very much!
>
>
> [org.apache.parquet.hadoop.ParquetFileReader.java]
>
> public ColumnChunkPageReader readAllPages() throws IOException {
> List<DataPage> pagesInChunk = new ArrayList<DataPage>();
> DictionaryPage dictionaryPage = null;
> PrimitiveType type = getFileMetaData().getSchema()
> .getType(descriptor.col.getPath()).asPrimitiveType();
> long valuesCountReadSoFar = 0;
> int dataPageCountReadSoFar = 0;
> while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
> PageHeader pageHeader = readPageHeader();
> int uncompressedPageSize = pageHeader.getUncompressed_page_size();
> int compressedPageSize = pageHeader.getCompressed_page_size();
> final BytesInput pageBytes;
> switch (pageHeader.type) {
> case DICTIONARY_PAGE:
> // there is only one dictionary page per column chunk
> if (dictionaryPage != null) {
> throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
> }
> pageBytes = this.readAsBytesInput(compressedPageSize);
> if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
> verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
> "could not verify dictionary page integrity, CRC checksum verification failed");
> }
> DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
> dictionaryPage =
> new DictionaryPage(
> pageBytes,
> uncompressedPageSize,
> dicHeader.getNum_values(),
> converter.getEncoding(dicHeader.getEncoding())
> );
> // Copy crc to new page, used for testing
> if (pageHeader.isSetCrc()) {
> dictionaryPage.setCrc(pageHeader.getCrc());
> }
> break;
> case DATA_PAGE:
> DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
> pageBytes = this.readAsBytesInput(compressedPageSize);
> if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
> verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
> "could not verify page integrity, CRC checksum verification failed");
> }
> DataPageV1 dataPageV1 = new DataPageV1(
> pageBytes,
> dataHeaderV1.getNum_values(),
> uncompressedPageSize,
> converter.fromParquetStatistics(
> getFileMetaData().getCreatedBy(),
> dataHeaderV1.getStatistics(),
> type),
> converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
> converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
> converter.getEncoding(dataHeaderV1.getEncoding()));
> // Copy crc to new page, used for testing
> if (pageHeader.isSetCrc()) {
> dataPageV1.setCrc(pageHeader.getCrc());
> }
> pagesInChunk.add(dataPageV1);
> valuesCountReadSoFar += dataHeaderV1.getNum_values();
> ++dataPageCountReadSoFar;
> break;
> case DATA_PAGE_V2:
> DataPageHeaderV2 dataHeaderV2 = pageHeader.getData_page_header_v2();
> int dataSize = compressedPageSize - dataHeaderV2.getRepetition_levels_byte_length() - dataHeaderV2.getDefinition_levels_byte_length();
> pagesInChunk.add(
> new DataPageV2(
> dataHeaderV2.getNum_rows(),
> dataHeaderV2.getNum_nulls(),
> dataHeaderV2.getNum_values(),
> this.readAsBytesInput(dataHeaderV2.getRepetition_levels_byte_length()),
> this.readAsBytesInput(dataHeaderV2.getDefinition_levels_byte_length()),
> converter.getEncoding(dataHeaderV2.getEncoding()),
> this.readAsBytesInput(dataSize),
> uncompressedPageSize,
> converter.fromParquetStatistics(
> getFileMetaData().getCreatedBy(),
> dataHeaderV2.getStatistics(),
> type),
> dataHeaderV2.isIs_compressed()
> ));
> valuesCountReadSoFar += dataHeaderV2.getNum_values();
> ++dataPageCountReadSoFar;
> break;
> default:
> LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
> stream.skipFully(compressedPageSize);
> break;
> }
> }
> if (offsetIndex == null && valuesCountReadSoFar != descriptor.metadata.getValueCount()) {
> // Would be nice to have a CorruptParquetFileException or something as a subclass?
> throw new IOException(
> "Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " +
> getPath() + " offset " + descriptor.metadata.getFirstDataPageOffset() +
> " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
> + " pages ending at file offset " + (descriptor.fileOffset + stream.position()));
> }
> BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec());
> return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage, offsetIndex,
> blocks.get(currentBlock).getRowCount());
> }
>
>
> Best Regards!
>