Posted to commits@hudi.apache.org by "Udit Mehrotra (Jira)" <ji...@apache.org> on 2021/08/25 09:27:00 UTC
[jira] [Updated] (HUDI-2163) reading hudi mor table in flink sql does not send deletion events downstream
[ https://issues.apache.org/jira/browse/HUDI-2163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Udit Mehrotra updated HUDI-2163:
--------------------------------
Fix Version/s: 0.10.0 (was: 0.9.0)
> reading hudi mor table in flink sql does not send deletion events downstream
> ---------------------------------------------------------------------------------
>
> Key: HUDI-2163
> URL: https://issues.apache.org/jira/browse/HUDI-2163
> Project: Apache Hudi
> Issue Type: Bug
> Components: Flink Integration
> Affects Versions: 0.9.0
> Environment: hudi-flink-bundle-0.9.0-SNAPSHOT
> flink 1.12.0
> Reporter: jiulong
> Priority: Major
> Fix For: 0.10.0
>
>
> First, I create a table in Flink with connector = 'hudi', table.type = 'MERGE_ON_READ', and 'read.streaming.enabled' = 'true'.
> Second, I write records with RowKind (INSERT or DELETE) using Flink Table streaming SQL.
> Third, in the Flink SQL client the results look correct: an insert event produces a new record or updates an old one, and a deletion event deletes the old record. But I cannot detect deletion events in the Flink SQL client's changelog mode.
> Fourth, I tried reading the Hudi table with Flink SQL "select * from xxx" and converting the Flink Table object to a retract stream (StreamTableEnvironment.toRetractStream(Table, Row.class)); it cannot detect deletion events either.
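>
> To make this concrete, here is a minimal sketch of steps one, three, and four, assuming a hypothetical table name, schema, and path (only the three connector options above come from my setup; the step-two changelog writes are omitted):
>
> ```java
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> public class HudiMorDeleteRepro {
>   public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
>     // Step one: hypothetical MOR table; the three WITH options are the ones
>     // described above.
>     tEnv.executeSql(
>         "CREATE TABLE t1 ("
>             + "  id STRING,"
>             + "  name STRING,"
>             + "  PRIMARY KEY (id) NOT ENFORCED"
>             + ") WITH ("
>             + "  'connector' = 'hudi',"
>             + "  'path' = 'file:///tmp/t1',"
>             + "  'table.type' = 'MERGE_ON_READ',"
>             + "  'read.streaming.enabled' = 'true'"
>             + ")");
>
>     // Step two (writing +I/-D records from an upstream changelog source) is
>     // omitted here; it happens in a separate streaming job.
>
>     // Step four: read with plain SQL and convert to a retract stream.
>     // DELETEs should surface as (false, row) tuples, but never do.
>     Table result = tEnv.sqlQuery("SELECT * FROM t1");
>     tEnv.toRetractStream(result, Row.class).print();
>     env.execute("hudi-mor-delete-repro");
>   }
> }
> ```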
>
> In the Hudi source code, I found that the iterator over a MergeOnReadInputSplit emits deletion events only when the MergeOnReadInputSplit has no basePath property, or when MergeOnReadInputSplit.getMergeType is skip_merge. Neither of those conditions is met in my case.
> Did I do something wrong? How do I detect deletion events in Flink with Hudi? Please offer some advice, thanks!
>
> ```java
> // org.apache.hudi.table.format.mor.MergeOnReadInputFormat.java
> @Override
> public void open(MergeOnReadInputSplit split) throws IOException {
>   this.currentReadCount = 0L;
>   this.hadoopConf = StreamerUtil.getHadoopConf();
>   if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
>     if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
>       // base file only with commit time filtering
>       this.iterator = new BaseFileOnlyFilteringIterator(
>           split.getInstantRange(),
>           this.tableState.getRequiredRowType(),
>           getReader(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos)));
>     } else {
>       // base file only
>       this.iterator = new BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
>     }
>   } else if (!split.getBasePath().isPresent()) {
>     // log files only
>     this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
>   } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
>     this.iterator = new SkipMergeIterator(
>         getRequiredSchemaReader(split.getBasePath().get()),
>         getLogFileIterator(split));
>   } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
>     this.iterator = new MergeIterator(
>         hadoopConf,
>         split,
>         this.tableState.getRowType(),
>         this.tableState.getRequiredRowType(),
>         new Schema.Parser().parse(this.tableState.getAvroSchema()),
>         new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
>         this.requiredPos,
>         getFullSchemaReader(split.getBasePath().get()));
>   } else {
>     throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
>         + "file path: " + split.getBasePath()
>         + "log paths: " + split.getLogPaths()
>         + "hoodie table path: " + split.getTablePath()
>         + "spark partition Index: " + split.getSplitNumber()
>         + "merge type: " + split.getMergeType());
>   }
> }
>
>
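> // Reader's note: in getLogFileIterator below, a DELETE row is produced only
> // when the emitDelete flag of the enclosing input format is set and the
> // primary key survives the projection (i.e. pkSemanticLost is false).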
> private Iterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
>   final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
>   final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
>   final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
>   final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
>       AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
>   final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords =
>       FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
>   final Iterator<String> logRecordsKeyIterator = logRecords.keySet().iterator();
>   final int[] pkOffset = tableState.getPkOffsetsInRequired();
>   // flag saying whether the pk semantics has been dropped by user specified
>   // projections. E.g., if the pk fields are [a, b] but the user only selects a,
>   // then the pk semantics is lost.
>   final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset -> offset == -1);
>   final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getPkTypes(pkOffset);
>   final StringToRowDataConverter converter = pkSemanticLost ? null : new StringToRowDataConverter(pkTypes);
>
>   return new Iterator<RowData>() {
>     private RowData currentRecord;
>
>     @Override
>     public boolean hasNext() {
>       while (logRecordsKeyIterator.hasNext()) {
>         String curAvroKey = logRecordsKeyIterator.next();
>         Option<IndexedRecord> curAvroRecord = null;
>         final HoodieRecord<?> hoodieRecord = logRecords.get(curAvroKey);
>         try {
>           curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
>         } catch (IOException e) {
>           throw new HoodieException("Get avro insert value error for key: " + curAvroKey, e);
>         }
>         if (!curAvroRecord.isPresent()) {
>           // delete record found
>           if (emitDelete && !pkSemanticLost) {
>             GenericRowData delete = new GenericRowData(tableState.getRequiredRowType().getFieldCount());
>             final String recordKey = hoodieRecord.getRecordKey();
>             final String[] pkFields = KeyGenUtils.extractRecordKeys(recordKey);
>             final Object[] converted = converter.convert(pkFields);
>             for (int i = 0; i < pkOffset.length; i++) {
>               delete.setField(pkOffset[i], converted[i]);
>             }
>             delete.setRowKind(RowKind.DELETE);
>             this.currentRecord = delete;
>             return true;
>           }
>           // skipping if the condition is unsatisfied
>           // continue;
>         } else {
>           GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
>               curAvroRecord.get(),
>               requiredSchema,
>               requiredPos,
>               recordBuilder);
>           currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
>           return true;
>         }
>       }
>       return false;
>     }
>
>     @Override
>     public RowData next() {
>       return currentRecord;
>     }
>   };
> }
> ```
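>
> For reference, the fix version on this issue is 0.10.0. Below is a hedged sketch, reusing the tEnv from the sketch above, of what the table definition might look like once a changelog switch is available; the 'changelog.enabled' option is an assumption here and is not part of the 0.9.0-SNAPSHOT environment in this report:
>
> ```java
> // Hedged sketch only: 'changelog.enabled' is assumed to exist in a release
> // newer than the 0.9.0-SNAPSHOT used above. With it enabled, the MOR
> // streaming reader would be expected to emit DELETE RowKinds downstream
> // instead of silently dropping them.
> tEnv.executeSql(
>     "CREATE TABLE t1_changelog ("
>         + "  id STRING,"
>         + "  name STRING,"
>         + "  PRIMARY KEY (id) NOT ENFORCED"
>         + ") WITH ("
>         + "  'connector' = 'hudi',"
>         + "  'path' = 'file:///tmp/t1_changelog',"
>         + "  'table.type' = 'MERGE_ON_READ',"
>         + "  'read.streaming.enabled' = 'true',"
>         + "  'changelog.enabled' = 'true'"
>         + ")");
> ```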
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)