Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/13 20:06:01 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #5629: [HUDI-3384][HUDI-3385] Spark specific file reader/writer.

alexeykudinkin commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r920369408


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commmon.model;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.util.HoodieSparkRecordUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.CatalystTypeConverters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import scala.Tuple2;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Spark engine-specific implementation of `HoodieRecord`.
+ */
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> {

Review Comment:
   We should make `HoodieRecord` a non-generic interface that just provides the APIs, and move the base implementation into `HoodieBaseRecord<T>`, so that code relying on `HoodieRecord` is not exposed to internal implementation details.
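
   A rough sketch of that split (illustrative only -- `HoodieBaseRecord` is the name proposed above, and the method set shown is a small subset of the real `HoodieRecord` surface):
   ```java
   import org.apache.hudi.common.model.HoodieKey;
   import org.apache.hudi.common.model.HoodieOperation;

   // Non-generic public API: callers depend only on this.
   interface HoodieRecord {
     HoodieKey getKey();
     HoodieOperation getOperation();
     String getRecordKey();
   }

   // Generic base implementation: the type parameter and any engine-specific
   // payload handling stay hidden behind the interface above.
   abstract class HoodieBaseRecord<T> implements HoodieRecord {
     protected final HoodieKey key;
     protected final T data;

     protected HoodieBaseRecord(HoodieKey key, T data) {
       this.key = key;
       this.data = data;
     }

     @Override
     public HoodieKey getKey() {
       return key;
     }

     @Override
     public String getRecordKey() {
       return key.getRecordKey();
     }
   }
   ```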



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commmon.model;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.util.HoodieSparkRecordUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.CatalystTypeConverters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import scala.Tuple2;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Spark engine-specific implementation of `HoodieRecord`.
+ */
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
+
+  // IndexedRecord holds its schema; InternalRow should also hold its schema
+  private final StructType structType;
+
+  public HoodieSparkRecord(InternalRow data, StructType schema) {
+    super(null, data);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(InternalRow data, StructType schema, Comparable orderingVal) {
+    super(null, data, orderingVal);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema) {
+    super(key, data);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, Comparable orderingVal) {
+    super(key, data, orderingVal);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation, Comparable orderingVal) {
+    super(key, data, operation, orderingVal);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(HoodieSparkRecord record) {
+    super(record);
+    this.structType = record.structType;
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance() {
+    return new HoodieSparkRecord(this);
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance(HoodieKey key, HoodieOperation op) {
+    return new HoodieSparkRecord(key, data, structType, op, getOrderingValue());
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance(HoodieKey key) {
+    return new HoodieSparkRecord(key, data, structType, getOrderingValue());
+  }
+
+  @Override
+  public void deflate() {
+  }
+
+  @Override
+  public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+    if (key != null) {
+      return getRecordKey();
+    }
+    return keyGeneratorOpt.isPresent() ? ((SparkKeyGeneratorInterface) keyGeneratorOpt.get()).getRecordKey(data, structType) : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
+  }
+
+  @Override
+  public String getRecordKey(String keyFieldName) {
+    if (key != null) {
+      return getRecordKey();
+    }
+    Tuple2<StructField, Object> tuple2 = HoodieInternalRowUtils.getCachedSchemaPosMap(structType).get(keyFieldName).get();
+    DataType dataType = tuple2._1.dataType();
+    int pos = (Integer) tuple2._2;
+    return data.get(pos, dataType).toString();
+  }
+
+  @Override
+  public HoodieRecordType getRecordType() {
+    return HoodieRecordType.SPARK;
+  }
+
+  @Override
+  public Object getRecordColumnValues(String[] columns, Schema schema, boolean consistentLogicalTimestampEnabled) {

Review Comment:
   Let's keep this method where it is today, outside of the record class (within utils), and instead provide an API:
   ```
   getFieldValue(fieldName) 
   ```
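
   A sketch of that accessor on `HoodieSparkRecord`, mirroring the positional lookup that `getRecordKey(String)` above already does (`getFieldValue` is the proposed name, not an existing API):
   ```java
   // Resolve the field's position and data type via the cached schema map,
   // then read the value straight out of the InternalRow.
   public Object getFieldValue(String fieldName) {
     Tuple2<StructField, Object> fieldAndPos =
         HoodieInternalRowUtils.getCachedSchemaPosMap(structType).get(fieldName).get();
     DataType dataType = fieldAndPos._1.dataType();
     int pos = (Integer) fieldAndPos._2;
     return data.get(pos, dataType);
   }
   ```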



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.commmon.model.HoodieSparkRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.RowKeyGeneratorHelper;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+
+import scala.Tuple2;
+
+public class HoodieSparkRecordUtils {
+
+  /**
+   * Utility method to convert bytes to HoodieRecord using schema and payload class.
+   */
+  public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(InternalRow data, StructType structType) {
+    return new HoodieSparkRecord(data, structType);
+  }
+
+  /**
+   * Utility method to convert InternalRow to HoodieRecord using schema and payload class.
+   */
+  public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, String preCombineField, boolean withOperationField) {
+    return convertToHoodieSparkRecord(structType, data, preCombineField,
+        Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+        withOperationField, Option.empty());
+  }
+
+  public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, String preCombineField, boolean withOperationField,
+      Option<String> partitionName) {
+    return convertToHoodieSparkRecord(structType, data, preCombineField,
+        Pair.of(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+        withOperationField, partitionName);
+  }
+
+  /**
+   * Utility method to convert bytes to HoodieRecord using schema and payload class.
+   */
+  public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(StructType structType, InternalRow data, String preCombineField, Pair<String, String> recordKeyPartitionPathFieldPair,
+      boolean withOperationField, Option<String> partitionName) {
+    final String recKey = getValue(structType, recordKeyPartitionPathFieldPair.getKey(), data).toString();
+    final String partitionPath = (partitionName.isPresent() ? partitionName.get() :
+        getValue(structType, recordKeyPartitionPathFieldPair.getRight(), data).toString());
+
+    Object preCombineVal = getPreCombineVal(structType, data, preCombineField);

Review Comment:
   We should not be unpacking the row here -- it should only happen when we actually perform the merge
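
   For example, the conversion could resolve only the key fields and leave the pre-combine column packed inside the row (a sketch; `getValue` is this class's own field-lookup helper used above):
   ```java
   // Key fields are needed for indexing and partitioning, so they are still
   // read here; the ordering value stays in the row until a merge asks for it.
   public static HoodieRecord<InternalRow> convertToHoodieSparkRecord(
       StructType structType, InternalRow data, Pair<String, String> keyAndPartitionFields) {
     String recordKey = getValue(structType, keyAndPartitionFields.getKey(), data).toString();
     String partitionPath = getValue(structType, keyAndPartitionFields.getRight(), data).toString();
     return new HoodieSparkRecord(new HoodieKey(recordKey, partitionPath), data, structType);
   }
   ```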



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala:
##########
@@ -16,43 +16,47 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.hudi
+package org.apache.hudi
 
 import java.nio.charset.StandardCharsets
-import java.util
+import java.util.List
 import java.util.concurrent.ConcurrentHashMap
 import org.apache.avro.Schema
-import org.apache.hudi.AvroConversionUtils
-import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, fromJavaDate, toJavaDate}
+import org.apache.avro.generic.IndexedRecord
+import org.apache.hudi.HoodieSparkUtils.sparkAdapter
+import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
 import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
-import org.apache.hudi.common.util.ValidationUtils
+import org.apache.hudi.common.util.ReflectionUtils
 import org.apache.hudi.exception.HoodieException
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, MutableProjection, Projection}
+import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, MutableProjection, Projection, UnsafeProjection}
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
-import org.apache.spark.sql.hudi.ColumnStatsExpressionUtils.AllowedTransformationExpression.exprUtils.generateMutableProjection
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.types._
-
 import scala.collection.mutable
 
-/**
- * Helper class to do common stuff across Spark InternalRow.
- * Provides common methods similar to {@link HoodieAvroUtils}.
- */
 object HoodieInternalRowUtils {
 
+  val unsafeProjectionMap = new ConcurrentHashMap[(StructType, StructType), UnsafeProjection]
   val projectionMap = new ConcurrentHashMap[(StructType, StructType), MutableProjection]
   val schemaMap = new ConcurrentHashMap[Schema, StructType]
-  val SchemaPosMap = new ConcurrentHashMap[StructType, Map[String, (StructField, Int)]]
+  val schemaPosMap = new ConcurrentHashMap[StructType, Map[String, (StructField, Int)]]
+  val rowConverterMap = new ConcurrentHashMap[Schema, HoodieAvroDeserializer]
+  val avroConverterMap = new ConcurrentHashMap[Schema, HoodieAvroSerializer]
 
+  /**
+   * @see org.apache.hudi.avro.HoodieAvroUtils#stitchRecords(org.apache.avro.generic.GenericRecord, org.apache.avro.generic.GenericRecord, org.apache.avro.Schema)
+   */
   def stitchRecords(left: InternalRow, leftSchema: StructType, right: InternalRow, rightSchema: StructType, stitchedSchema: StructType): InternalRow = {
     val mergeSchema = StructType(leftSchema.fields ++ rightSchema.fields)
     val row = new JoinedRow(left, right)
     val projection = getCachedProjection(mergeSchema, stitchedSchema)

Review Comment:
   There's no need to do a projection in that case.
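
   When the stitched schema is exactly `leftSchema ++ rightSchema`, the `JoinedRow` already has the target layout, so the method can return it directly (a Java sketch for consistency with the other snippets in this review, although this file is Scala; `getCachedProjection` is the helper this file already defines):
   ```java
   import java.util.Arrays;
   import java.util.stream.Stream;
   import org.apache.spark.sql.catalyst.InternalRow;
   import org.apache.spark.sql.catalyst.expressions.JoinedRow;
   import org.apache.spark.sql.types.StructField;
   import org.apache.spark.sql.types.StructType;

   static InternalRow stitchRecords(InternalRow left, StructType leftSchema,
                                    InternalRow right, StructType rightSchema,
                                    StructType stitchedSchema) {
     StructField[] joined = Stream.concat(
             Arrays.stream(leftSchema.fields()), Arrays.stream(rightSchema.fields()))
         .toArray(StructField[]::new);
     JoinedRow row = new JoinedRow(left, right);
     if (Arrays.equals(joined, stitchedSchema.fields())) {
       return row; // layouts already match: no projection needed
     }
     // Only project when the joined layout actually differs from the target.
     return getCachedProjection(new StructType(joined), stitchedSchema).apply(row);
   }
   ```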



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java:
##########
@@ -155,13 +165,52 @@ public HoodieRecord addMetadataValues(Schema recordSchema, Properties prop, Map<
   }
 
   @Override
-  public HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties prop, int pos, String newValue) throws IOException {
+  public HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties props, int pos, String newValue) throws IOException {
     data.put(pos, newValue);
     return this;
   }
 
   @Override
-  public boolean shouldIgnore(Schema schema, Properties prop) throws IOException {
+  public HoodieRecord expansion(
+      Schema schema,
+      Properties props,
+      String payloadClass,
+      String preCombineField,
+      Option<Pair<String, String>> simpleKeyGenFieldsOpt,
+      Boolean withOperation,
+      Option<String> partitionNameOp,
+      Option<Boolean> populateMetaFieldsOp) {
+    return HoodieAvroUtils.createHoodieRecordFromAvro(data, payloadClass, preCombineField, simpleKeyGenFieldsOpt, withOperation, partitionNameOp, populateMetaFieldsOp);
+  }
+
+  @Override
+  public HoodieRecord transform(Schema schema, Properties props, boolean useKeyGen) {
+    GenericRecord record = (GenericRecord) data;
+    String key;
+    String partition;
+    if (useKeyGen && !Boolean.parseBoolean(props.getOrDefault(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue().toString()).toString())) {
+      try {
+        Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory");
+        Method createKeyGenerator = clazz.getMethod("createKeyGenerator", TypedProperties.class);
+        BaseKeyGenerator keyGeneratorOpt = (BaseKeyGenerator) createKeyGenerator.invoke(null, new TypedProperties(props));
+        key = keyGeneratorOpt.getRecordKey(record);
+        partition = keyGeneratorOpt.getPartitionPath(record);
+      } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+        throw new HoodieException("Only BaseKeyGenerators are supported when meta columns are disabled ", e);
+      }
+    } else {
+      key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+      partition = record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
+    }
+    HoodieKey hoodieKey = new HoodieKey(key, partition);
+
+    HoodieRecordPayload avroPayload = new RewriteAvroPayload(record);

Review Comment:
   We should not be using the `RecordPayload` abstraction anymore -- the whole idea of RFC-46 is to get rid of it.
   
   1. `HoodieAvroRecord` should hold a reference to the Avro payload directly
   2. The `RecordPayload` abstraction should only be used within the backward-compatible `HoodieMerge` implementation (during the transitional period) to merge the records, and nowhere else
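
   A sketch of the target shape (an assumption about the direction, not the final API; the record's other overrides are elided):
   ```java
   import org.apache.avro.generic.GenericRecord;
   import org.apache.hudi.common.model.HoodieKey;
   import org.apache.hudi.common.model.HoodieRecord;

   // The Avro-engine record holds plain Avro data; no HoodieRecordPayload is
   // stored on the record. Payload classes are instantiated only inside the
   // backward-compatible HoodieMerge implementation during the transition.
   abstract class HoodieAvroRecord extends HoodieRecord<GenericRecord> {
     HoodieAvroRecord(HoodieKey key, GenericRecord data) {
       super(key, data);
     }
   }
   ```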



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commmon.model;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.util.HoodieSparkRecordUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.CatalystTypeConverters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import scala.Tuple2;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Spark engine-specific implementation of `HoodieRecord`.
+ */
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
+
+  // IndexedRecord holds its schema; InternalRow should also hold its schema
+  private final StructType structType;
+
+  public HoodieSparkRecord(InternalRow data, StructType schema) {
+    super(null, data);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(InternalRow data, StructType schema, Comparable orderingVal) {
+    super(null, data, orderingVal);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema) {
+    super(key, data);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, Comparable orderingVal) {
+    super(key, data, orderingVal);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation, Comparable orderingVal) {
+    super(key, data, operation, orderingVal);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(HoodieSparkRecord record) {
+    super(record);
+    this.structType = record.structType;
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance() {
+    return new HoodieSparkRecord(this);
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance(HoodieKey key, HoodieOperation op) {
+    return new HoodieSparkRecord(key, data, structType, op, getOrderingValue());
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance(HoodieKey key) {
+    return new HoodieSparkRecord(key, data, structType, getOrderingValue());
+  }
+
+  @Override
+  public void deflate() {
+  }
+
+  @Override
+  public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+    if (key != null) {
+      return getRecordKey();
+    }
+    return keyGeneratorOpt.isPresent() ? ((SparkKeyGeneratorInterface) keyGeneratorOpt.get()).getRecordKey(data, structType) : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
+  }
+
+  @Override
+  public String getRecordKey(String keyFieldName) {
+    if (key != null) {
+      return getRecordKey();
+    }
+    Tuple2<StructField, Object> tuple2 = HoodieInternalRowUtils.getCachedSchemaPosMap(structType).get(keyFieldName).get();
+    DataType dataType = tuple2._1.dataType();
+    int pos = (Integer) tuple2._2;
+    return data.get(pos, dataType).toString();
+  }
+
+  @Override
+  public HoodieRecordType getRecordType() {
+    return HoodieRecordType.SPARK;
+  }
+
+  @Override
+  public Object getRecordColumnValues(String[] columns, Schema schema, boolean consistentLogicalTimestampEnabled) {
+    return HoodieSparkRecordUtils.getRecordColumnValues(data, columns, structType, consistentLogicalTimestampEnabled);
+  }
+
+  @Override
+  public HoodieRecord mergeWith(Schema schema, HoodieRecord other, Schema otherSchema, Schema writerSchema) throws IOException {
+    StructType otherStructType = HoodieInternalRowUtils.getCachedSchema(otherSchema);
+    StructType writerStructType = HoodieInternalRowUtils.getCachedSchema(writerSchema);
+    InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data, structType, (InternalRow) other.getData(), otherStructType, writerStructType);
+    return new HoodieSparkRecord(getKey(), mergeRow, writerStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema) throws IOException {
+    StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, structType, targetStructType);
+    return new HoodieSparkRecord(getKey(), rewriteRow, targetStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException {
+    StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields);
+    InternalRow rewriteRow = schemaOnReadEnabled ? HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, writeSchemaWithMetaFieldsStructType, new HashMap<>())
+        : HoodieInternalRowUtils.rewriteRecord(data, structType, writeSchemaWithMetaFieldsStructType);
+    return new HoodieSparkRecord(getKey(), rewriteRow, writeSchemaWithMetaFieldsStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, String fileName) throws IOException {
+    StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields);
+    InternalRow rewriteRow = schemaOnReadEnabled ? HoodieInternalRowUtils.rewriteEvolutionRecordWithMetadata(data, structType, writeSchemaWithMetaFieldsStructType, fileName)
+        : HoodieInternalRowUtils.rewriteRecordWithMetadata(data, structType, writeSchemaWithMetaFieldsStructType, fileName);
+    return new HoodieSparkRecord(getKey(), rewriteRow, writeSchemaWithMetaFieldsStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols) throws IOException {
+    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols);
+    return new HoodieSparkRecord(getKey(), rewriteRow, newStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema) throws IOException {
+    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, structType, newStructType);
+    return new HoodieSparkRecord(getKey(), rewriteRow, newStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties prop, int pos, String newValue) throws IOException {
+    data.update(pos, CatalystTypeConverters.convertToCatalyst(newValue));
+    return this;
+  }
+
+  @Override
+  public HoodieRecord addMetadataValues(Schema recordSchema, Properties prop, Map<HoodieMetadataField, String> metadataValues) throws IOException {
+    Arrays.stream(HoodieMetadataField.values()).forEach(metadataField -> {
+      String value = metadataValues.get(metadataField);
+      if (value != null) {
+        data.update(recordSchema.getField(metadataField.getFieldName()).pos(), CatalystTypeConverters.convertToCatalyst(value));
+      }
+    });
+    return this;
+  }
+
+  @Override
+  public HoodieRecord expansion(Schema schema, Properties prop, String payloadClass,

Review Comment:
   What's this method needed for? It seems like it's trying to do too many things 



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerge.java:
##########
@@ -31,6 +34,8 @@
 public class HoodieAvroRecordMerge implements HoodieMerge {
   @Override
   public HoodieRecord preCombine(HoodieRecord older, HoodieRecord newer) {
+    ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.AVRO);
+    ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.AVRO);
     HoodieRecordPayload picked = unsafeCast(((HoodieAvroRecord) newer).getData().preCombine(((HoodieAvroRecord) older).getData()));

Review Comment:
   Building on my previous comment regarding the use of `RecordPayload`: we should only resort to using it in this context when we actually do the merging, and get rid of its usages everywhere else.
   
   Here we should just look up the configured a) payload class and b) pre-combine field, and use them to perform the merging.
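
   Roughly like this inside the merge implementation (a sketch; the keys are the standard write configs, and the payload is built via the conventional `(GenericRecord, Comparable)` payload constructor, only for the duration of the merge):
   ```java
   import java.util.Properties;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.hudi.common.model.HoodieRecordPayload;

   // Look up the configured payload class and pre-combine field from the
   // writer props and instantiate the payload only here, at merge time.
   static HoodieRecordPayload payloadFor(GenericRecord avro, Properties props) throws Exception {
     String payloadClass = props.getProperty("hoodie.datasource.write.payload.class");
     String preCombineField = props.getProperty("hoodie.datasource.write.precombine.field");
     Comparable<?> orderingVal = (Comparable<?>) avro.get(preCombineField);
     return (HoodieRecordPayload) Class.forName(payloadClass)
         .getConstructor(GenericRecord.class, Comparable.class)
         .newInstance(avro, orderingVal);
   }
   ```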



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commmon.model;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.util.HoodieSparkRecordUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.CatalystTypeConverters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import scala.Tuple2;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Spark engine-specific implementation of `HoodieRecord`.
+ */
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
+
+  // IndexedRecord holds its schema; InternalRow should also hold its schema
+  private final StructType structType;
+
+  public HoodieSparkRecord(InternalRow data, StructType schema) {
+    super(null, data);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(InternalRow data, StructType schema, Comparable orderingVal) {
+    super(null, data, orderingVal);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema) {
+    super(key, data);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, Comparable orderingVal) {
+    super(key, data, orderingVal);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation, Comparable orderingVal) {
+    super(key, data, operation, orderingVal);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(HoodieSparkRecord record) {
+    super(record);
+    this.structType = record.structType;
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance() {
+    return new HoodieSparkRecord(this);
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance(HoodieKey key, HoodieOperation op) {
+    return new HoodieSparkRecord(key, data, structType, op, getOrderingValue());
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance(HoodieKey key) {
+    return new HoodieSparkRecord(key, data, structType, getOrderingValue());
+  }
+
+  @Override
+  public void deflate() {
+  }
+
+  @Override
+  public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+    if (key != null) {
+      return getRecordKey();
+    }
+    return keyGeneratorOpt.isPresent() ? ((SparkKeyGeneratorInterface) keyGeneratorOpt.get()).getRecordKey(data, structType) : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
+  }
+
+  @Override
+  public String getRecordKey(String keyFieldName) {
+    if (key != null) {
+      return getRecordKey();
+    }
+    Tuple2<StructField, Object> tuple2 = HoodieInternalRowUtils.getCachedSchemaPosMap(structType).get(keyFieldName).get();
+    DataType dataType = tuple2._1.dataType();
+    int pos = (Integer) tuple2._2;
+    return data.get(pos, dataType).toString();
+  }
+
+  @Override
+  public HoodieRecordType getRecordType() {
+    return HoodieRecordType.SPARK;
+  }
+
+  @Override
+  public Object getRecordColumnValues(String[] columns, Schema schema, boolean consistentLogicalTimestampEnabled) {
+    return HoodieSparkRecordUtils.getRecordColumnValues(data, columns, structType, consistentLogicalTimestampEnabled);
+  }
+
+  @Override
+  public HoodieRecord mergeWith(Schema schema, HoodieRecord other, Schema otherSchema, Schema writerSchema) throws IOException {
+    StructType otherStructType = HoodieInternalRowUtils.getCachedSchema(otherSchema);
+    StructType writerStructType = HoodieInternalRowUtils.getCachedSchema(writerSchema);
+    InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data, structType, (InternalRow) other.getData(), otherStructType, writerStructType);
+    return new HoodieSparkRecord(getKey(), mergeRow, writerStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema) throws IOException {
+    StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, structType, targetStructType);
+    return new HoodieSparkRecord(getKey(), rewriteRow, targetStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException {
+    StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields);
+    InternalRow rewriteRow = schemaOnReadEnabled ? HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, writeSchemaWithMetaFieldsStructType, new HashMap<>())
+        : HoodieInternalRowUtils.rewriteRecord(data, structType, writeSchemaWithMetaFieldsStructType);
+    return new HoodieSparkRecord(getKey(), rewriteRow, writeSchemaWithMetaFieldsStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, String fileName) throws IOException {
+    StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields);
+    InternalRow rewriteRow = schemaOnReadEnabled ? HoodieInternalRowUtils.rewriteEvolutionRecordWithMetadata(data, structType, writeSchemaWithMetaFieldsStructType, fileName)
+        : HoodieInternalRowUtils.rewriteRecordWithMetadata(data, structType, writeSchemaWithMetaFieldsStructType, fileName);
+    return new HoodieSparkRecord(getKey(), rewriteRow, writeSchemaWithMetaFieldsStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols) throws IOException {
+    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols);
+    return new HoodieSparkRecord(getKey(), rewriteRow, newStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema) throws IOException {
+    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, structType, newStructType);
+    return new HoodieSparkRecord(getKey(), rewriteRow, newStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties prop, int pos, String newValue) throws IOException {
+    data.update(pos, CatalystTypeConverters.convertToCatalyst(newValue));
+    return this;
+  }
+
+  @Override
+  public HoodieRecord addMetadataValues(Schema recordSchema, Properties prop, Map<HoodieMetadataField, String> metadataValues) throws IOException {
+    Arrays.stream(HoodieMetadataField.values()).forEach(metadataField -> {
+      String value = metadataValues.get(metadataField);
+      if (value != null) {
+        data.update(recordSchema.getField(metadataField.getFieldName()).pos(), CatalystTypeConverters.convertToCatalyst(value));
+      }
+    });
+    return this;
+  }
+
+  @Override
+  public HoodieRecord expansion(Schema schema, Properties prop, String payloadClass,
+      String preCombineField,
+      Option<Pair<String, String>> simpleKeyGenFieldsOpt,
+      Boolean withOperation,
+      Option<String> partitionNameOp,
+      Option<Boolean> populateMetaFieldsOp) {
+    boolean populateMetaFields = populateMetaFieldsOp.orElse(false);
+    if (populateMetaFields) {
+      return HoodieSparkRecordUtils.convertToHoodieSparkRecord(structType, data, preCombineField, withOperation);
+    } else if (simpleKeyGenFieldsOpt.isPresent()) {
+      return HoodieSparkRecordUtils.convertToHoodieSparkRecord(structType, data, preCombineField, simpleKeyGenFieldsOpt.get(), withOperation, Option.empty());
+    } else {
+      return HoodieSparkRecordUtils.convertToHoodieSparkRecord(structType, data, preCombineField, withOperation, partitionNameOp);
+    }
+  }
+
+  @Override
+  public HoodieRecord transform(Schema schema, Properties prop, boolean useKeygen) {

Review Comment:
   What's this method needed for?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -189,8 +189,13 @@ protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
       }
     }
     // Put the DELETE record
-    records.put(key, SpillableMapUtils.generateEmptyPayload(key,
-        deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue(), getPayloadClassFQN()));
+    if (recordType == HoodieRecordType.AVRO) {

Review Comment:
   Why are we handling Avro differently here? There are only a few places where we can have this bifurcation (reader, writer, merger), but we should definitely not have it here.
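
   One way to avoid the branch (a hypothetical sketch -- `HoodieRecordFactory` and `createDelete` are invented names for illustration, not existing Hudi APIs):
   ```java
   // A per-engine factory owns "how do I build an empty delete record",
   // so the scanner stays record-type agnostic.
   interface HoodieRecordFactory {
     HoodieRecord<?> createDelete(String recordKey, String partitionPath,
                                  Comparable<?> orderingValue);
   }

   // processNextDeletedRecord then becomes a single, type-free call:
   // records.put(key, recordFactory.createDelete(key,
   //     deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue()));
   ```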



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -115,13 +117,13 @@ protected byte[] serializeRecords(List<HoodieRecord> records) throws IOException
     output.writeInt(records.size());
 
     // 3. Write the records
-    for (HoodieRecord s : records) {
+    for (HoodieRecord<?> s : records) {
       ByteArrayOutputStream temp = new ByteArrayOutputStream();
       BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(temp, encoderCache.get());
       encoderCache.set(encoder);
       try {
         // Encode the record into bytes
-        IndexedRecord data = (IndexedRecord) s.toIndexedRecord(schema, new Properties()).get();
+        IndexedRecord data = s.toIndexedRecord(schema, new Properties()).get();

Review Comment:
   See my comment above regarding deprecation of `toIndexedRecord`: we shouldn't be using `toIndexedRecord` here -- instead we should cast the record to `HoodieAvroRecord` and access the payload directly
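
   i.e. roughly (assuming the records flowing through this Avro block are `HoodieAvroIndexedRecord`s, whose `getData()` already returns the `IndexedRecord`):
   ```java
   // Access the Avro data directly rather than going through the transitory
   // toIndexedRecord bridge.
   IndexedRecord data = ((HoodieAvroIndexedRecord) s).getData();
   ```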



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commmon.model;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.util.HoodieSparkRecordUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.CatalystTypeConverters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import scala.Tuple2;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Spark engine-specific implementation of `HoodieRecord`.
+ */
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
+
+  // IndexedRecord holds its schema; InternalRow should also hold its schema
+  private final StructType structType;
+
+  public HoodieSparkRecord(InternalRow data, StructType schema) {
+    super(null, data);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(InternalRow data, StructType schema, Comparable orderingVal) {
+    super(null, data, orderingVal);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema) {
+    super(key, data);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, Comparable orderingVal) {
+    super(key, data, orderingVal);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema, HoodieOperation operation, Comparable orderingVal) {
+    super(key, data, operation, orderingVal);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(HoodieSparkRecord record) {
+    super(record);
+    this.structType = record.structType;
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance() {
+    return new HoodieSparkRecord(this);
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance(HoodieKey key, HoodieOperation op) {
+    return new HoodieSparkRecord(key, data, structType, op, getOrderingValue());
+  }
+
+  @Override
+  public HoodieRecord<InternalRow> newInstance(HoodieKey key) {
+    return new HoodieSparkRecord(key, data, structType, getOrderingValue());
+  }
+
+  @Override
+  public void deflate() {
+  }
+
+  @Override
+  public String getRecordKey(Option<BaseKeyGenerator> keyGeneratorOpt) {
+    if (key != null) {
+      return getRecordKey();
+    }
+    return keyGeneratorOpt.isPresent() ? ((SparkKeyGeneratorInterface) keyGeneratorOpt.get()).getRecordKey(data, structType) : data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
+  }
+
+  @Override
+  public String getRecordKey(String keyFieldName) {
+    if (key != null) {
+      return getRecordKey();
+    }
+    Tuple2<StructField, Object> tuple2 = HoodieInternalRowUtils.getCachedSchemaPosMap(structType).get(keyFieldName).get();
+    DataType dataType = tuple2._1.dataType();
+    int pos = (Integer) tuple2._2;
+    return data.get(pos, dataType).toString();
+  }
+
+  @Override
+  public HoodieRecordType getRecordType() {
+    return HoodieRecordType.SPARK;
+  }
+
+  @Override
+  public Object getRecordColumnValues(String[] columns, Schema schema, boolean consistentLogicalTimestampEnabled) {
+    return HoodieSparkRecordUtils.getRecordColumnValues(data, columns, structType, consistentLogicalTimestampEnabled);
+  }
+
+  @Override
+  public HoodieRecord mergeWith(Schema schema, HoodieRecord other, Schema otherSchema, Schema writerSchema) throws IOException {
+    StructType otherStructType = HoodieInternalRowUtils.getCachedSchema(otherSchema);
+    StructType writerStructType = HoodieInternalRowUtils.getCachedSchema(writerSchema);
+    InternalRow mergeRow = HoodieInternalRowUtils.stitchRecords(data, structType, (InternalRow) other.getData(), otherStructType, writerStructType);
+    return new HoodieSparkRecord(getKey(), mergeRow, writerStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema) throws IOException {
+    StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, structType, targetStructType);
+    return new HoodieSparkRecord(getKey(), rewriteRow, targetStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException {
+    StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields);
+    InternalRow rewriteRow = schemaOnReadEnabled ? HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, writeSchemaWithMetaFieldsStructType, new HashMap<>())
+        : HoodieInternalRowUtils.rewriteRecord(data, structType, writeSchemaWithMetaFieldsStructType);
+    return new HoodieSparkRecord(getKey(), rewriteRow, writeSchemaWithMetaFieldsStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, String fileName) throws IOException {
+    StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields);
+    InternalRow rewriteRow = schemaOnReadEnabled ? HoodieInternalRowUtils.rewriteEvolutionRecordWithMetadata(data, structType, writeSchemaWithMetaFieldsStructType, fileName)
+        : HoodieInternalRowUtils.rewriteRecordWithMetadata(data, structType, writeSchemaWithMetaFieldsStructType, fileName);
+    return new HoodieSparkRecord(getKey(), rewriteRow, writeSchemaWithMetaFieldsStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols) throws IOException {
+    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols);
+    return new HoodieSparkRecord(getKey(), rewriteRow, newStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema) throws IOException {
+    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, structType, newStructType);
+    return new HoodieSparkRecord(getKey(), rewriteRow, newStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties prop, int pos, String newValue) throws IOException {
+    data.update(pos, CatalystTypeConverters.convertToCatalyst(newValue));
+    return this;
+  }
+
+  @Override
+  public HoodieRecord addMetadataValues(Schema recordSchema, Properties prop, Map<HoodieMetadataField, String> metadataValues) throws IOException {
+    Arrays.stream(HoodieMetadataField.values()).forEach(metadataField -> {
+      String value = metadataValues.get(metadataField);
+      if (value != null) {
+        data.update(recordSchema.getField(metadataField.getFieldName()).pos(), CatalystTypeConverters.convertToCatalyst(value));
+      }
+    });
+    return this;
+  }
+
+  @Override
+  public HoodieRecord expansion(Schema schema, Properties prop, String payloadClass,
+      String preCombineField,
+      Option<Pair<String, String>> simpleKeyGenFieldsOpt,
+      Boolean withOperation,
+      Option<String> partitionNameOp,
+      Option<Boolean> populateMetaFieldsOp) {
+    boolean populateMetaFields = populateMetaFieldsOp.orElse(false);
+    if (populateMetaFields) {
+      return HoodieSparkRecordUtils.convertToHoodieSparkRecord(structType, data, preCombineField, withOperation);
+    } else if (simpleKeyGenFieldsOpt.isPresent()) {
+      return HoodieSparkRecordUtils.convertToHoodieSparkRecord(structType, data, preCombineField, simpleKeyGenFieldsOpt.get(), withOperation, Option.empty());
+    } else {
+      return HoodieSparkRecordUtils.convertToHoodieSparkRecord(structType, data, preCombineField, withOperation, partitionNameOp);
+    }
+  }
+
+  @Override
+  public HoodieRecord transform(Schema schema, Properties prop, boolean useKeygen) {
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
+    String key;
+    String partition;
+    if (useKeygen && !Boolean.parseBoolean(prop.getOrDefault(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue().toString()).toString())) {
+      try {
+        SparkKeyGeneratorInterface keyGeneratorOpt = (SparkKeyGeneratorInterface) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(prop));
+        key = keyGeneratorOpt.getRecordKey(data, structType);
+        partition = keyGeneratorOpt.getPartitionPath(data, structType);
+      } catch (IOException e) {
+        throw new HoodieException("Only SparkKeyGeneratorInterface are supported when meta columns are disabled ", e);
+      }
+    } else {
+      key = data.get(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal(), StringType).toString();
+      partition = data.get(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal(), StringType).toString();
+    }
+    this.key = new HoodieKey(key, partition);
+
+    return this;
+  }
+
+  @Override
+  public Option<Map<String, String>> getMetadata() {
+    return Option.empty();
+  }
+
+  @Override
+  public boolean isPresent(Schema schema, Properties prop) throws IOException {
+    if (null == data) {
+      return false;
+    }
+    if (schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) == null) {
+      return true;
+    }
+    Object deleteMarker = data.get(schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD).pos(), BooleanType);
+    return !(deleteMarker instanceof Boolean && (boolean) deleteMarker);
+  }
+
+  @Override
+  public boolean shouldIgnore(Schema schema, Properties prop) throws IOException {
+    // TODO: refactor SENTINEL so that it does not depend on Avro (GenericRecord)
+    return data != null && data.equals(SENTINEL);
+  }
+
+  @Override
+  public Option<IndexedRecord> toIndexedRecord(Schema schema, Properties prop) throws IOException {

Review Comment:
   This should be a transitory API that we remove at the completion of RFC-46. Let's mark it as deprecated.
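
   A sketch of the marking on the base class (the conversion body itself stays as-is until it is removed):
   ```java
   /**
    * Transitory bridge to Avro, kept only until RFC-46 is complete.
    *
    * @deprecated access the engine-native data on the concrete record type instead.
    */
   @Deprecated
   public abstract Option<IndexedRecord> toIndexedRecord(Schema schema, Properties props) throws IOException;
   ```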



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.commmon.model;
+
+import org.apache.hudi.HoodieInternalRowUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
+import org.apache.hudi.util.HoodieSparkRecordUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.CatalystTypeConverters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import scala.Tuple2;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Spark Engine-specific Implementations of `HoodieRecord`.
+ */
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
+
+  // IndexedRecord holds its schema; InternalRow should also hold its schema
+  private final StructType structType;
+
+  public HoodieSparkRecord(InternalRow data, StructType schema) {
+    super(null, data);
+    this.structType = schema;
+  }
+
+  public HoodieSparkRecord(InternalRow data, StructType schema, Comparable orderingVal) {
+    super(null, data, orderingVal);

Review Comment:
   We should only be unpacking the rows and extracting `orderingVal` when we actually perform the merge. We should avoid storing this field in the record itself.
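
   A sketch of extracting it on demand instead (plain Spark APIs; `fieldIndex` throws if the pre-combine field is missing, which is the right failure mode for a merge):
   ```java
   import org.apache.spark.sql.catalyst.InternalRow;
   import org.apache.spark.sql.types.StructType;

   // Read the ordering value out of the packed row only when two records are
   // actually being merged; nothing extra is stored on the record itself.
   static Comparable<?> orderingValueAtMergeTime(InternalRow row, StructType structType,
                                                 String preCombineField) {
     int pos = structType.fieldIndex(preCombineField);
     return (Comparable<?>) row.get(pos, structType.fields()[pos].dataType());
   }
   ```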



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,276 @@
+  @Override
+  public String getRecordKey(String keyFieldName) {
+    if (key != null) {
+      return getRecordKey();
+    }
+    Tuple2<StructField, Object> tuple2 = HoodieInternalRowUtils.getCachedSchemaPosMap(structType).get(keyFieldName).get();

Review Comment:
   This is not necessary: you can just do `structType.fieldIndex(name)`
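   
   Roughly (a sketch of the simplified lookup; the method is otherwise as in the diff):
   
   ```java
   @Override
   public String getRecordKey(String keyFieldName) {
     if (key != null) {
       return getRecordKey();
     }
     // Direct lookup against the row's StructType; no cached position map needed
     int pos = structType.fieldIndex(keyFieldName);
     DataType dataType = structType.fields()[pos].dataType();
     return data.get(pos, dataType).toString();
   }
   ```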



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,276 @@
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema) throws IOException {
+    StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, structType, targetStructType);
+    return new HoodieSparkRecord(getKey(), rewriteRow, targetStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException {

Review Comment:
   Why do we need 5 rewriting methods? One should be enough, right?
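   
   For example, these could arguably collapse into a single entry point along these lines (hypothetical consolidated signature; callers that don't rename columns would pass an empty map):
   
   ```java
   // One rewrite covering both plain projection and schema evolution w/ renames;
   // the helper call is the same one the diff already uses.
   @Override
   public HoodieRecord rewriteRecordWithNewSchema(Properties props, Schema newSchema,
                                                  Map<String, String> renameCols) {
     StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
     InternalRow rewritten =
         HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols);
     return new HoodieSparkRecord(getKey(), rewritten, newStructType, getOperation());
   }
   ```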



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,276 @@
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Schema targetSchema) throws IOException {

Review Comment:
   We should not be passing in the current record's schema; the record already carries its `structType`.
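   
   i.e. something like this (hypothetical signature; the redundant param is simply dropped):
   
   ```java
   @Override
   public HoodieRecord rewriteRecord(Schema targetSchema) throws IOException {
     StructType targetStructType = HoodieInternalRowUtils.getCachedSchema(targetSchema);
     InternalRow rewritten = HoodieInternalRowUtils.rewriteRecord(data, structType, targetStructType);
     return new HoodieSparkRecord(getKey(), rewritten, targetStructType, getOperation());
   }
   ```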



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieInternalRowUtils.scala:
##########
@@ -188,29 +211,82 @@ object HoodieInternalRowUtils {
     schemaMap.get(schema)
   }
 
+  def getProjection(from: Schema, to: Schema): Projection = {
+    getCachedUnsafeProjection(getCachedSchema(from), getCachedSchema(to))
+  }
+
   private def getCachedProjection(from: StructType, to: StructType): Projection = {
     val schemaPair = (from, to)
     if (!projectionMap.contains(schemaPair)) {
       projectionMap.synchronized {
         if (!projectionMap.contains(schemaPair)) {
-          val projection = generateMutableProjection(from, to)
+          val utilsClazz = ReflectionUtils.getClass("org.apache.hudi.HoodieSparkProjectionUtils")

Review Comment:
   We can address this later, but we should avoid reflection at all costs -- instead we can just move this class into spark-client (in fact, it's already been moved on master).



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,276 @@
+  @Override
+  public HoodieRecord mergeWith(Schema schema, HoodieRecord other, Schema otherSchema, Schema writerSchema) throws IOException {

Review Comment:
   A few notes:
   1. All records should hold their own schema.
   2. I don't think we're looking to support merging records of different types (say, Avro with Spark), so we should be able to assume the other record is also a Spark record.
   
   That means there's no need to pass schemas as params here; we should extract them from the records themselves.
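   
   Roughly (hypothetical signature, assuming both sides are Spark records):
   
   ```java
   // Each HoodieSparkRecord already carries its StructType; only the writer
   // (output) schema still needs to come in from outside.
   @Override
   public HoodieRecord mergeWith(HoodieRecord other, Schema writerSchema) throws IOException {
     HoodieSparkRecord that = (HoodieSparkRecord) other;  // cross-engine merges are out of scope
     StructType writerStructType = HoodieInternalRowUtils.getCachedSchema(writerSchema);
     InternalRow merged = HoodieInternalRowUtils.stitchRecords(
         data, structType, that.getData(), that.structType, writerStructType);
     return new HoodieSparkRecord(getKey(), merged, writerStructType, getOperation());
   }
   ```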
   



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java:
##########
@@ -0,0 +1,276 @@
+  @Override
+  public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields) throws IOException {
+    StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields);
+    InternalRow rewriteRow = schemaOnReadEnabled ? HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, writeSchemaWithMetaFieldsStructType, new HashMap<>())
+        : HoodieInternalRowUtils.rewriteRecord(data, structType, writeSchemaWithMetaFieldsStructType);
+    return new HoodieSparkRecord(getKey(), rewriteRow, writeSchemaWithMetaFieldsStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithMetadata(Schema recordSchema, Properties prop, boolean schemaOnReadEnabled, Schema writeSchemaWithMetaFields, String fileName) throws IOException {
+    StructType writeSchemaWithMetaFieldsStructType = HoodieInternalRowUtils.getCachedSchema(writeSchemaWithMetaFields);
+    InternalRow rewriteRow = schemaOnReadEnabled ? HoodieInternalRowUtils.rewriteEvolutionRecordWithMetadata(data, structType, writeSchemaWithMetaFieldsStructType, fileName)
+        : HoodieInternalRowUtils.rewriteRecordWithMetadata(data, structType, writeSchemaWithMetaFieldsStructType, fileName);
+    return new HoodieSparkRecord(getKey(), rewriteRow, writeSchemaWithMetaFieldsStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema, Map<String, String> renameCols) throws IOException {
+    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType, newStructType, renameCols);
+    return new HoodieSparkRecord(getKey(), rewriteRow, newStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties prop, Schema newSchema) throws IOException {
+    StructType newStructType = HoodieInternalRowUtils.getCachedSchema(newSchema);
+    InternalRow rewriteRow = HoodieInternalRowUtils.rewriteRecord(data, structType, newStructType);
+    return new HoodieSparkRecord(getKey(), rewriteRow, newStructType, getOperation());
+  }
+
+  @Override
+  public HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties prop, int pos, String newValue) throws IOException {
+    data.update(pos, CatalystTypeConverters.convertToCatalyst(newValue));
+    return this;
+  }
+
+  @Override
+  public HoodieRecord addMetadataValues(Schema recordSchema, Properties prop, Map<HoodieMetadataField, String> metadataValues) throws IOException {

Review Comment:
   There's no need for both `overrideMetadataFieldValue` and `addMetadataValues`; one should be enough.
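   
   For instance, a single map-based method can subsume the positional override (hypothetical name; it uses the metadata field ordinal as the column position, as the diff already does):
   
   ```java
   @Override
   public HoodieRecord updateMetadataValues(Schema recordSchema, Properties props,
                                            Map<HoodieMetadataField, String> metadataValues) {
     metadataValues.forEach((field, value) ->
         data.update(field.ordinal(), CatalystTypeConverters.convertToCatalyst(value)));
     return this;
   }
   ```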



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java:
##########
@@ -155,13 +165,52 @@ public HoodieRecord addMetadataValues(Schema recordSchema, Properties prop, Map<
   }
 
   @Override
-  public HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties prop, int pos, String newValue) throws IOException {
+  public HoodieRecord overrideMetadataFieldValue(Schema recordSchema, Properties props, int pos, String newValue) throws IOException {
     data.put(pos, newValue);
     return this;
   }
 
   @Override
-  public boolean shouldIgnore(Schema schema, Properties prop) throws IOException {
+  public HoodieRecord expansion(
+      Schema schema,
+      Properties props,
+      String payloadClass,
+      String preCombineField,
+      Option<Pair<String, String>> simpleKeyGenFieldsOpt,
+      Boolean withOperation,
+      Option<String> partitionNameOp,
+      Option<Boolean> populateMetaFieldsOp) {
+    return HoodieAvroUtils.createHoodieRecordFromAvro(data, payloadClass, preCombineField, simpleKeyGenFieldsOpt, withOperation, partitionNameOp, populateMetaFieldsOp);
+  }
+
+  @Override
+  public HoodieRecord transform(Schema schema, Properties props, boolean useKeyGen) {
+    GenericRecord record = (GenericRecord) data;
+    String key;
+    String partition;
+    if (useKeyGen && !Boolean.parseBoolean(props.getOrDefault(POPULATE_META_FIELDS.key(), POPULATE_META_FIELDS.defaultValue().toString()).toString())) {
+      try {
+        Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory");

Review Comment:
   This is not going to work -- we can't load the key generator via reflection for every record.
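   
   A sketch of the alternative (hypothetical signature and constructor): resolve the key generator once at the call site and hand it in, so the per-record path stays reflection-free:
   
   ```java
   @Override
   public HoodieRecord transform(Schema schema, Properties props, Option<BaseKeyGenerator> keyGenOpt) {
     GenericRecord record = (GenericRecord) data;
     String recordKey = keyGenOpt.isPresent()
         ? keyGenOpt.get().getRecordKey(record)
         : record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
     String partition = keyGenOpt.isPresent()
         ? keyGenOpt.get().getPartitionPath(record)
         : record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString();
     return new HoodieAvroIndexedRecord(new HoodieKey(recordKey, partition), record);
   }
   ```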



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -91,13 +87,13 @@ public abstract class AbstractHoodieLogRecordReader {
   // Latest valid instant time
   // Log-Blocks belonging to inflight delta-instants are filtered-out using this high-watermark.
   private final String latestInstantTime;
-  private final HoodieTableMetaClient hoodieTableMetaClient;
+  protected final HoodieTableMetaClient hoodieTableMetaClient;
   // Merge strategy to use when combining records from log
   private final String payloadClassFQN;
   // preCombine field
   private final String preCombineField;
   // Stateless component for merging records
-  private final String mergeClassFQN;
+  private final String mergeClass;

Review Comment:
   Folks, let's make sure we're not losing the context of the previous discussion: I highlighted on a previous PR (step 2) that `HoodieMerge` is not a good name for this abstraction, and this has not been addressed.
   
   Let's make sure we follow up on it here.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java:
##########
@@ -84,8 +86,14 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(String instantTime,
         dedupedKeys = keys.repartition(parallelism);
       }
 
-      HoodieData<HoodieRecord<T>> dedupedRecords =
-          dedupedKeys.map(key -> new HoodieAvroRecord(key, new EmptyHoodieRecordPayload()));
+      HoodieData dedupedRecords;
+      if (config.getRecordType() == HoodieRecordType.AVRO) {

Review Comment:
   Same comment as below: there should be no need for this bifurcation
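   
   For instance (hypothetical factory method -- `createEmptyRecord` doesn't exist yet): a factory keyed off the configured record type keeps this helper engine-agnostic:
   
   ```java
   // Delegate empty-payload record creation to a factory instead of
   // branching on AVRO vs. SPARK inline.
   HoodieData<HoodieRecord<T>> dedupedRecords =
       dedupedKeys.map(key -> HoodieRecordUtils.createEmptyRecord(key, config.getRecordType()));
   ```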



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org