Posted to users@hudi.apache.org by vtygoss <vt...@126.com> on 2021/07/10 14:09:44 UTC

Hudi MOR table in Flink SQL cannot emit deletion events.

Hi, 


May I request a code example of a Flink Hudi source reading a MOR table and emitting deletion events, based on hudi-flink-bundle-0.9.0-SNAPSHOT?

First, I create a table in Flink with 'connector' = 'hudi', 'table.type' = 'MERGE_ON_READ' and 'read.streaming.enabled' = 'true'.
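
For reference, a minimal sketch of that DDL issued through the Table API (column names, table name and path are just placeholders; the options follow the Hudi Flink quickstart):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HudiMorStreamingDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // Placeholder schema and path; 'uuid' matches the default record key field.
    tEnv.executeSql(
        "CREATE TABLE hudi_mor_tbl (\n"
      + "  uuid VARCHAR(20),\n"
      + "  name VARCHAR(10),\n"
      + "  ts TIMESTAMP(3)\n"
      + ") WITH (\n"
      + "  'connector' = 'hudi',\n"
      + "  'path' = 'file:///tmp/hudi_mor_tbl',\n"
      + "  'table.type' = 'MERGE_ON_READ',\n"
      + "  'read.streaming.enabled' = 'true',\n"
      + "  'read.streaming.check-interval' = '4'\n"
      + ")");
  }
}
```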


Second, I write records with RowKind INSERT or DELETE into the table using Flink streaming SQL.
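
For illustration only, one way such INSERT/DELETE rows can be produced is by forwarding an upstream changelog into the Hudi sink; a sketch continuing in the same main method as above (the Kafka topic and debezium-json format are assumptions for the example, any source that emits DELETE RowKinds should behave the same):

```java
// Hypothetical upstream changelog source producing INSERT/DELETE RowKinds.
tEnv.executeSql(
    "CREATE TABLE source_changelog (\n"
  + "  uuid VARCHAR(20),\n"
  + "  name VARCHAR(10),\n"
  + "  ts TIMESTAMP(3)\n"
  + ") WITH (\n"
  + "  'connector' = 'kafka',\n"
  + "  'topic' = 'users',\n"
  + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
  + "  'properties.group.id' = 'hudi-demo',\n"
  + "  'scan.startup.mode' = 'earliest-offset',\n"
  + "  'format' = 'debezium-json'\n"
  + ")");

// DELETE rows end up as delete records in the MOR log files.
tEnv.executeSql("INSERT INTO hudi_mor_tbl SELECT uuid, name, ts FROM source_changelog");
```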


Third, in the Flink SQL client the data itself looks correct: an insert event produces a new record or updates the old one, and a deletion event removes the old record. But I cannot observe the deletion event itself in the Flink SQL client.


Fourth, I also tried to read the Hudi table with the Flink SQL query "select * from xxx" and to convert the resulting Table to a retract stream via StreamTableEnvironment.toRetractStream(Table, Row.class), but I cannot detect deletion events there either.
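
Concretely, the read side looks roughly like this (continuing in the same main method as the sketches above; in a retract stream the Boolean field is false for retraction, i.e. delete, messages):

```java
// Additional imports for this part:
// import org.apache.flink.api.java.tuple.Tuple2;
// import org.apache.flink.streaming.api.datastream.DataStream;
// import org.apache.flink.table.api.Table;
// import org.apache.flink.types.Row;

Table result = tEnv.sqlQuery("SELECT * FROM hudi_mor_tbl");

// f0 == false marks a retraction (delete) message.
DataStream<Tuple2<Boolean, Row>> retractStream =
    tEnv.toRetractStream(result, Row.class);

retractStream
    .filter(t -> !t.f0)   // keep only the retractions; I never see any output here
    .print("DELETE");

env.execute("inspect-hudi-deletes");
```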


Am I doing something wrong?


In the Hudi source code I found that the iterator for a MergeOnReadInputSplit emits deletion events only when the split has no basePath property (log files only) or when MergeOnReadInputSplit.getMergeType() == skip_merge, and neither of these conditions is met in my case.
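
If the intended way is to force one of those two code paths, is something like the following the right direction? The option keys below are my assumption from FlinkOptions and the docs, please correct me if they are different in 0.9.0-SNAPSHOT:

```java
// Assumed option keys (to be confirmed for 0.9.0-SNAPSHOT):
// 'changelog.enabled' should keep the change flags of the records, and
// 'hoodie.datasource.merge.type' = 'skip_merge' should route the split
// into SkipMergeIterator instead of MergeIterator.
tEnv.executeSql(
    "CREATE TABLE hudi_mor_tbl_changelog (\n"
  + "  uuid VARCHAR(20),\n"
  + "  name VARCHAR(10),\n"
  + "  ts TIMESTAMP(3)\n"
  + ") WITH (\n"
  + "  'connector' = 'hudi',\n"
  + "  'path' = 'file:///tmp/hudi_mor_tbl',\n"
  + "  'table.type' = 'MERGE_ON_READ',\n"
  + "  'read.streaming.enabled' = 'true',\n"
  + "  'changelog.enabled' = 'true',\n"
  + "  'hoodie.datasource.merge.type' = 'skip_merge'\n"
  + ")");
```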


How do I detect deletion events with the Flink Hudi source? Please offer some advice, thank you very much!


Best Regards! 

```
// org.apache.hudi.table.format.mor.MergeOnReadInputFormat
@Override
public void open(MergeOnReadInputSplit split) throws IOException {
  this.currentReadCount = 0L;
  this.hadoopConf = StreamerUtil.getHadoopConf();
  if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
    if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
      // base file only with commit time filtering
      this.iterator = new BaseFileOnlyFilteringIterator(
          split.getInstantRange(),
          this.tableState.getRequiredRowType(),
          getReader(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos)));
    } else {
      // base file only
      this.iterator = new BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
    }
  } else if (!split.getBasePath().isPresent()) {
    // log files only
    this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
  } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
    this.iterator = new SkipMergeIterator(
        getRequiredSchemaReader(split.getBasePath().get()),
        getLogFileIterator(split));
  } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
    this.iterator = new MergeIterator(
        hadoopConf,
        split,
        this.tableState.getRowType(),
        this.tableState.getRequiredRowType(),
        new Schema.Parser().parse(this.tableState.getAvroSchema()),
        new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
        this.requiredPos,
        getFullSchemaReader(split.getBasePath().get()));
  } else {
    throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
        + "file path: " + split.getBasePath()
        + "log paths: " + split.getLogPaths()
        + "hoodie table path: " + split.getTablePath()
        + "spark partition Index: " + split.getSplitNumber()
        + "merge type: " + split.getMergeType());
  }
  // ... (rest of open() omitted from this excerpt)
}

private Iterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
  final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
  final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
  final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
  final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
      AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
  final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords =
      FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
  final Iterator<String> logRecordsKeyIterator = logRecords.keySet().iterator();
  final int[] pkOffset = tableState.getPkOffsetsInRequired();
  // flag saying whether the pk semantics has been dropped by user specified
  // projections. For e.g, if the pk fields are [a, b] but user only select a,
  // then the pk semantics is lost.
  final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset -> offset == -1);
  final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getPkTypes(pkOffset);
  final StringToRowDataConverter converter = pkSemanticLost ? null : new StringToRowDataConverter(pkTypes);

  return new Iterator<RowData>() {
    private RowData currentRecord;

    @Override
    public boolean hasNext() {
      while (logRecordsKeyIterator.hasNext()) {
        String curAvroKey = logRecordsKeyIterator.next();
        Option<IndexedRecord> curAvroRecord = null;
        final HoodieRecord<?> hoodieRecord = logRecords.get(curAvroKey);
        try {
          curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
        } catch (IOException e) {
          throw new HoodieException("Get avro insert value error for key: " + curAvroKey, e);
        }
        if (!curAvroRecord.isPresent()) {
          // delete record found: a DELETE row is produced only when emitDelete
          // is true and the primary-key columns survive the projection
          if (emitDelete && !pkSemanticLost) {
            GenericRowData delete = new GenericRowData(tableState.getRequiredRowType().getFieldCount());

            final String recordKey = hoodieRecord.getRecordKey();
            final String[] pkFields = KeyGenUtils.extractRecordKeys(recordKey);
            final Object[] converted = converter.convert(pkFields);
            for (int i = 0; i < pkOffset.length; i++) {
              delete.setField(pkOffset[i], converted[i]);
            }
            delete.setRowKind(RowKind.DELETE);

            this.currentRecord = delete;
            return true;
          }
          // skipping if the condition is unsatisfied
          // continue;
        } else {
          GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
              curAvroRecord.get(),
              requiredSchema,
              requiredPos,
              recordBuilder);
          currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
          return true;
        }
      }
      return false;
    }

    @Override
    public RowData next() {
      return currentRecord;
    }
  };
}
```