Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/08 01:09:55 UTC

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6745: Fix comment in RFC46

alexeykudinkin commented on code in PR #6745:
URL: https://github.com/apache/hudi/pull/6745#discussion_r987470813


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -461,6 +461,18 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   }
 
   protected def getTableState: HoodieTableState = {
+    val mergerImpls = (if (optParams.contains(HoodieWriteConfig.MERGER_IMPLS.key())) {

Review Comment:
   @wzx140 let's abstract this behind a common utility in the config
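
   Just as a sketch of what I mean (the helper and object names are made up; this assumes `MERGER_IMPLS` carries a default like the other `ConfigProperty` entries):

   ```scala
   import org.apache.hudi.config.HoodieWriteConfig

   // Hypothetical shared helper: resolve the merger implementations from the write
   // options, falling back to the config default, so call sites stop repeating the
   // contains()/else dance.
   object MergerConfigUtils {
     def getMergerImpls(optParams: Map[String, String]): String =
       optParams.getOrElse(
         HoodieWriteConfig.MERGER_IMPLS.key(),
         HoodieWriteConfig.MERGER_IMPLS.defaultValue())
   }
   ```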



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -870,18 +869,17 @@ object HoodieSparkSqlWriter {
           hoodieRecord
         }).toJavaRDD()
       case HoodieRecord.HoodieRecordType.SPARK =>
+        log.debug(s"Use ${HoodieRecord.HoodieRecordType.SPARK}")

Review Comment:
   Let's lift this log statement above the match so that we can tell whether it's Avro or Spark
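
   Roughly this shape (sketch only; `recordType` stands in for whatever the match is on):

   ```scala
   // Emit the chosen record type once, up front, instead of inside a single branch.
   log.debug(s"Use $recordType record type for the incoming records")
   recordType match {
     case HoodieRecord.HoodieRecordType.AVRO  => // ... Avro path ...
     case HoodieRecord.HoodieRecordType.SPARK => // ... Spark path ...
   }
   ```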



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -151,11 +155,10 @@ protected <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws IOExce
 
       HoodieRecord<T> oldRecord = records.get(key);
       T oldValue = oldRecord.getData();
-      T combinedValue = ((HoodieRecord<T>) recordMerger.merge(oldRecord, hoodieRecord, readerSchema, this.getPayloadProps()).get()).getData();

Review Comment:
   Was the `getPayloadProps` change intentional? Just calling it out to make sure we can validate it



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -253,21 +253,21 @@ private Option<HoodieRecord> prepareRecord(HoodieRecord<T> hoodieRecord) {
   }
 
   private HoodieRecord populateMetadataFields(HoodieRecord<T> hoodieRecord, Schema schema, Properties prop) throws IOException {
-    Map<String, String> metadataValues = new HashMap<>();
-    String seqId =
-        HoodieRecord.generateSequenceId(instantTime, getPartitionId(), RECORD_COUNTER.getAndIncrement());
+    MetadataValues metadataValues = new MetadataValues();
     if (config.populateMetaFields()) {
-      metadataValues.put(HoodieRecord.HoodieMetadataField.FILENAME_METADATA_FIELD.getFieldName(), fileId);
-      metadataValues.put(HoodieRecord.HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.getFieldName(), partitionPath);
-      metadataValues.put(HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName(), hoodieRecord.getRecordKey());
-      metadataValues.put(HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.getFieldName(), instantTime);
-      metadataValues.put(HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD.getFieldName(), seqId);
+      String seqId =
+          HoodieRecord.generateSequenceId(instantTime, getPartitionId(), RECORD_COUNTER.getAndIncrement());
+      metadataValues.setFileName(fileId);
+      metadataValues.setPartitionPath(partitionPath);
+      metadataValues.setRecordKey(hoodieRecord.getRecordKey());
+      metadataValues.setCommitTime(instantTime);
+      metadataValues.setCommitSeqno(seqId);
     }
     if (config.allowOperationMetadataField()) {
-      metadataValues.put(HoodieRecord.HoodieMetadataField.OPERATION_METADATA_FIELD.getFieldName(), hoodieRecord.getOperation().getName());
+      metadataValues.setOperation(hoodieRecord.getOperation().getName());
     }
 
-    return hoodieRecord.updateValues(schema, prop, metadataValues);
+    return hoodieRecord.updateMetadataValues(schema, prop, metadataValues);

Review Comment:
   Why do we need to update meta values if we're not populating them?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -295,12 +295,11 @@ object HoodieSparkSqlWriter {
               tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
             )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
             val writeConfig = client.getConfig
-            if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ &&

Review Comment:
   I think `HoodieTableType.MERGE_ON_READ` would be preferred
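
   i.e. keep the comparison against the enum, roughly (sketch; assuming `tableType` here is the `HoodieTableType` enum, comparing it to the `MOR_TABLE_TYPE_OPT_VAL` string would never be true):

   ```scala
   // Compare the table type against the enum constant, not the option string.
   if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK &&
       tableType == HoodieTableType.MERGE_ON_READ &&
       writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
     throw new UnsupportedOperationException(
       s"${writeConfig.getRecordMerger.getClass.getName} only supports Parquet log blocks.")
   }
   ```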



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala:
##########
@@ -240,17 +236,22 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
   private def merge(curRow: InternalRow, newRecord: HoodieRecord[_]): Option[InternalRow] = {
     // NOTE: We have to pass in Avro Schema used to read from Delta Log file since we invoke combining API
     //       on the record from the Delta Log
+    val curRecord = recordMerger.getRecordType match {
+      case HoodieRecordType.SPARK =>
+        new HoodieSparkRecord(curRow, baseFileReader.schema)
+      case _ =>
+        new HoodieAvroIndexedRecord(serialize(curRow))
+    }
     recordMerger.getRecordType match {
       case HoodieRecordType.SPARK =>
-        val curRecord = new HoodieSparkRecord(curRow, baseFileReader.schema)
-        toScalaOption(recordMerger.merge(curRecord, newRecord, logFileReaderAvroSchema, payloadProps))
+        toScalaOption(recordMerger.merge(curRecord, baseFileReaderAvroSchema, newRecord, logFileReaderAvroSchema, payloadProps))
           .map(r => {
-            val projection = HoodieInternalRowUtils.getCachedUnsafeProjection(r.asInstanceOf[HoodieSparkRecord].getStructType, structTypeSchema)
+            val schema = if (r == curRecord) baseFileReader.schema else logFileReaderStructType

Review Comment:
   Please check my comment above regarding the need to return the schema from the merger



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -870,18 +869,17 @@ object HoodieSparkSqlWriter {
           hoodieRecord
         }).toJavaRDD()
       case HoodieRecord.HoodieRecordType.SPARK =>
+        log.debug(s"Use ${HoodieRecord.HoodieRecordType.SPARK}")
         // ut will use AvroKeyGenerator, so we need to cast it in spark record
         val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
+        val schemaWithMetaField = HoodieAvroUtils.addMetadataFields(schema, config.allowOperationMetadataField)

Review Comment:
   Why do we need meta-fields in here?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala:
##########
@@ -240,17 +236,22 @@ class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
   private def merge(curRow: InternalRow, newRecord: HoodieRecord[_]): Option[InternalRow] = {
     // NOTE: We have to pass in Avro Schema used to read from Delta Log file since we invoke combining API
     //       on the record from the Delta Log
+    val curRecord = recordMerger.getRecordType match {

Review Comment:
   I don't think it makes sense to extract this one (we have identical matches right on top of each other)
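
   i.e. roughly this instead (sketch only; `mergedToInternalRow` is a stand-in for the projection logic already in this method):

   ```scala
   // One match: build the current record and merge it in the same branch,
   // instead of matching on recordMerger.getRecordType twice in a row.
   recordMerger.getRecordType match {
     case HoodieRecordType.SPARK =>
       val curRecord = new HoodieSparkRecord(curRow, baseFileReader.schema)
       toScalaOption(recordMerger.merge(curRecord, baseFileReaderAvroSchema, newRecord, logFileReaderAvroSchema, payloadProps))
         .map(mergedToInternalRow)
     case _ =>
       val curRecord = new HoodieAvroIndexedRecord(serialize(curRow))
       // ... existing Avro merge path, unchanged ...
       None // placeholder; the real Avro branch returns its merged row here
   }
   ```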



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -295,12 +295,11 @@ object HoodieSparkSqlWriter {
               tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)
             )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
             val writeConfig = client.getConfig
-            if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == HoodieTableType.MERGE_ON_READ &&
-              writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
+            if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == MOR_TABLE_TYPE_OPT_VAL && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
               throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.")
             }
             // Create a HoodieWriteClient & issue the write.
-            val hoodieAllIncomingRecords = createHoodieRecordRdd(df, writeConfig, parameters, writerSchema)
+            val hoodieAllIncomingRecords = createHoodieRecordRdd(sparkContext, df, writeConfig, parameters, writerSchema)

Review Comment:
   We can get `SparkContext` from `df.sparkSession.sparkContext`
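
   e.g. inside `createHoodieRecordRdd` (or right before the call):

   ```scala
   // SparkContext is reachable from the DataFrame itself, so there is no need to
   // thread it through as an extra parameter.
   val sc = df.sparkSession.sparkContext
   ```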



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/HoodieSparkRecordUtils.java:
##########
@@ -91,21 +65,20 @@ private static Option<String> getNullableValAsString(StructType structType, Inte
    * @param structType  {@link StructType} instance.
    * @return Column value if a single column, or concatenated String values by comma.
    */
-  public static Object getRecordColumnValues(InternalRow row,
+  public static ComparableList getRecordColumnValues(InternalRow row,
       String[] columns,
       StructType structType, boolean consistentLogicalTimestampEnabled) {
-    if (columns.length == 1) {
-      NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, columns[0]);
-      return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
-    } else {
-      // TODO this is inefficient, instead we can simply return array of Comparable
-      StringBuilder sb = new StringBuilder();
-      for (String col : columns) {
-        // TODO support consistentLogicalTimestampEnabled
-        NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, columns[0]);
-        return HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
+    List<Comparable> list = new LinkedList<>();
+    for (String column : columns) {
+      NestedFieldPath posList = HoodieInternalRowUtils.getCachedPosList(structType, column);
+      Object value = HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList);
+      DataType dataType = posList.parts()[posList.parts().length - 1]._2.dataType();
+      if (value instanceof InternalRow | value instanceof MapData | value instanceof ArrayData

Review Comment:
   I see what you're trying to do here; a few notes (see the sketch after this list):
   
    - We can't wrap objects in this method -- it should return the field values as they are (since we might be using it for other purposes, for example in merging)
    - I don't think we need to provide `Comparable` semantics over objects that aren't really comparable (like `MapData`). If a user tries to compare on maps we should just error out instead of trying to interpret them in some way
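
   Rough sketch of the shape I have in mind (illustrative only, written in Scala for brevity even though this class is Java; it reuses the same `HoodieInternalRowUtils`/`HoodieUnsafeRowUtils` helpers already used in this file, imports for those omitted):

   ```scala
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.types.StructType

   // Return the nested field values exactly as extracted; anything that is not
   // genuinely comparable (MapData, ArrayData, nested rows) should fail at
   // comparison time rather than be wrapped into a Comparable facade here.
   object RowColumnValues {
     def getRecordColumnValues(row: InternalRow,
                               columns: Array[String],
                               structType: StructType): Array[AnyRef] =
       columns.map { col =>
         val posList = HoodieInternalRowUtils.getCachedPosList(structType, col)
         HoodieUnsafeRowUtils.getNestedInternalRowValue(row, posList).asInstanceOf[AnyRef]
       }
   }
   ```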



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieConcatHandle.java:
##########
@@ -93,8 +93,8 @@ public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTa
    */
   @Override
   public void write(HoodieRecord oldRecord) {
-    String key = oldRecord.getRecordKey(keyGeneratorOpt);
-    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;

Review Comment:
   Why does this condition change? 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -267,9 +268,8 @@ protected void init(String fileId, Iterator<HoodieRecord<T>> newRecordsItr) {
         + ((ExternalSpillableMap) keyToNewRecords).getSizeOfFileOnDiskInBytes());
   }
 
-  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord oldRecord, Option<HoodieRecord> combineRecordOp) throws IOException {
+  protected boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord, HoodieRecord<T> oldRecord, Option<HoodieRecord> combineRecordOp, Schema combineRecordSchema) throws IOException {

Review Comment:
   We should just call this `schema` or `writerSchema` to avoid confusion



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -335,21 +334,26 @@ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<HoodieRecord>
    * Go through an old record. Here if we detect a newer version shows up, we write the new one to the file.
    */
   public void write(HoodieRecord<T> oldRecord) {
-    Schema schema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
+    Schema oldSchema = withMetaFields ? tableSchemaWithMetaFields : tableSchema;
+    Schema newSchema = useWriterSchemaForCompaction ? tableSchemaWithMetaFields : tableSchema;
     boolean copyOldRecord = true;
-    String key = oldRecord.getRecordKey(keyGeneratorOpt);
+    String key = oldRecord.getRecordKey(oldSchema, keyGeneratorOpt);
     TypedProperties props = config.getPayloadConfig().getProps();
     if (keyToNewRecords.containsKey(key)) {
       // If we have duplicate records that we are updating, then the hoodie record will be deflated after
       // writing the first record. So make a copy of the record to be merged
       HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key).newInstance();
       try {
-        Option<HoodieRecord> combinedRecord = recordMerger.merge(oldRecord, hoodieRecord, schema, props);
+        Option<HoodieRecord> combinedRecord = recordMerger.merge(oldRecord, oldSchema, hoodieRecord, newSchema, props);
+        Schema combineRecordSchema = oldSchema;
+        if (combinedRecord.isPresent() && combinedRecord.get() == hoodieRecord) {

Review Comment:
   We should actually not assume which schema was used; instead the merger should return the record along with the schema it conforms to
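
   Something along these lines, as a sketch (not a concrete signature proposal; Scala here just for brevity, the real merger is a Java interface):

   ```scala
   import org.apache.avro.Schema
   import org.apache.hudi.common.config.TypedProperties
   import org.apache.hudi.common.model.HoodieRecord

   // Have merge() hand back the schema together with the merged record, so callers
   // like write() don't have to infer the schema by reference-comparing the result
   // against its inputs.
   trait SchemaAwareMerger {
     def merge(older: HoodieRecord[_], olderSchema: Schema,
               newer: HoodieRecord[_], newerSchema: Schema,
               props: TypedProperties): Option[(HoodieRecord[_], Schema)]
   }
   ```

   The caller then just destructures the pair instead of guessing whether the combined record is in `oldSchema` or `newSchema`.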


