Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/03/22 14:33:21 UTC

[GitHub] [hudi] xushiyan commented on a change in pull request #4888: [HUDI-3396] Refactoring `MergeOnReadRDD` to avoid duplication, fetch only projected columns

xushiyan commented on a change in pull request #4888:
URL: https://github.com/apache/hudi/pull/4888#discussion_r832116327



##########
File path: hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -206,17 +206,6 @@ object AvroConversionUtils {
     SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
   }
 
-  def buildAvroRecordBySchema(record: IndexedRecord,

Review comment:
      Is this method not used? It would be helpful to have a comment here to clarify.

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -70,34 +73,59 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     )
     .getOrElse(new Properties())
 
+  private val whitelistedPayloadClasses: Set[String] = Seq(
+    classOf[OverwriteWithLatestAvroPayload]
+  ).map(_.getName).toSet
+
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
     val iter = mergeOnReadPartition.split match {
       case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
-        requiredSchemaFileReader(dataFileOnlySplit.dataFile.get)
+        requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get)
+
       case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
-        logFileIterator(logFileOnlySplit, getConfig)
-      case skipMergeSplit if skipMergeSplit.mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
-        skipMergeFileIterator(skipMergeSplit, requiredSchemaFileReader(skipMergeSplit.dataFile.get), getConfig)
-      case payloadCombineSplit
-        if payloadCombineSplit.mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
-        payloadCombineFileIterator(payloadCombineSplit, fullSchemaFileReader(payloadCombineSplit.dataFile.get),
-          getConfig)
+        new LogFileIterator(logFileOnlySplit, getConfig)
+
+      case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
+        val baseFileIterator = requiredSchemaFileReader.apply(split.dataFile.get)
+        new SkipMergeIterator(split, baseFileIterator, getConfig)
+
+      case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
+        val (baseFileIterator, schema) = readBaseFile(split)
+        new RecordMergingFileIterator(split, baseFileIterator, schema, getConfig)
+
       case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " +
         s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" +
         s"log paths: ${mergeOnReadPartition.split.logFiles.toString}" +
-        s"hoodie table path: ${mergeOnReadPartition.split.tablePath}" +
+        s"hoodie table path: ${tableState.tablePath}" +
         s"spark partition Index: ${mergeOnReadPartition.index}" +
-        s"merge type: ${mergeOnReadPartition.split.mergeType}")
+        s"merge type: ${mergeType}")
     }
+
     if (iter.isInstanceOf[Closeable]) {
       // register a callback to close logScanner which will be executed on task completion.
       // when tasks finished, this method will be called, and release resources.
       Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.asInstanceOf[Closeable].close()))
     }
+
     iter
   }
 
+  private def readBaseFile(split: HoodieMergeOnReadFileSplit): (Iterator[InternalRow], HoodieTableSchema) = {
+    // NOTE: This is an optimization making sure that even for MOR tables we fetch absolute minimum
+    //       of the stored data possible, while still properly executing corresponding relation's semantic
+    //       and meet the query's requirements.
+    //
+    //       Here we assume that iff queried table
+    //          a) It does use one of the standard (and whitelisted) Record Payload classes
+    //       then we can avoid reading and parsing the records w/ _full_ schema, and instead only
+    //       rely on projected one, nevertheless being able to perform merging correctly
+    if (!whitelistedPayloadClasses.contains(tableState.recordPayloadClassName))
+      (fullSchemaFileReader(split.dataFile.get), tableSchema)

Review comment:
      This reads better with `fullSchema`, to distinguish it from `requiredSchema`.
   
   ```suggestion
         (fullSchemaFileReader(split.dataFile.get), fullSchema)
   ```

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -108,270 +136,284 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     }
   }
 
-  private def logFileIterator(split: HoodieMergeOnReadFileSplit,
-                              config: Configuration): Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val deserializer = sparkAdapter.createAvroDeserializer(requiredAvroSchema, requiredSchema.structTypeSchema)
-      private val unsafeProjection = UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
-
-      private var recordToLoad: InternalRow = _
-
-      override def hasNext: Boolean = {
-        if (logRecordsKeyIterator.hasNext) {
-          val curAvrokey = logRecordsKeyIterator.next()
-          val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
-          if (!curAvroRecord.isPresent) {
-            // delete record found, skipping
-            this.hasNext
-          } else {
-            val requiredAvroRecord = AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema,
-              requiredFieldPosition, recordBuilder)
-            val rowOpt = deserializer.deserialize(requiredAvroRecord)
-            recordToLoad = unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-            true
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all of the records stored in
+   * Delta Log files (represented as [[InternalRow]]s)
+   */
+  private class LogFileIterator(split: HoodieMergeOnReadFileSplit,
+                                config: Configuration)
+    extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
+
+    protected override val requiredAvroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
+    protected override val requiredStructTypeSchema: StructType = requiredSchema.structTypeSchema
+
+    protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
+
+    protected val recordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema)
+    protected var recordToLoad: InternalRow = _
+
+    // TODO validate whether we need to do UnsafeProjection
+    protected val unsafeProjection: UnsafeProjection = UnsafeProjection.create(requiredStructTypeSchema)
+
+    // NOTE: This maps _required_ schema fields onto the _full_ table schema, collecting their "ordinals"
+    //       w/in the record payload. This is required, to project records read from the Delta Log file
+    //       which always reads records in full schema (never projected, due to the fact that DL file might
+    //       be stored in non-columnar formats like Avro, HFile, etc)
+    private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
+
+    private var logScanner =
+      HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState,
+        maxCompactionMemoryInBytes, config)
+
+    private val logRecords = logScanner.getRecords.asScala
+
+    // NOTE: This iterator iterates over already projected (in required schema) records
+    // NOTE: This have to stay lazy to make sure it's initialized only at the point where it's
+    //       going to be used, since we modify `logRecords` before that and therefore can't do it any earlier
+    protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
+      logRecords.iterator.map {
+        case (_, record) =>
+          val avroRecordOpt = toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, payloadProps))
+          avroRecordOpt.map {
+            avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema, requiredSchemaFieldOrdinals, recordBuilder)
           }
-        } else {
-          false
-        }
       }
 
-      override def next(): InternalRow = {
-        recordToLoad
-      }
-
-      override def close(): Unit = {
-        if (logScanner != null) {
-          try {
-            logScanner.close()
-          } finally {
-            logScanner = null
-          }
-        }
-      }
-    }
+    protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] =
+      logRecords.remove(key)
 
-  private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
-                                    baseFileIterator: Iterator[InternalRow],
-                                    config: Configuration): Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val deserializer = sparkAdapter.createAvroDeserializer(requiredAvroSchema, requiredSchema.structTypeSchema)
-      private val unsafeProjection = UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
-
-      private var recordToLoad: InternalRow = _
-
-      @scala.annotation.tailrec
-      override def hasNext: Boolean = {
-        if (baseFileIterator.hasNext) {
-          val curRow = baseFileIterator.next()
-          recordToLoad = unsafeProjection(curRow)
-          true
+    override def hasNext: Boolean =
+      logRecordsIterator.hasNext && {
+        val avroRecordOpt = logRecordsIterator.next()
+        if (avroRecordOpt.isEmpty) {
+          // Record has been deleted, skipping
+          this.hasNext
         } else {
-          if (logRecordsKeyIterator.hasNext) {
-            val curAvrokey = logRecordsKeyIterator.next()
-            val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
-            if (!curAvroRecord.isPresent) {
-              // delete record found, skipping
-              this.hasNext
-            } else {
-              val requiredAvroRecord = AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema,
-                requiredFieldPosition, recordBuilder)
-              val rowOpt = deserializer.deserialize(requiredAvroRecord)
-              recordToLoad = unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-              true
-            }
-          } else {
-            false
-          }
+          recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get))
+          true
         }
       }
 
-      override def next(): InternalRow = {
-        recordToLoad
-      }
+    override final def next(): InternalRow = recordToLoad
 
-      override def close(): Unit = {
-        if (logScanner != null) {
-          try {
-            logScanner.close()
-          } finally {
-            logScanner = null
-          }
+    override def close(): Unit =
+      if (logScanner != null) {
+        try {
+          logScanner.close()
+        } finally {
+          logScanner = null
         }
       }
+  }
+
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in
+   * Base file as well as all of the Delta Log files simply returning concatenation of these streams, while not
+   * performing any combination/merging of the records w/ the same primary keys (ie producing duplicates potentially)
+   */
+  private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
+                                  baseFileIterator: Iterator[InternalRow],
+                                  config: Configuration)
+    extends LogFileIterator(split, config) {
+
+    override def hasNext: Boolean = {
+      if (baseFileIterator.hasNext) {
+        val curRow = baseFileIterator.next()
+        recordToLoad = unsafeProjection(curRow)
+        true
+      } else {
+        super[LogFileIterator].hasNext
+      }
     }
+  }
 
-  private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
-                                         baseFileIterator: Iterator[InternalRow],
-                                         config: Configuration): Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val serializer = sparkAdapter.createAvroSerializer(tableSchema.structTypeSchema, tableAvroSchema,
-        resolveAvroSchemaNullability(tableAvroSchema))
-      private val requiredDeserializer = sparkAdapter.createAvroDeserializer(requiredAvroSchema, requiredSchema.structTypeSchema)
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val unsafeProjection = UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
-      private val keyToSkip = mutable.Set.empty[String]
-      private val recordKeyPosition = tableSchema.structTypeSchema.fieldIndex(recordKeyField)
-
-      private var recordToLoad: InternalRow = _
-
-      @scala.annotation.tailrec
-      override def hasNext: Boolean = {
-        if (baseFileIterator.hasNext) {
-          val curRow = baseFileIterator.next()
-          val curKey = curRow.getString(recordKeyPosition)
-          if (logRecords.containsKey(curKey)) {
-            // duplicate key found, merging
-            keyToSkip.add(curKey)
-            val mergedAvroRecord = mergeRowWithLog(curRow, curKey)
-            if (!mergedAvroRecord.isPresent) {
-              // deleted
-              this.hasNext
-            } else {
-              // load merged record as InternalRow with required schema
-              val requiredAvroRecord = AvroConversionUtils.buildAvroRecordBySchema(mergedAvroRecord.get(), requiredAvroSchema,
-                requiredFieldPosition, recordBuilder)
-              val rowOpt = requiredDeserializer.deserialize(requiredAvroRecord)
-              recordToLoad = unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-              true
-            }
-          } else {
-            // No merge needed, load current row with required schema
-            recordToLoad = unsafeProjection(createInternalRowWithSchema(curRow, requiredSchema.structTypeSchema, requiredFieldPosition))
-            true
-          }
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in
+   * a) Base file and all of the b) Delta Log files combining records with the same primary key from both of these
+   * streams
+   */
+  private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
+                                          baseFileIterator: Iterator[InternalRow],
+                                          baseFileReaderSchema: HoodieTableSchema,
+                                          config: Configuration)
+    extends LogFileIterator(split, config) {
+
+    // NOTE: Record-merging iterator supports 2 modes of operation merging records bearing either
+    //        - Full table's schema
+    //        - Projected schema
+    //       As such, no particular schema could be assumed, and therefore we rely on the caller
+    //       to correspondingly set the scheme of the expected output of base-file reader
+    private val baseFileReaderAvroSchema = new Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr)
+    private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema)
+
+    private val serializer = sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema,
+      baseFileReaderAvroSchema, resolveAvroSchemaNullability(baseFileReaderAvroSchema))
+

Review comment:
      Maybe make `AvroDeserializerSupport` provide a serializer as well and rename it to `AvroSerDeSupport`?
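
      A minimal sketch of that idea, assuming the existing `AvroDeserializerSupport` members keep the
      names visible in this diff and that `sparkAdapter.createAvroSerializer` matches the call above
      (nullability resolution is elided, so treat the `false` literal as a placeholder):

   ```scala
   import org.apache.avro.Schema
   import org.apache.avro.generic.GenericRecord
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.types.StructType

   // Hedged sketch only: the body of the existing AvroDeserializerSupport trait is not shown in
   // this hunk, so member names and adapter calls are assumptions based on the surrounding diff.
   trait AvroSerDeSupport extends SparkAdapterSupport {
     protected val requiredAvroSchema: Schema
     protected val requiredStructTypeSchema: StructType

     private lazy val deserializer =
       sparkAdapter.createAvroDeserializer(requiredAvroSchema, requiredStructTypeSchema)
     private lazy val serializer =
       sparkAdapter.createAvroSerializer(requiredStructTypeSchema, requiredAvroSchema, false) // placeholder nullability

     protected def deserialize(avroRecord: GenericRecord): InternalRow =
       deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]

     protected def serialize(row: InternalRow): GenericRecord =
       serializer.serialize(row).asInstanceOf[GenericRecord]
   }
   ```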

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -380,4 +422,17 @@ private object HoodieMergeOnReadRDD {
       case (nullable, _) => nullable
     }
   }
+
+  trait AvroDeserializerSupport extends SparkAdapterSupport {

Review comment:
      This looks like a generic trait... maybe place it in a more common class?

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -79,13 +83,30 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
   protected lazy val basePath: String = metaClient.getBasePath
 
-  // If meta fields are enabled, always prefer key from the meta field as opposed to user-specified one
-  // NOTE: This is historical behavior which is preserved as is
+  // NOTE: Record key-field is assumed singular here due to the either of
+  //          - In case Hudi's meta fields are enabled: record key will be pre-materialized (stored) as part
+  //          of the record's payload (as part of the Hudi's metadata)
+  //          - In case Hudi's meta fields are disabled (virtual keys): in that case record has to bear _single field_
+  //          identified as its (unique) primary key w/in its payload (this is a limitation of [[SimpleKeyGenerator]],
+  //          which is the only [[KeyGenerator]] permitted for virtual-keys payloads)
   protected lazy val recordKeyField: String =
-    if (tableConfig.populateMetaFields()) HoodieRecord.RECORD_KEY_METADATA_FIELD
-    else tableConfig.getRecordKeyFieldProp
+    if (tableConfig.populateMetaFields()) {
+      HoodieRecord.RECORD_KEY_METADATA_FIELD
+    } else {
+      val keyFields = tableConfig.getRecordKeyFields.get()
+      checkState(keyFields.length == 1)
+      keyFields.head
+    }
 
-  protected lazy val preCombineFieldOpt: Option[String] = getPrecombineFieldProperty
+  protected lazy val preCombineFieldOpt: Option[String] =
+    Option(tableConfig.getPreCombineField)
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key)) match {
+      // NOTE: This is required to compensate for cases when empty string is used to stub
+      //       property value to avoid it being set with the default value
+      // TODO(HUDI-3456) cleanup
+      case Some(f) if !StringUtils.isNullOrEmpty(f) => Some(f)

Review comment:
      I have seen this negation in many other places. It would be more readable to add the complementary util.
   
   ```suggestion
         case Some(f) if StringUtils.nonEmpty(f) => Some(f)
   ```
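
      A tiny illustrative sketch of the complementary helper being suggested; Hudi's actual
      `StringUtils` lives in Java (hudi-common), so this Scala object only sketches the intended
      semantics and is not the real API:

   ```scala
   // Illustrative sketch only -- not Hudi's real StringUtils (a Java class in hudi-common).
   object StringUtilsSketch {
     def isNullOrEmpty(str: String): Boolean = str == null || str.isEmpty

     // The complementary helper, so call sites read positively instead of negating isNullOrEmpty
     def nonEmpty(str: String): Boolean = !isNullOrEmpty(str)
   }
   ```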

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -70,34 +73,59 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     )
     .getOrElse(new Properties())
 
+  private val whitelistedPayloadClasses: Set[String] = Seq(
+    classOf[OverwriteWithLatestAvroPayload]

Review comment:
      Should this also include `DefaultHoodieRecordPayload`?
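
      A hedged sketch of what that would look like, assuming `DefaultHoodieRecordPayload`'s merge
      semantics are actually safe to run against records read with only the projected schema
      (which is exactly the open question here):

   ```scala
   import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, OverwriteWithLatestAvroPayload}

   // Sketch only: whether DefaultHoodieRecordPayload belongs in this whitelist depends on its
   // combine semantics being satisfiable with the projected columns alone.
   private val whitelistedPayloadClasses: Set[String] = Seq(
     classOf[OverwriteWithLatestAvroPayload],
     classOf[DefaultHoodieRecordPayload]
   ).map(_.getName).toSet
   ```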

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -70,34 +73,59 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     )
     .getOrElse(new Properties())
 
+  private val whitelistedPayloadClasses: Set[String] = Seq(
+    classOf[OverwriteWithLatestAvroPayload]
+  ).map(_.getName).toSet
+
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
     val iter = mergeOnReadPartition.split match {
       case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
-        requiredSchemaFileReader(dataFileOnlySplit.dataFile.get)
+        requiredSchemaFileReader.apply(dataFileOnlySplit.dataFile.get)

Review comment:
      Let's consistently call it `baseFile` instead of `dataFile`, while you're at it.
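
      A small sketch of the rename applied to the first case above; it assumes the corresponding
      field on `HoodieMergeOnReadFileSplit` would be renamed as well, which is not part of this diff:

   ```scala
   // Hypothetical rename sketch: dataFile -> baseFile (the field rename on the split is assumed)
   case baseFileOnlySplit if baseFileOnlySplit.logFiles.isEmpty =>
     requiredSchemaFileReader.apply(baseFileOnlySplit.baseFile.get)
   ```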

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -108,270 +136,284 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     }
   }
 
-  private def logFileIterator(split: HoodieMergeOnReadFileSplit,
-                              config: Configuration): Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val deserializer = sparkAdapter.createAvroDeserializer(requiredAvroSchema, requiredSchema.structTypeSchema)
-      private val unsafeProjection = UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
-
-      private var recordToLoad: InternalRow = _
-
-      override def hasNext: Boolean = {
-        if (logRecordsKeyIterator.hasNext) {
-          val curAvrokey = logRecordsKeyIterator.next()
-          val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
-          if (!curAvroRecord.isPresent) {
-            // delete record found, skipping
-            this.hasNext
-          } else {
-            val requiredAvroRecord = AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema,
-              requiredFieldPosition, recordBuilder)
-            val rowOpt = deserializer.deserialize(requiredAvroRecord)
-            recordToLoad = unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-            true
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all of the records stored in
+   * Delta Log files (represented as [[InternalRow]]s)
+   */
+  private class LogFileIterator(split: HoodieMergeOnReadFileSplit,
+                                config: Configuration)
+    extends Iterator[InternalRow] with Closeable with AvroDeserializerSupport {
+
+    protected override val requiredAvroSchema: Schema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
+    protected override val requiredStructTypeSchema: StructType = requiredSchema.structTypeSchema
+
+    protected val logFileReaderAvroSchema: Schema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
+
+    protected val recordBuilder: GenericRecordBuilder = new GenericRecordBuilder(requiredAvroSchema)
+    protected var recordToLoad: InternalRow = _
+
+    // TODO validate whether we need to do UnsafeProjection
+    protected val unsafeProjection: UnsafeProjection = UnsafeProjection.create(requiredStructTypeSchema)
+
+    // NOTE: This maps _required_ schema fields onto the _full_ table schema, collecting their "ordinals"
+    //       w/in the record payload. This is required, to project records read from the Delta Log file
+    //       which always reads records in full schema (never projected, due to the fact that DL file might
+    //       be stored in non-columnar formats like Avro, HFile, etc)
+    private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, logFileReaderAvroSchema)
+
+    private var logScanner =
+      HoodieMergeOnReadRDD.scanLog(split.logFiles, getPartitionPath(split), logFileReaderAvroSchema, tableState,
+        maxCompactionMemoryInBytes, config)
+
+    private val logRecords = logScanner.getRecords.asScala
+
+    // NOTE: This iterator iterates over already projected (in required schema) records
+    // NOTE: This have to stay lazy to make sure it's initialized only at the point where it's
+    //       going to be used, since we modify `logRecords` before that and therefore can't do it any earlier
+    protected lazy val logRecordsIterator: Iterator[Option[GenericRecord]] =
+      logRecords.iterator.map {
+        case (_, record) =>
+          val avroRecordOpt = toScalaOption(record.getData.getInsertValue(logFileReaderAvroSchema, payloadProps))
+          avroRecordOpt.map {
+            avroRecord => projectAvroUnsafe(avroRecord, requiredAvroSchema, requiredSchemaFieldOrdinals, recordBuilder)
           }
-        } else {
-          false
-        }
       }
 
-      override def next(): InternalRow = {
-        recordToLoad
-      }
-
-      override def close(): Unit = {
-        if (logScanner != null) {
-          try {
-            logScanner.close()
-          } finally {
-            logScanner = null
-          }
-        }
-      }
-    }
+    protected def removeLogRecord(key: String): Option[HoodieRecord[_ <: HoodieRecordPayload[_]]] =
+      logRecords.remove(key)
 
-  private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
-                                    baseFileIterator: Iterator[InternalRow],
-                                    config: Configuration): Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val deserializer = sparkAdapter.createAvroDeserializer(requiredAvroSchema, requiredSchema.structTypeSchema)
-      private val unsafeProjection = UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
-
-      private var recordToLoad: InternalRow = _
-
-      @scala.annotation.tailrec
-      override def hasNext: Boolean = {
-        if (baseFileIterator.hasNext) {
-          val curRow = baseFileIterator.next()
-          recordToLoad = unsafeProjection(curRow)
-          true
+    override def hasNext: Boolean =
+      logRecordsIterator.hasNext && {
+        val avroRecordOpt = logRecordsIterator.next()
+        if (avroRecordOpt.isEmpty) {
+          // Record has been deleted, skipping
+          this.hasNext
         } else {
-          if (logRecordsKeyIterator.hasNext) {
-            val curAvrokey = logRecordsKeyIterator.next()
-            val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema, payloadProps)
-            if (!curAvroRecord.isPresent) {
-              // delete record found, skipping
-              this.hasNext
-            } else {
-              val requiredAvroRecord = AvroConversionUtils.buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema,
-                requiredFieldPosition, recordBuilder)
-              val rowOpt = deserializer.deserialize(requiredAvroRecord)
-              recordToLoad = unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-              true
-            }
-          } else {
-            false
-          }
+          recordToLoad = unsafeProjection(deserialize(avroRecordOpt.get))
+          true
         }
       }
 
-      override def next(): InternalRow = {
-        recordToLoad
-      }
+    override final def next(): InternalRow = recordToLoad
 
-      override def close(): Unit = {
-        if (logScanner != null) {
-          try {
-            logScanner.close()
-          } finally {
-            logScanner = null
-          }
+    override def close(): Unit =
+      if (logScanner != null) {
+        try {
+          logScanner.close()
+        } finally {
+          logScanner = null
         }
       }
+  }
+
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in
+   * Base file as well as all of the Delta Log files simply returning concatenation of these streams, while not
+   * performing any combination/merging of the records w/ the same primary keys (ie producing duplicates potentially)
+   */
+  private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
+                                  baseFileIterator: Iterator[InternalRow],
+                                  config: Configuration)
+    extends LogFileIterator(split, config) {
+
+    override def hasNext: Boolean = {
+      if (baseFileIterator.hasNext) {
+        val curRow = baseFileIterator.next()
+        recordToLoad = unsafeProjection(curRow)
+        true
+      } else {
+        super[LogFileIterator].hasNext
+      }
     }
+  }
 
-  private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
-                                         baseFileIterator: Iterator[InternalRow],
-                                         config: Configuration): Iterator[InternalRow] =
-    new Iterator[InternalRow] with Closeable with SparkAdapterSupport {
-      private val tableAvroSchema = new Schema.Parser().parse(tableSchema.avroSchemaStr)
-      private val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
-      private val requiredFieldPosition =
-        requiredSchema.structTypeSchema
-          .map(f => tableAvroSchema.getField(f.name).pos()).toList
-      private val serializer = sparkAdapter.createAvroSerializer(tableSchema.structTypeSchema, tableAvroSchema,
-        resolveAvroSchemaNullability(tableAvroSchema))
-      private val requiredDeserializer = sparkAdapter.createAvroDeserializer(requiredAvroSchema, requiredSchema.structTypeSchema)
-      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
-      private val unsafeProjection = UnsafeProjection.create(requiredSchema.structTypeSchema)
-      private var logScanner = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config)
-      private val logRecords = logScanner.getRecords
-      private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
-      private val keyToSkip = mutable.Set.empty[String]
-      private val recordKeyPosition = tableSchema.structTypeSchema.fieldIndex(recordKeyField)
-
-      private var recordToLoad: InternalRow = _
-
-      @scala.annotation.tailrec
-      override def hasNext: Boolean = {
-        if (baseFileIterator.hasNext) {
-          val curRow = baseFileIterator.next()
-          val curKey = curRow.getString(recordKeyPosition)
-          if (logRecords.containsKey(curKey)) {
-            // duplicate key found, merging
-            keyToSkip.add(curKey)
-            val mergedAvroRecord = mergeRowWithLog(curRow, curKey)
-            if (!mergedAvroRecord.isPresent) {
-              // deleted
-              this.hasNext
-            } else {
-              // load merged record as InternalRow with required schema
-              val requiredAvroRecord = AvroConversionUtils.buildAvroRecordBySchema(mergedAvroRecord.get(), requiredAvroSchema,
-                requiredFieldPosition, recordBuilder)
-              val rowOpt = requiredDeserializer.deserialize(requiredAvroRecord)
-              recordToLoad = unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-              true
-            }
-          } else {
-            // No merge needed, load current row with required schema
-            recordToLoad = unsafeProjection(createInternalRowWithSchema(curRow, requiredSchema.structTypeSchema, requiredFieldPosition))
-            true
-          }
+  /**
+   * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], provides an iterator over all of the records stored in
+   * a) Base file and all of the b) Delta Log files combining records with the same primary key from both of these
+   * streams
+   */
+  private class RecordMergingFileIterator(split: HoodieMergeOnReadFileSplit,
+                                          baseFileIterator: Iterator[InternalRow],
+                                          baseFileReaderSchema: HoodieTableSchema,
+                                          config: Configuration)
+    extends LogFileIterator(split, config) {
+
+    // NOTE: Record-merging iterator supports 2 modes of operation merging records bearing either
+    //        - Full table's schema
+    //        - Projected schema
+    //       As such, no particular schema could be assumed, and therefore we rely on the caller
+    //       to correspondingly set the scheme of the expected output of base-file reader
+    private val baseFileReaderAvroSchema = new Schema.Parser().parse(baseFileReaderSchema.avroSchemaStr)
+    private val requiredSchemaFieldOrdinals: List[Int] = collectFieldOrdinals(requiredAvroSchema, baseFileReaderAvroSchema)
+
+    private val serializer = sparkAdapter.createAvroSerializer(baseFileReaderSchema.structTypeSchema,
+      baseFileReaderAvroSchema, resolveAvroSchemaNullability(baseFileReaderAvroSchema))
+
+    private val recordKeyOrdinal = baseFileReaderSchema.structTypeSchema.fieldIndex(tableState.recordKeyField)
+
+    override def hasNext: Boolean = {
+      if (baseFileIterator.hasNext) {
+        val curRowRecord = baseFileIterator.next()
+        val curKey = curRowRecord.getString(recordKeyOrdinal)
+        val updatedRecordOpt = removeLogRecord(curKey)
+        if (updatedRecordOpt.isEmpty) {
+          // No merge needed, load current row with required projected schema
+          recordToLoad = unsafeProjection(projectRowUnsafe(curRowRecord, requiredSchema.structTypeSchema, requiredSchemaFieldOrdinals))
+          true
         } else {
-          if (logRecordsKeyIterator.hasNext) {
-            val curKey = logRecordsKeyIterator.next()
-            if (keyToSkip.contains(curKey)) {
-              this.hasNext
-            } else {
-              val insertAvroRecord = logRecords.get(curKey).getData.getInsertValue(tableAvroSchema, payloadProps)
-              if (!insertAvroRecord.isPresent) {
-                // stand alone delete record, skipping
-                this.hasNext
-              } else {
-                val requiredAvroRecord = AvroConversionUtils
-                  .buildAvroRecordBySchema(
-                    insertAvroRecord.get(),
-                    requiredAvroSchema,
-                    requiredFieldPosition,
-                    recordBuilder
-                  )
-                val rowOpt = requiredDeserializer.deserialize(requiredAvroRecord)
-                recordToLoad = unsafeProjection(rowOpt.get.asInstanceOf[InternalRow])
-                true
-              }
-            }
+          val mergedAvroRecordOpt = merge(serialize(curRowRecord), updatedRecordOpt.get)
+          if (mergedAvroRecordOpt.isEmpty) {
+            // Record has been deleted, skipping
+            this.hasNext
           } else {
-            false
+            // NOTE: In occurrence of a merge we can't know the schema of the record being returned, b/c
+            //       record from the Delta Log will bear (full) Table schema, while record from the Base file
+            //       might already be read in projected one (as an optimization).
+            //       As such we can't use more performant [[projectAvroUnsafe]], and instead have to fallback
+            //       to [[projectAvro]]
+            val projectedAvroRecord = projectAvro(mergedAvroRecordOpt.get, requiredAvroSchema, recordBuilder)
+            recordToLoad = unsafeProjection(deserialize(projectedAvroRecord))
+            true
           }
         }
+      } else {
+        super[LogFileIterator].hasNext
       }
+    }
 
-      override def next(): InternalRow = recordToLoad
-
-      override def close(): Unit = {
-        if (logScanner != null) {
-          try {
-            logScanner.close()
-          } finally {
-            logScanner = null
-          }
-        }
-      }
+    private def serialize(curRowRecord: InternalRow): GenericRecord =
+      serializer.serialize(curRowRecord).asInstanceOf[GenericRecord]
 
-      private def mergeRowWithLog(curRow: InternalRow, curKey: String) : org.apache.hudi.common.util.Option[IndexedRecord] = {
-        val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
-        val mergedRec = logRecords.get(curKey).getData
-          .combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema, payloadProps)
-        if (mergedRec.isPresent && mergedRec.get().getSchema != tableAvroSchema) {
-          org.apache.hudi.common.util.Option.of(HoodieAvroUtils.rewriteRecord(mergedRec.get().asInstanceOf[GenericRecord], tableAvroSchema).asInstanceOf[IndexedRecord])
-        } else {
-          mergedRec
-        }
-      }
+    private def merge(curAvroRecord: GenericRecord, newRecord: HoodieRecord[_ <: HoodieRecordPayload[_]]): Option[IndexedRecord] = {
+      // NOTE: We have to pass in Avro Schema used to read from Delta Log file since we invoke combining API
+      //       on the record from the Delta Log
+      toScalaOption(newRecord.getData.combineAndGetUpdateValue(curAvroRecord, logFileReaderAvroSchema, payloadProps))
     }
+  }
 }
 
 private object HoodieMergeOnReadRDD {
+
   val CONFIG_INSTANTIATION_LOCK = new Object()
 
-  def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
-    val fs = FSUtils.getFs(split.tablePath, config)
-    val logFiles = split.logFiles.get
+  def scanLog(logFiles: List[HoodieLogFile],
+              partitionPath: Path,
+              logSchema: Schema,
+              tableState: HoodieTableState,
+              maxCompactionMemoryInBytes: Long,
+              hadoopConf: Configuration): HoodieMergedLogRecordScanner = {

Review comment:
      Why still pass `maxCompactionMemoryInBytes` as an argument when it can be retrieved from `hadoopConf`? Having more than 5 args makes the API harder to use.
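
      A hedged sketch of how the extra parameter could be dropped by resolving the memory budget
      from the Hadoop configuration inside `scanLog`; the config key and default below are
      illustrative placeholders rather than real Hudi properties:

   ```scala
   import org.apache.hadoop.conf.Configuration

   // Sketch: derive the compaction-memory budget from the Hadoop configuration so it does not
   // need to be threaded through scanLog as a separate argument. Key/default are placeholders.
   object LogScannerMemory {
     private val MaxCompactionMemoryKey = "hoodie.sketch.max.compaction.memory.bytes" // placeholder key
     private val DefaultMaxCompactionMemoryBytes: Long = 1024L * 1024L * 1024L        // placeholder default

     def maxCompactionMemoryInBytes(hadoopConf: Configuration): Long =
       hadoopConf.getLong(MaxCompactionMemoryKey, DefaultMaxCompactionMemoryBytes)
   }
   ```

      `scanLog` could then call `LogScannerMemory.maxCompactionMemoryInBytes(hadoopConf)` internally
      instead of taking the value as a fifth parameter.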

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala
##########
@@ -20,64 +20,15 @@ package org.apache.hudi
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.QueryExecutionException
-import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException}
-import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
 
 case class HoodieBaseFileSplit(filePartition: FilePartition) extends HoodieFileSplit
 
-/**
- * TODO eval if we actually need it
- */
 class HoodieFileScanRDD(@transient private val sparkSession: SparkSession,
                         readFunction: PartitionedFile => Iterator[InternalRow],
                         @transient fileSplits: Seq[HoodieBaseFileSplit])
-  extends HoodieUnsafeRDD(sparkSession.sparkContext) {
-
-  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {

Review comment:
      Was this Spark code that had been copied over?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org