You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "Udit Mehrotra (Jira)" <ji...@apache.org> on 2021/08/25 09:27:00 UTC

[jira] [Updated] (HUDI-2163) reading hudi mor table in flink sql does not send deletion events to down stream

     [ https://issues.apache.org/jira/browse/HUDI-2163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Udit Mehrotra updated HUDI-2163:
--------------------------------
    Fix Version/s:     (was: 0.9.0)
                   0.10.0

> reading hudi mor table in flink sql  does not send deletion events to down stream
> ---------------------------------------------------------------------------------
>
>                 Key: HUDI-2163
>                 URL: https://issues.apache.org/jira/browse/HUDI-2163
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Flink Integration
>    Affects Versions: 0.9.0
>         Environment: hudi-flink-bundle-0.9.0-SNAPSHOT
> flink 1.12.0
>            Reporter: jiulong
>            Priority: Major
>             Fix For: 0.10.0
>
>
> first, i create a table in flink with connector=‘hudi’ and table.type=‘MERGE_ON_READ’ and 'read.streaming.enabled' = ‘true’. 
> second, i write records with RowKind(INSERT or DELETE) using flink table streaming sql.
> third, in flink sql-client, the result is all right, insert event produces new record or updates old record, deletion event deletes old record. {color:#fb0106}but i cann’t detect deletion event in flink sql-client changelog mode.{color}
> {color:#172b4d}fourth, i tried to read hudi table using flink sql “select * from xxx” and transform flink Table object to RetractStream(StreamTableEnvironment.toRetractStream(Table, Row.class)), {color:#fb0106}and it cann’t detect deletion events too.{color}{color}
>  
> {color:#172b4d}{color:#fb0106}{color:#000000}In Hudi Source code, {color}{color:#fb0106}i found only in case of “MergeOnReadInputSplit has no basePath property” or “MergeOnReadInputSplit.getMergeType is {color}{color:#fb0106}skip_merge”, Iterator of MergeOnReadInputSplit{color} will be emit deletion event. And none of previous conditions are met. {color}{color}
> do i make something wrong? How do I detect deletion event in flink hudi? Please offer some advices, thanks!
>  
> {color:#000000}```{color}
> {color:#000000}[{color}org.apache.hudi.table.format.mor.MergeOnReadInputFormat.java]
> @Override
> public void open(MergeOnReadInputSplit split) throws IOException {
>  this.currentReadCount = 0L;
>  this.hadoopConf = StreamerUtil.getHadoopConf();
>  if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() > 0)) {
>  if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
>  // base file only with commit time filtering
>  this.iterator = new BaseFileOnlyFilteringIterator(
>  split.getInstantRange(),
>  this.tableState.getRequiredRowType(),
>  getReader(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos)));
>  } else {
>  // base file only
>  this.iterator = new BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
>  }
>  } else if (!split.getBasePath().isPresent()) {
>  // log files only
>  this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
>  } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
>  this.iterator = new SkipMergeIterator(
>  getRequiredSchemaReader(split.getBasePath().get()),
>  getLogFileIterator(split));
>  } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
>  this.iterator = new MergeIterator(
>  hadoopConf,
>  split,
>  this.tableState.getRowType(),
>  this.tableState.getRequiredRowType(),
>  new Schema.Parser().parse(this.tableState.getAvroSchema()),
>  new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
>  this.requiredPos,
>  getFullSchemaReader(split.getBasePath().get()));
>  } else {
>  throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for "
>  + "file path: " + split.getBasePath()
>  + "log paths: " + split.getLogPaths()
>  + "hoodie table path: " + split.getTablePath()
>  + "spark partition Index: " + split.getSplitNumber()
>  + "merge type: " + split.getMergeType());
>  }
>  
>  
> private Iterator<RowData> getLogFileIterator(MergeOnReadInputSplit split) {
>  final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
>  final Schema requiredSchema = new Schema.Parser().parse(tableState.getRequiredAvroSchema());
>  final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
>  final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
>  AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
>  final Map<String, HoodieRecord<? extends HoodieRecordPayload>> logRecords =
>  FormatUtils.scanLog(split, tableSchema, hadoopConf).getRecords();
>  final Iterator<String> logRecordsKeyIterator = logRecords.keySet().iterator();
>  final int[] pkOffset = tableState.getPkOffsetsInRequired();
>  // flag saying whether the pk semantics has been dropped by user specified
>  // projections. For e.g, if the pk fields are [a, b] but user only select a,
>  // then the pk semantics is lost.
>  final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset -> offset == -1);
>  final LogicalType[] pkTypes = pkSemanticLost ? null : tableState.getPkTypes(pkOffset);
>  final StringToRowDataConverter converter = pkSemanticLost ? null : new StringToRowDataConverter(pkTypes);
>  return new Iterator<RowData>() {
>  private RowData currentRecord;
>  @Override
>  public boolean hasNext() {
>  while (logRecordsKeyIterator.hasNext()) {
>  String curAvroKey = logRecordsKeyIterator.next();
>  Option<IndexedRecord> curAvroRecord = null;
>  final HoodieRecord<?> hoodieRecord = logRecords.get(curAvroKey);
>  try {
>  curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
>  } catch (IOException e) {
>  throw new HoodieException("Get avro insert value error for key: " + curAvroKey, e);
>  }
>  if (!curAvroRecord.isPresent()) {
>  // delete record found
>  if (emitDelete && !pkSemanticLost) {
>  GenericRowData delete = new GenericRowData(tableState.getRequiredRowType().getFieldCount());
>  final String recordKey = hoodieRecord.getRecordKey();
>  final String[] pkFields = KeyGenUtils.extractRecordKeys(recordKey);
>  final Object[] converted = converter.convert(pkFields);
>  for (int i = 0; i < pkOffset.length; i++) {
>  delete.setField(pkOffset[i], converted[i]);
>  }
>  delete.setRowKind(RowKind.DELETE);
>  this.currentRecord = delete;
>  return true;
>  }
>  // skipping if the condition is unsatisfied
>  // continue;
>  } else {
>  GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
>  curAvroRecord.get(),
>  requiredSchema,
>  requiredPos,
>  recordBuilder);
>  currentRecord = (RowData) avroToRowDataConverter.convert(requiredAvroRecord);
>  return true;
>  }
>  }
>  return false;
>  }
>  @Override
>  public RowData next() {
>  return currentRecord;
>  }
>  };
> }
> {color:#000000}```{color}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)