You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/21 23:03:41 UTC

[GitHub] [hudi] yihua commented on a diff in pull request #5523: [HUDI-4039][Stacked on 5470] Make sure all builtin `KeyGenerator`s properly implement Spark specific APIs

yihua commented on code in PR #5523:
URL: https://github.com/apache/hudi/pull/5523#discussion_r927107467


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java:
##########
@@ -50,22 +51,27 @@ public class HoodieInternalRow extends InternalRow {
 
   /**
    * Collection of meta-fields as defined by {@link HoodieRecord#HOODIE_META_COLUMNS}
+   *
+   * NOTE: {@code HoodieInternalRow} *always* overlays its own meta-fields even in case
+   *       when source row also contains them, to make sure these fields are mutable and
+   *       can be updated (for ex, {@link UnsafeRow} doesn't support mutations due to
+   *       its memory layout, as it persists field offsets)
    */
   private final UTF8String[] metaFields;
-  private final InternalRow row;
+  private final InternalRow sourceRow;
 
   /**
-   * Specifies whether source {@link #row} contains meta-fields
+   * Specifies whether source {@link #sourceRow} contains meta-fields
    */
-  private final boolean containsMetaFields;
+  private final boolean sourceContainsMetaFields;

Review Comment:
   Are these renamings necessary in this PR?  Next time, I would prefer to have them in a separate PR to reduce review overhead.



##########
hudi-kafka-connect/src/main/java/org/apache/hudi/connect/utils/KafkaConnectUtils.java:
##########
@@ -174,7 +174,9 @@ public static Configuration getDefaultHadoopConf(KafkaConnectConfigs connectConf
    * @return Returns the record key columns separated by comma.
    */
   public static String getRecordKeyColumns(KeyGenerator keyGenerator) {
-    return String.join(",", keyGenerator.getRecordKeyFieldNames());
+    return keyGenerator.getRecordKeyFieldNames().stream()
+        .map(HoodieAvroUtils::getRootLevelFieldName)

Review Comment:
   What is the reason to get root-level fields for Kafka Connect?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -19,132 +19,560 @@
 package org.apache.hudi.keygen;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
 import scala.Function1;
 
-import java.util.HashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.util.CollectionUtils.tail;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
 
 /**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link SparkKeyGeneratorInterface}, which
+ *       by default however fallback to Avro implementation. For maximum performance (to avoid
+ *       conversion from Spark's internal data-types to Avro) you should override these methods
+ *       in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
  */
+@ThreadSafe
 public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements SparkKeyGeneratorInterface {
 
-  private static final String STRUCT_NAME = "hoodieRowTopLevelField";
-  private static final String NAMESPACE = "hoodieRow";
-  private Function1<Row, GenericRecord> converterFn = null;
-  private final AtomicBoolean validatePartitionFields = new AtomicBoolean(false);
-  protected StructType structType;
+  private static final Logger LOG = LogManager.getLogger(BuiltinKeyGenerator.class);
+
+  private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
+
+  protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 = UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
+  protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
+  protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
+
 
-  protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo = new HashMap<>();
-  protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo = new HashMap<>();
+  protected transient volatile SparkRowConverter rowConverter;
+  protected transient volatile SparkRowAccessor rowAccessor;
 
   protected BuiltinKeyGenerator(TypedProperties config) {
     super(config);
   }
 
-  /**
-   * Fetch record key from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which record key is requested.
-   * @return the record key of interest from {@link Row}.
-   */
   @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getRecordKey(Row row) {
-    // TODO avoid conversion to avro
-    //      since converterFn is transient this will be repeatedly initialized over and over again
-    if (null == converterFn) {
-      converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
-    }
-    return getKey(converterFn.apply(row)).getRecordKey();
+    tryInitRowConverter(row.schema());
+    // NOTE: This implementation has considerable computational overhead and has to be overridden
+    //       to provide for optimal performance on Spark. This implementation provided exclusively
+    //       for compatibility reasons.
+    return getRecordKey(rowConverter.convertToAvro(row));
   }
 
   @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public String getRecordKey(InternalRow internalRow, StructType schema) {
-    try {
-      // TODO fix
-      buildFieldSchemaInfoIfNeeded(schema);
-      return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow, getRecordKeyFields(), recordKeySchemaInfo, false);
-    } catch (Exception e) {
-      throw new HoodieException("Conversion of InternalRow to Row failed with exception", e);
+  public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    tryInitRowConverter(schema);
+    // NOTE: This implementation has considerable computational overhead and has to be overridden
+    //       to provide for optimal performance on Spark. This implementation provided exclusively
+    //       for compatibility reasons.
+    return UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    tryInitRowConverter(row.schema());
+    // NOTE: This implementation has considerable computational overhead and has to be overridden
+    //       to provide for optimal performance on Spark. This implementation provided exclusively
+    //       for compatibility reasons.
+    return getPartitionPath(rowConverter.convertToAvro(row));
+  }
+
+  @Override
+  public UTF8String getPartitionPath(InternalRow internalRow, StructType schema) {
+    tryInitRowConverter(schema);
+    // NOTE: This implementation has considerable computational overhead and has to be overridden
+    //       to provide for optimal performance on Spark. This implementation provided exclusively
+    //       for compatibility reasons.
+    GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
+    return UTF8String.fromString(getPartitionPath(avroRecord));
+  }
+
+  protected void tryInitRowAccessor(StructType schema) {
+    if (this.rowAccessor == null) {
+      synchronized (this) {
+        if (this.rowAccessor == null) {
+          this.rowAccessor = new SparkRowAccessor(schema);
+        }
+      }
     }
   }
+
   /**
-   * Fetch partition path from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which partition path is requested
-   * @return the partition path of interest from {@link Row}.
+   * NOTE: This method has to stay final (so that it's easier for JIT compiler to apply certain
+   *       optimizations, like inlining)
    */
+  protected final String combinePartitionPath(Object... partitionPathParts) {

Review Comment:
   Should these util methods be static?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieTimer.java:
##########
@@ -79,4 +83,12 @@ public long endTimer() {
     }
     return timeInfoDeque.pop().stop();
   }
+
+  public static HoodieTimer start() {

Review Comment:
   +1



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java:
##########
@@ -42,32 +41,34 @@ public class HoodieInternalRowFileWriterFactory {
    * Factory method to assist in instantiating an instance of {@link HoodieInternalRowFileWriter}.
    * @param path path of the RowFileWriter.
    * @param hoodieTable instance of {@link HoodieTable} in use.
-   * @param config instance of {@link HoodieWriteConfig} to use.
+   * @param writeConfig instance of {@link HoodieWriteConfig} to use.
    * @param schema schema of the dataset in use.
    * @return the instantiated {@link HoodieInternalRowFileWriter}.
    * @throws IOException if format is not supported or if any exception during instantiating the RowFileWriter.
    *
    */
-  public static HoodieInternalRowFileWriter getInternalRowFileWriter(
-      Path path, HoodieTable hoodieTable, HoodieWriteConfig config, StructType schema)
+  public static HoodieInternalRowFileWriter getInternalRowFileWriter(Path path,
+                                                                     HoodieTable hoodieTable,
+                                                                     HoodieWriteConfig writeConfig,
+                                                                     StructType schema)
       throws IOException {
     final String extension = FSUtils.getFileExtension(path.getName());
     if (PARQUET.getFileExtension().equals(extension)) {
-      return newParquetInternalRowFileWriter(path, config, schema, hoodieTable);
+      return newParquetInternalRowFileWriter(path, hoodieTable, writeConfig, schema, tryInstantiateBloomFilter(writeConfig));
     }
     throw new UnsupportedOperationException(extension + " format not supported yet.");
   }
 
-  private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter(
-      Path path, HoodieWriteConfig writeConfig, StructType structType, HoodieTable table)
+  private static HoodieInternalRowFileWriter newParquetInternalRowFileWriter(Path path,
+                                                                             HoodieTable table,
+                                                                             HoodieWriteConfig writeConfig,
+                                                                             StructType structType,
+                                                                             Option<BloomFilter> bloomFilterOpt
+  )
       throws IOException {
-    BloomFilter filter = BloomFilterFactory.createBloomFilter(
-            writeConfig.getBloomFilterNumEntries(),
-            writeConfig.getBloomFilterFPP(),
-            writeConfig.getDynamicBloomFilterMaxNumEntries(),
-            writeConfig.getBloomFilterType());

Review Comment:
   Why not instantiate the bloom filter here instead, since the write config is already passed in, to avoid unnecessary changes?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SparkKeyGeneratorInterface.java:
##########
@@ -18,20 +18,65 @@
 
 package org.apache.hudi.keygen;
 
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIMethod;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
 
 /**
- * Spark key generator interface.
+ * Spark-specific {@link KeyGenerator} interface extension allowing implementation to
+ * specifically implement record-key, partition-path generation w/o the need for (expensive)
+ * conversion from Spark internal representation (for ex, to Avro)
  */
 public interface SparkKeyGeneratorInterface extends KeyGeneratorInterface {
 
+  /**
+   * Extracts record key from Spark's {@link Row}
+   *
+   * @param row instance of {@link Row} from which record-key is extracted
+   * @return record's (primary) key
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   String getRecordKey(Row row);
 
-  String getRecordKey(InternalRow row, StructType schema);
+  /**
+   * Extracts record key from Spark's {@link InternalRow}
+   *
+   * NOTE: Difference b/w {@link Row} and {@link InternalRow} is that {@link InternalRow} could
+   *       internally hold just a binary representation of the data, while {@link Row} has it
+   *       deserialized into JVM-native representation (like {@code Integer}, {@code Long},
+   *       {@code String}, etc)
+   *
+   * @param row instance of {@link InternalRow} from which record-key is extracted
+   * @param schema schema {@link InternalRow} is adhering to
+   * @return record-key as instance of {@link UTF8String}
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  UTF8String getRecordKey(InternalRow row, StructType schema);

Review Comment:
   We need to call out breaking public API changes in release notes and docs (in this case the changes are unavoidable).  @alexeykudinkin could you create a Jira ticket for that?  cc @codope @xushiyan 



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -19,132 +19,560 @@
 package org.apache.hudi.keygen;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
 import scala.Function1;
 
-import java.util.HashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.util.CollectionUtils.tail;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
 
 /**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link SparkKeyGeneratorInterface}, which
+ *       by default however fallback to Avro implementation. For maximum performance (to avoid
+ *       conversion from Spark's internal data-types to Avro) you should override these methods
+ *       in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
  */
+@ThreadSafe
 public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements SparkKeyGeneratorInterface {
 
-  private static final String STRUCT_NAME = "hoodieRowTopLevelField";
-  private static final String NAMESPACE = "hoodieRow";
-  private Function1<Row, GenericRecord> converterFn = null;
-  private final AtomicBoolean validatePartitionFields = new AtomicBoolean(false);
-  protected StructType structType;
+  private static final Logger LOG = LogManager.getLogger(BuiltinKeyGenerator.class);
+
+  private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
+
+  protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 = UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
+  protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
+  protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
+
 
-  protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo = new HashMap<>();
-  protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo = new HashMap<>();
+  protected transient volatile SparkRowConverter rowConverter;
+  protected transient volatile SparkRowAccessor rowAccessor;
 
   protected BuiltinKeyGenerator(TypedProperties config) {
     super(config);
   }
 
-  /**
-   * Fetch record key from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which record key is requested.
-   * @return the record key of interest from {@link Row}.
-   */
   @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)

Review Comment:
   Why remove the annotation?  Is it because it is already reflected in the interface?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/BuiltinKeyGenerator.java:
##########
@@ -19,132 +19,560 @@
 package org.apache.hudi.keygen;
 
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.AvroConversionUtils;
-import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.client.utils.SparkRowSerDe;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.HoodieUnsafeRowUtils;
+import org.apache.spark.sql.HoodieUnsafeRowUtils$;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
 import scala.Function1;
 
-import java.util.HashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.apache.hudi.common.util.CollectionUtils.tail;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_PARTITION_PATH_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR;
+import static org.apache.hudi.keygen.KeyGenUtils.EMPTY_RECORDKEY_PLACEHOLDER;
+import static org.apache.hudi.keygen.KeyGenUtils.HUDI_DEFAULT_PARTITION_PATH;
+import static org.apache.hudi.keygen.KeyGenUtils.NULL_RECORDKEY_PLACEHOLDER;
 
 /**
- * Base class for the built-in key generators. Contains methods structured for
- * code reuse amongst them.
+ * Base class for all built-in key generators.
+ *
+ * NOTE: By default it implements all the methods of {@link SparkKeyGeneratorInterface}, which
+ *       by default however fallback to Avro implementation. For maximum performance (to avoid
+ *       conversion from Spark's internal data-types to Avro) you should override these methods
+ *       in your implementation.
+ *
+ * TODO rename to AvroFallbackBaseKeyGenerator
  */
+@ThreadSafe
 public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements SparkKeyGeneratorInterface {
 
-  private static final String STRUCT_NAME = "hoodieRowTopLevelField";
-  private static final String NAMESPACE = "hoodieRow";
-  private Function1<Row, GenericRecord> converterFn = null;
-  private final AtomicBoolean validatePartitionFields = new AtomicBoolean(false);
-  protected StructType structType;
+  private static final Logger LOG = LogManager.getLogger(BuiltinKeyGenerator.class);
+
+  private static final String COMPOSITE_KEY_FIELD_VALUE_INFIX = ":";
+
+  protected static final UTF8String HUDI_DEFAULT_PARTITION_PATH_UTF8 = UTF8String.fromString(HUDI_DEFAULT_PARTITION_PATH);
+  protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
+  protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);
+
 
-  protected Map<String, Pair<List<Integer>, DataType>> recordKeySchemaInfo = new HashMap<>();
-  protected Map<String, Pair<List<Integer>, DataType>> partitionPathSchemaInfo = new HashMap<>();
+  protected transient volatile SparkRowConverter rowConverter;
+  protected transient volatile SparkRowAccessor rowAccessor;
 
   protected BuiltinKeyGenerator(TypedProperties config) {
     super(config);
   }
 
-  /**
-   * Fetch record key from {@link Row}.
-   *
-   * @param row instance of {@link Row} from which record key is requested.
-   * @return the record key of interest from {@link Row}.
-   */
   @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public String getRecordKey(Row row) {
-    // TODO avoid conversion to avro
-    //      since converterFn is transient this will be repeatedly initialized over and over again
-    if (null == converterFn) {
-      converterFn = AvroConversionUtils.createConverterToAvro(row.schema(), STRUCT_NAME, NAMESPACE);
-    }
-    return getKey(converterFn.apply(row)).getRecordKey();
+    tryInitRowConverter(row.schema());
+    // NOTE: This implementation has considerable computational overhead and has to be overridden
+    //       to provide for optimal performance on Spark. This implementation provided exclusively
+    //       for compatibility reasons.
+    return getRecordKey(rowConverter.convertToAvro(row));
   }
 
   @Override
-  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  public String getRecordKey(InternalRow internalRow, StructType schema) {
-    try {
-      // TODO fix
-      buildFieldSchemaInfoIfNeeded(schema);
-      return RowKeyGeneratorHelper.getRecordKeyFromInternalRow(internalRow, getRecordKeyFields(), recordKeySchemaInfo, false);
-    } catch (Exception e) {
-      throw new HoodieException("Conversion of InternalRow to Row failed with exception", e);
+  public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
+    tryInitRowConverter(schema);
+    // NOTE: This implementation has considerable computational overhead and has to be overridden
+    //       to provide for optimal performance on Spark. This implementation provided exclusively
+    //       for compatibility reasons.
+    return UTF8String.fromString(getRecordKey(rowConverter.convertToAvro(internalRow)));
+  }
+
+  @Override
+  public String getPartitionPath(Row row) {
+    tryInitRowConverter(row.schema());
+    // NOTE: This implementation has considerable computational overhead and has to be overridden
+    //       to provide for optimal performance on Spark. This implementation provided exclusively
+    //       for compatibility reasons.
+    return getPartitionPath(rowConverter.convertToAvro(row));
+  }
+
+  @Override
+  public UTF8String getPartitionPath(InternalRow internalRow, StructType schema) {
+    tryInitRowConverter(schema);
+    // NOTE: This implementation has considerable computational overhead and has to be overridden
+    //       to provide for optimal performance on Spark. This implementation provided exclusively
+    //       for compatibility reasons.
+    GenericRecord avroRecord = rowConverter.convertToAvro(internalRow);
+    return UTF8String.fromString(getPartitionPath(avroRecord));
+  }
+
+  protected void tryInitRowAccessor(StructType schema) {
+    if (this.rowAccessor == null) {
+      synchronized (this) {

Review Comment:
   Should the order be the opposite, i.e., first lock, then check and assign?  Otherwise, there is a slight chance that `rowAccessor` is going to be assigned twice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org