Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/16 03:50:45 UTC

[GitHub] [hudi] xushiyan commented on a diff in pull request #5522: [HUDI-3378][HUDI-3379][HUDI-3381] Rebasing usages of HoodieRecordPayload and raw Avro payload to rely on HoodieRecord instead

xushiyan commented on code in PR #5522:
URL: https://github.com/apache/hudi/pull/5522#discussion_r873267889


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java:
##########
@@ -69,8 +69,8 @@ public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime,
 
   @Override
   public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {

Review Comment:
   /nit renaming to avoid confusion
   
   ```suggestion
     public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> genResult) {
   ```



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -201,33 +202,24 @@ protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
     return hoodieRecord.getCurrentLocation() != null;
   }
 
-  private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
-    Option<Map<String, String>> recordMetadata = hoodieRecord.getData().getMetadata();
+  private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
+    Option<Map<String, String>> recordMetadata = hoodieRecord.getMetadata();
     try {
       // Pass the isUpdateRecord to the props for HoodieRecordPayload to judge
       // Whether it is an update or insert record.
       boolean isUpdateRecord = isUpdateRecord(hoodieRecord);
       // If the format can not record the operation field, nullify the DELETE payload manually.
       boolean nullifyPayload = HoodieOperation.isDelete(hoodieRecord.getOperation()) && !config.allowOperationMetadataField();
       recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR, String.valueOf(isUpdateRecord));
-      Option<IndexedRecord> avroRecord = nullifyPayload ? Option.empty() : hoodieRecord.getData().getInsertValue(tableSchema, recordProperties);
-      if (avroRecord.isPresent()) {
-        if (avroRecord.get().equals(IGNORE_RECORD)) {
-          return avroRecord;
+      Option<HoodieRecord> finalRecord = Option.empty();

Review Comment:
   No need to init `finalRecord` here; it's only used at L222.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -257,46 +255,45 @@ protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
         + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
   }
 
-  private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
+  private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp) throws IOException {
     boolean isDelete = false;
-    if (indexedRecord.isPresent()) {
+    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
+    if (combineRecordOp.isPresent()) {
       updatedRecordsWritten++;
-      GenericRecord record = (GenericRecord) indexedRecord.get();
-      if (oldRecord != record) {
+      if (!oldRecord.equals(combineRecordOp.get())) {
         // the incoming record is chosen
         isDelete = HoodieOperation.isDelete(hoodieRecord.getOperation());
       }
     }
-    return writeRecord(hoodieRecord, indexedRecord, isDelete);
+    return writeRecord(hoodieRecord, combineRecordOp, schema, config.getProps(), isDelete);
   }
 
   protected void writeInsertRecord(HoodieRecord<T> hoodieRecord) throws IOException {
     Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
-    Option<IndexedRecord> insertRecord = hoodieRecord.getData().getInsertValue(schema, config.getProps());
     // just skip the ignored record
-    if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
+    if (hoodieRecord.shouldIgnore(schema, config.getProps())) {
       return;
     }
-    if (writeRecord(hoodieRecord, insertRecord, HoodieOperation.isDelete(hoodieRecord.getOperation()))) {
+    if (writeRecord(hoodieRecord, Option.of(hoodieRecord), schema, config.getProps(), HoodieOperation.isDelete(hoodieRecord.getOperation()))) {

Review Comment:
   maybe `isDelete()` can be an API too?
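   
   For instance, a hypothetical `HoodieRecord`-level API could look like this (sketch only; the name and placement are illustrative and not part of this PR):
   
   ```java
   // hypothetical sketch: let callers ask the record directly,
   // instead of re-deriving the flag from the operation at each call site
   public boolean isDelete() {
     return HoodieOperation.isDelete(getOperation());
   }
   ```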



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -502,12 +521,16 @@ private void writeToBuffer(HoodieRecord<T> record) {
       record.seal();
     }
     // fetch the ordering val first in case the record was deflated.
-    final Comparable<?> orderingVal = record.getData().getOrderingValue();
-    Option<IndexedRecord> indexedRecord = getIndexedRecord(record);
+    final Comparable<?> orderingVal = ((HoodieRecordPayload)record.getData()).getOrderingValue();

Review Comment:
   Should we have an API like `getOrderingValue()`?
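   
   For instance, a hypothetical override on `HoodieAvroRecord` could look like this (sketch only, not part of this PR):
   
   ```java
   // hypothetical sketch: hide the HoodieRecordPayload cast behind a record-level API
   @Override
   public Comparable<?> getOrderingValue() {
     return getData().getOrderingValue();
   }
   ```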



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java:
##########
@@ -47,4 +69,170 @@ public T getData() {
     }
     return data;
   }
+
+  @Override
+  public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+    return getRecordKey();
+  }
+
+  // TODO remove
+  public Option<GenericRecord> asAvro(Schema schema) throws IOException {
+    return getData().getInsertValue(schema);
+  }

Review Comment:
   are you going to remove it in this PR?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -376,13 +377,20 @@ private boolean isNewInstantBlock(HoodieLogBlock logBlock) {
    * handle it.
    */
   private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws Exception {
-    try (ClosableIterator<IndexedRecord> recordIterator = getRecordsIterator(dataBlock, keySpecOpt)) {
+    HoodieRecord.Mapper mapper = (rec) -> createHoodieRecord(rec, this.hoodieTableMetaClient.getTableConfig(),
+        this.payloadClassFQN, this.preCombineField, this.withOperationField, this.simpleKeyGenFields, this.partitionName);
+
+    try (ClosableIterator<HoodieRecord> recordIterator = getRecordsIterator(dataBlock, keySpecOpt, mapper)) {
       Option<Schema> schemaOption = getMergedSchema(dataBlock);
+      Schema finalReadSchema = dataBlock.getSchema();

Review Comment:
   This can go into the else block since `finalReadSchema` will be overwritten.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -359,8 +369,17 @@ private void processAppendResult(AppendResult result, List<IndexedRecord> record
             .collect(Collectors.toList());
       }
 
+      List<IndexedRecord> indexedRecords = new LinkedList<>();
+      for (HoodieRecord hoodieRecord : recordList) {
+        if (hoodieRecord instanceof HoodieAvroIndexRecord) {
+          indexedRecords.add((IndexedRecord) hoodieRecord.getData());
+        } else {
+          indexedRecords.add((IndexedRecord) ((HoodieRecordPayload) hoodieRecord.getData()).getInsertValue(tableSchema, config.getProps()).get());

Review Comment:
   Should this become an API itself, like `toIndexedRecord()`?
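   
   For instance, a hypothetical abstract method on `HoodieRecord` (sketch only; the name follows the suggestion above and is not part of this PR):
   
   ```java
   // hypothetical sketch: a record-level conversion API, so call sites
   // don't need instanceof checks and payload casts
   public abstract Option<IndexedRecord> toIndexedRecord(Schema schema, Properties props) throws IOException;
   ```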



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -257,46 +255,45 @@ protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
         + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
   }
 
-  private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, Option<IndexedRecord> indexedRecord) {
+  private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp) throws IOException {
     boolean isDelete = false;
-    if (indexedRecord.isPresent()) {
+    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
+    if (combineRecordOp.isPresent()) {
       updatedRecordsWritten++;
-      GenericRecord record = (GenericRecord) indexedRecord.get();
-      if (oldRecord != record) {
+      if (!oldRecord.equals(combineRecordOp.get())) {

Review Comment:
   Can `oldRecord` be null? Better to use a null-safe comparison like `Objects.equals`.
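   
   For instance (sketch, assuming `java.util.Objects` is imported):
   
   ```suggestion
         if (!Objects.equals(oldRecord, combineRecordOp.get())) {
   ```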



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java:
##########
@@ -180,18 +176,14 @@ public void write() {
     } else {
       keyIterator = recordMap.keySet().stream().iterator();
     }
-    try {
-      while (keyIterator.hasNext()) {
-        final String key = keyIterator.next();
-        HoodieRecord<T> record = recordMap.get(key);
-        if (useWriterSchema) {
-          write(record, record.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
-        } else {
-          write(record, record.getData().getInsertValue(tableSchema, config.getProps()));
-        }
+    while (keyIterator.hasNext()) {
+      final String key = keyIterator.next();
+      HoodieRecord<T> record = recordMap.get(key);
+      if (useWriterSchema) {
+        write(record, tableSchemaWithMetaFields, config.getProps());
+      } else {
+        write(record, tableSchema, config.getProps());

Review Comment:
   /nit can do 1-liner here
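   
   For instance (sketch):
   
   ```suggestion
         write(record, useWriterSchema ? tableSchemaWithMetaFields : tableSchema, config.getProps());
   ```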



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexRecord.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * This is only used for records returned by the reader.
+ */
+public class HoodieAvroIndexRecord extends HoodieRecord<IndexedRecord> {

Review Comment:
   to make it clear that it's of type `IndexedRecord`
   
   ```suggestion
   public class HoodieAvroIndexedRecord extends HoodieRecord<IndexedRecord> {
   ```



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -243,14 +235,32 @@ private Option<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
       // part of marking
       // record successful.
       hoodieRecord.deflate();
-      return avroRecord;
+      return finalRecord;
     } catch (Exception e) {
       LOG.error("Error writing record  " + hoodieRecord, e);
       writeStatus.markFailure(hoodieRecord, e, recordMetadata);
     }
     return Option.empty();

Review Comment:
   This is a problem with the existing logic: it looks like we return an empty option even when we hit an exception, but later logic interprets an empty option as a delete, which could lead to an unintended delete?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/MappingIterator.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import java.util.Iterator;
+import java.util.function.Function;
+
+/**
+ * An iterator that lazily applies {@code mapper} to each element of {@code sourceIterator}.
+ */
+public class MappingIterator<T, R> implements ClosableIterator<R> {
+
+  private final Iterator<T> sourceIterator;
+  private final Function<T, R> mapper;
+
+  public MappingIterator(Iterator<T> sourceIterator, Function<T, R> mapper) {
+    this.sourceIterator = sourceIterator;
+    this.mapper = mapper;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return sourceIterator.hasNext();
+  }
+
+  @Override
+  public R next() {
+    return mapper.apply(sourceIterator.next());
+  }
+
+  @Override
+  public void close() {
+  }

Review Comment:
   Not calling `super.close()`?
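   
   For instance, one possible sketch, assuming the source iterator may itself be a `ClosableIterator` that needs closing:
   
   ```java
   @Override
   public void close() {
     // sketch: propagate close() to the source iterator when it is closable
     if (sourceIterator instanceof ClosableIterator) {
       ((ClosableIterator<T>) sourceIterator).close();
     }
   }
   ```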



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexRecord.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * This is only used for records returned by the reader.
+ */
+public class HoodieAvroIndexRecord extends HoodieRecord<IndexedRecord> {
+
+  public HoodieAvroIndexRecord(IndexedRecord data) {
+    super(null, data);
+  }
+
+  public HoodieAvroIndexRecord(HoodieKey key, IndexedRecord data) {
+    super(key, data);
+  }
+
+  public HoodieAvroIndexRecord(HoodieKey key, IndexedRecord data, HoodieOperation operation) {
+    super(key, data, operation);
+  }
+
+  public HoodieAvroIndexRecord(HoodieRecord<IndexedRecord> record) {
+    super(record);
+  }
+
+  public HoodieAvroIndexRecord() {
+  }
+
+  public Option<GenericRecord> asAvro() {
+    return Option.of((GenericRecord) data);
+  }
+
+  @Override
+  public HoodieRecord newInstance() {
+    return null;
+  }
+
+  @Override
+  public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+    return keyGeneratorOpt.isPresent() ? keyGeneratorOpt.get().getRecordKey((GenericRecord) data) : ((GenericRecord) data).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+  }
+
+  @Override
+  public HoodieRecord preCombine(HoodieRecord<IndexedRecord> previousRecord) {
+    return null;
+  }
+
+  @Override
+  public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException {
+    return null;
+  }
+
+  @Override
+  public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException {
+    return null;
+  }
+
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException {
+    GenericRecord avroPayloadInNewSchema =
+        HoodieAvroUtils.rewriteRecord((GenericRecord) data, targetSchema);
+    return new HoodieAvroIndexRecord(avroPayloadInNewSchema);
+  }
+
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException {
+    GenericRecord rewriteRecord = schemaOnReadEnabled ? HoodieAvroUtils.rewriteRecordWithNewSchema(data, writeSchemaWithMetaFields, new HashMap<>())
+        : HoodieAvroUtils.rewriteRecord((GenericRecord) data, writeSchemaWithMetaFields);
+    return new HoodieAvroIndexRecord(rewriteRecord);
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, String fileName) throws IOException {
+    GenericRecord rewriteRecord = schemaOnReadEnabled ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata((GenericRecord) data, writeSchemaWithMetaFields, fileName)
+        : HoodieAvroUtils.rewriteRecordWithMetadata((GenericRecord) data, writeSchemaWithMetaFields, fileName);
+    return new HoodieAvroIndexRecord(rewriteRecord);
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols) throws IOException {
+    GenericRecord rewriteRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(data, newSchema, renameCols);
+    return new HoodieAvroIndexRecord(rewriteRecord);
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols, Mapper mapper) throws IOException {
+    GenericRecord oldRecord = (GenericRecord) getData();
+    GenericRecord rewriteRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols);
+    return mapper.apply(rewriteRecord);
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema) throws IOException {
+    GenericRecord oldRecord = (GenericRecord) data;
+    GenericRecord rewriteRecord = HoodieAvroUtils.rewriteRecord(oldRecord, newSchema);
+    return new HoodieAvroIndexRecord(rewriteRecord);
+  }
+
+  @Override
+  public HoodieRecord addMetadataValues(Schema recordSchema, Properties prop, Map<HoodieMetadataField, String> metadataValues) throws IOException {
+    Arrays.stream(HoodieMetadataField.values()).forEach(metadataField -> {
+      String value = metadataValues.get(metadataField);
+      if (value != null) {
+        ((GenericRecord) data).put(metadataField.getFieldName(), metadataValues.get(metadataField));

Review Comment:
   ```suggestion
           ((GenericRecord) data).put(metadataField.getFieldName(), value);
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexRecord.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * This is only used for records returned by the reader.
+ */
+public class HoodieAvroIndexRecord extends HoodieRecord<IndexedRecord> {
+
+  public HoodieAvroIndexRecord(IndexedRecord data) {
+    super(null, data);
+  }
+
+  public HoodieAvroIndexRecord(HoodieKey key, IndexedRecord data) {
+    super(key, data);
+  }
+
+  public HoodieAvroIndexRecord(HoodieKey key, IndexedRecord data, HoodieOperation operation) {
+    super(key, data, operation);
+  }
+
+  public HoodieAvroIndexRecord(HoodieRecord<IndexedRecord> record) {
+    super(record);
+  }
+
+  public HoodieAvroIndexRecord() {
+  }
+
+  public Option<GenericRecord> asAvro() {
+    return Option.of((GenericRecord) data);
+  }
+
+  @Override
+  public HoodieRecord newInstance() {
+    return null;
+  }
+
+  @Override
+  public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+    return keyGeneratorOpt.isPresent() ? keyGeneratorOpt.get().getRecordKey((GenericRecord) data) : ((GenericRecord) data).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+  }
+
+  @Override
+  public HoodieRecord preCombine(HoodieRecord<IndexedRecord> previousRecord) {
+    return null;
+  }
+
+  @Override
+  public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException {
+    return null;
+  }
+
+  @Override
+  public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException {
+    return null;
+  }
+
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException {
+    GenericRecord avroPayloadInNewSchema =
+        HoodieAvroUtils.rewriteRecord((GenericRecord) data, targetSchema);
+    return new HoodieAvroIndexRecord(avroPayloadInNewSchema);
+  }
+
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException {
+    GenericRecord rewriteRecord = schemaOnReadEnabled ? HoodieAvroUtils.rewriteRecordWithNewSchema(data, writeSchemaWithMetaFields, new HashMap<>())
+        : HoodieAvroUtils.rewriteRecord((GenericRecord) data, writeSchemaWithMetaFields);
+    return new HoodieAvroIndexRecord(rewriteRecord);
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, String fileName) throws IOException {
+    GenericRecord rewriteRecord = schemaOnReadEnabled ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata((GenericRecord) data, writeSchemaWithMetaFields, fileName)
+        : HoodieAvroUtils.rewriteRecordWithMetadata((GenericRecord) data, writeSchemaWithMetaFields, fileName);
+    return new HoodieAvroIndexRecord(rewriteRecord);
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols) throws IOException {
+    GenericRecord rewriteRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(data, newSchema, renameCols);
+    return new HoodieAvroIndexRecord(rewriteRecord);
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols, Mapper mapper) throws IOException {
+    GenericRecord oldRecord = (GenericRecord) getData();
+    GenericRecord rewriteRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols);
+    return mapper.apply(rewriteRecord);
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema) throws IOException {
+    GenericRecord oldRecord = (GenericRecord) data;
+    GenericRecord rewriteRecord = HoodieAvroUtils.rewriteRecord(oldRecord, newSchema);
+    return new HoodieAvroIndexRecord(rewriteRecord);
+  }
+
+  @Override
+  public HoodieRecord addMetadataValues(Schema recordSchema, Properties prop, Map<HoodieMetadataField, String> metadataValues) throws IOException {
+    Arrays.stream(HoodieMetadataField.values()).forEach(metadataField -> {
+      String value = metadataValues.get(metadataField);
+      if (value != null) {
+        ((GenericRecord) data).put(metadataField.getFieldName(), metadataValues.get(metadataField));
+      }
+    });
+
+    return new HoodieAvroIndexRecord(data);
+  }
+
+  @Override
+  public HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties prop, int pos, String newValue) throws IOException {
+    data.put(pos, newValue);
+    return this;
+  }
+
+  @Override
+  public boolean shouldIgnore(Schema schema, Properties prop) throws IOException {
+    if (getData().equals(SENTINEL)) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public Option<Map<String, String>> getMetadata() {
+    return null;

Review Comment:
   Shouldn't this return `Option.empty()`? And is there a case where we should return some metadata instead of always empty?
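   
   For instance:
   
   ```suggestion
       return Option.empty();
   ```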



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java:
##########
@@ -52,11 +53,23 @@ public HoodieParquetStreamWriter(FSDataOutputStream outputStream,
         .build();
   }
 
-  public void writeAvro(String key, R object) throws IOException {
-    writer.write(object);
+  @Override
+  public boolean canWrite() {
+    return true;
+  }
+
+  @Override
+  public void writeAvro(String key, IndexedRecord record) throws IOException {
+    writer.write(record);
     writeSupport.add(key);
   }
 
+  @Override
+  public void writeAvroWithMetadata(HoodieKey key, IndexedRecord avroRecord) throws IOException {
+    // TODO support populating the metadata

Review Comment:
   doing it in this PR?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexRecord.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * This is only used for records returned by the reader.
+ */
+public class HoodieAvroIndexRecord extends HoodieRecord<IndexedRecord> {
+
+  public HoodieAvroIndexRecord(IndexedRecord data) {
+    super(null, data);
+  }
+
+  public HoodieAvroIndexRecord(HoodieKey key, IndexedRecord data) {
+    super(key, data);
+  }
+
+  public HoodieAvroIndexRecord(HoodieKey key, IndexedRecord data, HoodieOperation operation) {
+    super(key, data, operation);
+  }
+
+  public HoodieAvroIndexRecord(HoodieRecord<IndexedRecord> record) {
+    super(record);
+  }
+
+  public HoodieAvroIndexRecord() {
+  }
+
+  public Option<GenericRecord> asAvro() {
+    return Option.of((GenericRecord) data);
+  }
+
+  @Override
+  public HoodieRecord newInstance() {
+    return null;
+  }
+
+  @Override
+  public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+    return keyGeneratorOpt.isPresent() ? keyGeneratorOpt.get().getRecordKey((GenericRecord) data) : ((GenericRecord) data).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+  }
+
+  @Override
+  public HoodieRecord preCombine(HoodieRecord<IndexedRecord> previousRecord) {
+    return null;
+  }
+
+  @Override
+  public Option<HoodieRecord> combineAndGetUpdateValue(HoodieRecord previousRecord, Schema schema, Properties props) throws IOException {
+    return null;
+  }
+
+  @Override
+  public HoodieRecord mergeWith(HoodieRecord other, Schema readerSchema, Schema writerSchema) throws IOException {
+    return null;
+  }
+
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema, TypedProperties props) throws IOException {
+    GenericRecord avroPayloadInNewSchema =
+        HoodieAvroUtils.rewriteRecord((GenericRecord) data, targetSchema);
+    return new HoodieAvroIndexRecord(avroPayloadInNewSchema);
+  }
+
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException {
+    GenericRecord rewriteRecord = schemaOnReadEnabled ? HoodieAvroUtils.rewriteRecordWithNewSchema(data, writeSchemaWithMetaFields, new HashMap<>())
+        : HoodieAvroUtils.rewriteRecord((GenericRecord) data, writeSchemaWithMetaFields);
+    return new HoodieAvroIndexRecord(rewriteRecord);
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, String fileName) throws IOException {
+    GenericRecord rewriteRecord = schemaOnReadEnabled ? HoodieAvroUtils.rewriteEvolutionRecordWithMetadata((GenericRecord) data, writeSchemaWithMetaFields, fileName)
+        : HoodieAvroUtils.rewriteRecordWithMetadata((GenericRecord) data, writeSchemaWithMetaFields, fileName);
+    return new HoodieAvroIndexRecord(rewriteRecord);
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols) throws IOException {
+    GenericRecord rewriteRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(data, newSchema, renameCols);
+    return new HoodieAvroIndexRecord(rewriteRecord);
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols, Mapper mapper) throws IOException {
+    GenericRecord oldRecord = (GenericRecord) getData();
+    GenericRecord rewriteRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols);
+    return mapper.apply(rewriteRecord);
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema) throws IOException {
+    GenericRecord oldRecord = (GenericRecord) data;
+    GenericRecord rewriteRecord = HoodieAvroUtils.rewriteRecord(oldRecord, newSchema);
+    return new HoodieAvroIndexRecord(rewriteRecord);
+  }
+
+  @Override
+  public HoodieRecord addMetadataValues(Schema recordSchema, Properties prop, Map<HoodieMetadataField, String> metadataValues) throws IOException {
+    Arrays.stream(HoodieMetadataField.values()).forEach(metadataField -> {
+      String value = metadataValues.get(metadataField);
+      if (value != null) {
+        ((GenericRecord) data).put(metadataField.getFieldName(), metadataValues.get(metadataField));
+      }
+    });
+
+    return new HoodieAvroIndexRecord(data);
+  }
+
+  @Override
+  public HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties prop, int pos, String newValue) throws IOException {
+    data.put(pos, newValue);
+    return this;
+  }
+
+  @Override
+  public boolean shouldIgnore(Schema schema, Properties prop) throws IOException {
+    if (getData().equals(SENTINEL)) {
+      return true;
+    } else {
+      return false;
+    }

Review Comment:
   can be 1-liner
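   
   For instance:
   
   ```suggestion
       return getData().equals(SENTINEL);
   ```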



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java:
##########
@@ -179,40 +181,45 @@ public final ClosableIterator<IndexedRecord> getRecordIterator(List<String> keys
     return FilteringIterator.getInstance(allRecords, keySet, fullKey, this::getRecordKey);
   }
 
-  protected ClosableIterator<IndexedRecord> readRecordsFromBlockPayload() throws IOException {
+  protected ClosableIterator<HoodieRecord> readRecordsFromBlockPayload(HoodieRecord.Mapper mapper) throws IOException {
     if (readBlockLazily && !getContent().isPresent()) {
       // read log block contents from disk
       inflate();
     }
 
     try {
-      return deserializeRecords(getContent().get());
+      return deserializeRecords(getContent().get(), mapper);
     } finally {
       // Free up content to be GC'd by deflating the block
       deflate();
     }
   }
 
-  protected ClosableIterator<IndexedRecord> lookupRecords(List<String> keys, boolean fullKey) throws IOException {
+  protected ClosableIterator<HoodieRecord> lookupRecords(List<String> keys, boolean fullKey, HoodieRecord.Mapper mapper) throws IOException {
     throw new UnsupportedOperationException(
         String.format("Point lookups are not supported by this Data block type (%s)", getBlockType())
     );
   }
 
-  protected abstract byte[] serializeRecords(List<IndexedRecord> records) throws IOException;
+  protected abstract byte[] serializeRecords(List<HoodieRecord> records) throws IOException;
 
-  protected abstract ClosableIterator<IndexedRecord> deserializeRecords(byte[] content) throws IOException;
+  protected abstract ClosableIterator<HoodieRecord> deserializeRecords(byte[] content, HoodieRecord.Mapper mapper) throws IOException;
 
   public abstract HoodieLogBlockType getBlockType();
 
   protected Option<Schema.Field> getKeyField(Schema schema) {
     return Option.ofNullable(schema.getField(keyFieldName));
   }
 
-  protected Option<String> getRecordKey(IndexedRecord record) {
-    return getKeyField(record.getSchema())
-        .map(keyField -> record.get(keyField.pos()))
-        .map(Object::toString);
+  protected Option<String> getRecordKey(HoodieRecord record) {
+    if (record.getKey() == null) {
+      IndexedRecord data = ((HoodieAvroIndexRecord) record).getData();
+      return getKeyField(data.getSchema())
+          .map(keyField -> data.get(keyField.pos()))
+          .map(Object::toString);
+    } else {
+      return Option.ofNullable(record.getRecordKey());

Review Comment:
   Looks like another corner-case handling fix here? It may be worth moving this kind of fix to another PR, to highlight it for tracking and cherry-picking purposes.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java:
##########
@@ -130,8 +130,11 @@ public static <R> R convertToHoodieRecordPayload(GenericRecord record, String pa
                                                    boolean withOperationField,
                                                    Option<String> partitionName) {
     final String recKey = record.get(recordKeyPartitionPathFieldPair.getKey()).toString();
-    final String partitionPath = (partitionName.isPresent() ? partitionName.get() :
-        record.get(recordKeyPartitionPathFieldPair.getRight()).toString());
+    String partitionPath = "";
+    if (recordKeyPartitionPathFieldPair.getRight() != null) {
+      partitionPath = (partitionName.isPresent() ? partitionName.get() :
+          record.get(recordKeyPartitionPathFieldPair.getRight()).toString());

Review Comment:
   Is this a separate bug you're fixing here? Maybe open a separate PR for it so it can be easily cherry-picked for the upcoming minor release.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org