Posted to users@hudi.apache.org by vtygoss <vt...@126.com> on 2021/07/10 14:09:44 UTC
Hudi MOR table in Flink SQL cannot emit deletion events.
Hi,
May I request a code example of a Flink Hudi source reading a MOR table and emitting deletion events, using hudi-flink-bundle-0.9.0-SNAPSHOT?
First, I create a table in Flink with 'connector' = 'hudi', 'table.type' = 'MERGE_ON_READ' and 'read.streaming.enabled' = 'true'.
Second, I write records with RowKind INSERT or DELETE into it using a Flink streaming SQL job.
Third, in the Flink sql-client the query result itself is correct: an insert event produces a new record or updates an old one, and a deletion event removes the old record from the result. But I cannot detect the deletion event itself in the Flink sql-client.
Fourth, I tried to read the Hudi table with Flink SQL “select * from xxx” and to transform the resulting Table object into a retract stream (StreamTableEnvironment.toRetractStream(table, Row.class)), and I cannot detect the deletion events there either.
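For reference, my whole pipeline looks roughly like the sketch below; the table name, columns, path and the write query are only placeholders for my real job:
```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class HudiMorStreamingReadSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // step one: MOR table with streaming read enabled
    // (table name, columns and path are placeholders)
    tEnv.executeSql(
        "CREATE TABLE hudi_mor_tbl ("
            + "  id INT,"
            + "  name STRING,"
            + "  ts TIMESTAMP(3),"
            + "  PRIMARY KEY (id) NOT ENFORCED"
            + ") WITH ("
            + "  'connector' = 'hudi',"
            + "  'path' = 'hdfs:///tmp/hudi_mor_tbl',"
            + "  'table.type' = 'MERGE_ON_READ',"
            + "  'read.streaming.enabled' = 'true'"
            + ")");

    // step two: in the real job a streaming SQL query carrying INSERT and DELETE
    // RowKinds writes into the table; a plain INSERT stands in for it here
    // tEnv.executeSql("INSERT INTO hudi_mor_tbl SELECT ... FROM my_changelog_source");

    // steps three and four: streaming read, then conversion to a retract stream;
    // deletions should show up as (false, row) tuples, but they never do
    Table result = tEnv.sqlQuery("SELECT * FROM hudi_mor_tbl");
    tEnv.toRetractStream(result, Row.class).print();

    env.execute("hudi-mor-streaming-read");
  }
}
```
In the printed retract stream I only ever see (true, row) entries; no (false, row) entry is produced for the deletions.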
Am I doing something wrong?
In the Hudi source code, I found that the iterator built for a MergeOnReadInputSplit only emits deletion events when the split has no basePath property, or when MergeOnReadInputSplit.getMergeType() == skip_merge. Neither of those conditions is met in my case.
How do I detect deletion events with the Flink Hudi source? Please offer some advice, thank you very much!
Best Regards!
```
[org.apache.hudi.table.format.mor.MergeOnReadInputFormat.java]
  @Override
  public void open(MergeOnReadInputSplit split) throws IOException {
    this.currentReadCount = 0L;
    this.hadoopConf = StreamerUtil.getHadoopConf();
    if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
      if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
        // base file only with commit time filtering
        this.iterator = new BaseFileOnlyFilteringIterator(
            split.getInstantRange(),
            this.tableState.getRequiredRowType(),
            getReader(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos)));
      } else {
        // base file only
        this.iterator = new BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
      }
    } else if (!split.getBasePath().isPresent()) {
      // log files only
      this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
    } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
      this.iterator = new SkipMergeIterator(
          getRequiredSchemaReader(split.getBasePath().get()),
          getLogFileIterator(split));
    } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
      this.iterator = new MergeIterator(
          hadoopConf,
          split,
          this.tableState.getRowType(),
          this.tableState.getRequiredRowType(),
          new Schema.Parser().parse(this.tableState.getAvroSchema()),
          new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
          this.requiredPos,
          getFullSchemaReader(split.getBasePath().get()));
    } else {
      throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
          + "file path: " + split.getBasePath()
          + "log paths: " + split.getLogPaths()
          + "hoodie table path: " + split.getTablePath()
          + "spark partition Index: " + split.getSplitNumber()
          + "merge type: " + split.getMergeType());
    }
    // ... (rest of open() omitted in this excerpt)
  }

  private Iterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
    final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
    final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
    final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
    final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
        AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
    final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords =
        FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
    final Iterator<String> logRecordsKeyIterator = logRecords.keySet().iterator();
    final int[] pkOffset = tableState.getPkOffsetsInRequired();
    // flag saying whether the pk semantics has been dropped by user specified
    // projections. For e.g, if the pk fields are [a, b] but user only select a,
    // then the pk semantics is lost.
    final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset -> offset == -1);
    final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getPkTypes(pkOffset);
    final StringToRowDataConverter converter = pkSemanticLost ? null : new StringToRowDataConverter(pkTypes);

    return new Iterator<RowData>() {
      private RowData currentRecord;

      @Override
      public boolean hasNext() {
        while (logRecordsKeyIterator.hasNext()) {
          String curAvroKey = logRecordsKeyIterator.next();
          Option<IndexedRecord> curAvroRecord = null;
          final HoodieRecord<?> hoodieRecord = logRecords.get(curAvroKey);
          try {
            curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
          } catch (IOException e) {
            throw new HoodieException("Get avro insert value error for key: " + curAvroKey, e);
          }
          if (!curAvroRecord.isPresent()) {
            // delete record found
            // NOTE: within this method, a DELETE row is only produced here,
            // and only when emitDelete is true and the pk semantics are kept
            if (emitDelete && !pkSemanticLost) {
              GenericRowData delete = new GenericRowData(tableState.getRequiredRowType().getFieldCount());
              final String recordKey = hoodieRecord.getRecordKey();
              final String[] pkFields = KeyGenUtils.extractRecordKeys(recordKey);
              final Object[] converted = converter.convert(pkFields);
              for (int i = 0; i < pkOffset.length; i++) {
                delete.setField(pkOffset[i], converted[i]);
              }
              delete.setRowKind(RowKind.DELETE);
              this.currentRecord = delete;
              return true;
            }
            // skipping if the condition is unsatisfied
            // continue;
          } else {
            GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
                curAvroRecord.get(),
                requiredSchema,
                requiredPos,
                recordBuilder);
            currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
            return true;
          }
        }
        return false;
      }

      @Override
      public RowData next() {
        return currentRecord;
      }
    };
  }
```