Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/07/21 07:07:52 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #1848: [HUDI-69] Support Spark Datasource for MOR table - RDD approach

vinothchandar commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r457864770



##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
##########
@@ -69,6 +71,17 @@ public static Schema readSchema(Configuration conf, Path filePath) {
     }
   }
 
+  /**
+   * get the max compaction memory in bytes from JobConf.
+   */
+  public static long getMaxCompactionMemoryInBytes(JobConf jobConf) {

Review comment:
       Ah, OK. It's just relocated.

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
##########
@@ -147,12 +146,4 @@ public Schema getWriterSchema() {
   public Schema getHiveSchema() {
     return hiveSchema;
   }
-
-  public long getMaxCompactionMemoryInBytes() {

Review comment:
       Why remove this?

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
##########
@@ -40,6 +40,8 @@
 import java.io.IOException;
 import java.util.Map;
 
+import static org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes;

Review comment:
       Would it be better to avoid the static import here, so the reader can tell which class the method comes from?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, HoodieRealtimeConfig}
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: HudiMergeOnReadFileSplit) extends Partition

Review comment:
       Let's name the classes `HoodieMergeOn...`, not `Hudi...`, to be consistent with the rest of the code.

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+                                    logPaths: Option[List[String]],
+                                    latestCommit: String,
+                                    tablePath: String,
+                                    maxCompactionMemoryInBytes: Long,
+                                    skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+                        val optParams: Map[String, String],
+                        val userSchema: StructType,
+                        val globPaths: Seq[Path],
+                        val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{

Review comment:
       nit: add a space after `Logging`, i.e. `Logging {`?

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/config/HadoopSerializableConfiguration.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.config;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+public class HadoopSerializableConfiguration implements Serializable {

Review comment:
       How is this different from `SerializableConfiguration`, the class we already have in `hudi-common`?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -110,6 +112,10 @@ object DataSourceReadOptions {
    */
   val INCR_PATH_GLOB_OPT_KEY = "hoodie.datasource.read.incr.path.glob"
   val DEFAULT_INCR_PATH_GLOB_OPT_VAL = ""
+
+
+  val REALTIME_SKIP_MERGE_KEY = REALTIME_SKIP_MERGE_PROP

Review comment:
       Should we just make a new datasource-level config for this and translate it? Mixing raw InputFormat-level configs in here feels a bit problematic in terms of long-term maintenance.
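
The translation idea could look roughly like the sketch below. It is plain Java so that it runs without Hudi or Spark on the classpath. Both key strings, the class `SkipMergeOptionTranslator`, and the method `toInputFormatProps` are hypothetical placeholders chosen for illustration; they are not Hudi's actual option names.

```java
import java.util.HashMap;
import java.util.Map;

final class SkipMergeOptionTranslator {
  // Hypothetical datasource-level key shown to Spark users (an assumption, not an existing Hudi option).
  public static final String DATASOURCE_SKIP_MERGE_KEY = "example.datasource.read.skip.merge";
  // Hypothetical stand-in for the raw InputFormat-level property.
  public static final String INPUT_FORMAT_SKIP_MERGE_PROP = "example.inputformat.skip.merge";
  public static final String DEFAULT_SKIP_MERGE_VAL = "false";

  // Translates user-facing datasource options into the property the lower layer reads,
  // falling back to the default when the user did not set the option.
  public static Map<String, String> toInputFormatProps(Map<String, String> datasourceOptions) {
    Map<String, String> props = new HashMap<>();
    props.put(INPUT_FORMAT_SKIP_MERGE_PROP,
        datasourceOptions.getOrDefault(DATASOURCE_SKIP_MERGE_KEY, DEFAULT_SKIP_MERGE_VAL));
    return props;
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put(DATASOURCE_SKIP_MERGE_KEY, "true");
    System.out.println(toInputFormatProps(options));
  }
}
```

With this split, the user-facing key can stay stable even if the InputFormat-level property is renamed later, since only the translation method needs to change.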

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+                                    logPaths: Option[List[String]],
+                                    latestCommit: String,
+                                    tablePath: String,
+                                    maxCompactionMemoryInBytes: Long,
+                                    skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+                        val optParams: Map[String, String],
+                        val userSchema: StructType,
+                        val globPaths: Seq[Path],
+                        val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{
+
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // use schema from latest metadata, if not present, read schema from the data file
+  private val latestSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+    AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  private val skipMerge = optParams.getOrElse(
+    DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY,
+    DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean
+  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(conf))
+  private val fileIndex = buildFileIndex()
+
+  override def schema: StructType = latestSchema
+
+  override def needConversion: Boolean = false
+
+  override def buildScan(): RDD[Row] = {
+    val parquetReaderFunction = new ParquetFileFormat().buildReaderWithPartitionValues(

Review comment:
       Compared with the last PR, the biggest change seems to be calling `ParquetFileFormat()` and wrapping it, as opposed to inheriting the code. Would life be a lot simpler if we still defined our own FileFormat that wrapped `ParquetFileFormat` together with the code you have for reading logs as `Iterator<InternalRow>`? Just a thought. https://github.com/apache/spark/blob/8c7d6f9733751503f80d5a1b2463904dfefd6843/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala#L105
   
   What you are doing is probably valid, and it is consistent with how @umehrot2 is approaching it. If the two approaches are equivalent, then I am fine with it.

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -123,4 +120,25 @@ class DefaultSource extends RelationProvider
   }
 
   override def shortName(): String = "hudi"
+
+  private def getReadOptimizedView(sqlContext: SQLContext,

Review comment:
       rename: getBaseFileOnlyView() 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, HoodieRealtimeConfig}
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],
+                         dataSchema: StructType,
+                         hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+           config: Configuration,
+           dataReadFunction: PartitionedFile => Iterator[Any],
+           dataSchema: StructType,
+           hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+    this(
+      sc,
+      sc.broadcast(new HadoopSerializableConfiguration(config))
+      .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+      dataReadFunction,
+      dataSchema,
+      hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+    mergeParquetPartition.split match {
+      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+        read(mergeParquetPartition.split.dataFile, dataReadFunction)
+      case unMergeSplit if unMergeSplit.skipMerge =>
+        unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case mergeSplit if !mergeSplit.skipMerge =>
+        mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case _ => throw new HoodieException("Unable to select an Iterator to read the Hudi MOR File Split")
+    }
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    hudiRealtimeFileSplits.zipWithIndex.map(file => HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+    broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+                   readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = {
+    val fileIterator = readFileFunction(partitionedFile)
+    val rows = fileIterator.flatMap(_ match {
+      case r: InternalRow => Seq(r)
+      case b: ColumnarBatch => b.rowIterator().asScala
+    })
+    rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                  readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var hudiLogRecordsIterator = hudiLogRecords.keySet().iterator().asScala
+
+      override def hasNext: Boolean = {
+        dataFileIterator.hasNext || hudiLogRecordsIterator.hasNext
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          dataFileIterator.next()
+        } else {
+          val curAvrokey = hudiLogRecordsIterator.next()
+          val curAvroRecord = hudiLogRecords.get(curAvrokey).getData.getInsertValue(logSchema).get()
+          converter.deserialize(curAvroRecord).asInstanceOf[InternalRow]
+        }
+      }
+    }
+
+  private def mergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var parquetFinished = false
+      private var logRecordToRead = hudiLogRecords.keySet()
+      private var hudiLogRecordsIterator: Iterator[String] = _
+
+      override def hasNext: Boolean = {
+        if (dataFileIterator.hasNext) {
+          true
+        } else {
+          if (!parquetFinished) {
+            parquetFinished = true
+            hudiLogRecordsIterator = logRecordToRead.iterator().asScala
+          }
+          hudiLogRecordsIterator.hasNext
+        }
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          val curRow = dataFileIterator.next()
+          val curKey = curRow.getString(HOODIE_RECORD_KEY_COL_POS)
+          if (hudiLogRecords.keySet().contains(curKey)) {
+            logRecordToRead.remove(curKey)

Review comment:
       Is the remove needed? This map is often spillable, so we should make sure the remove does not incur additional I/O or something similar.
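
One way to sidestep the question is to leave the records map untouched and track merged keys separately. The sketch below is a simplified stand-in, not the PR's code: rows are modelled as `String[]{key, value}`, a plain `Map` takes the place of the possibly spillable map, and the class `MergeWithoutRemoveSketch` and its `merge` method are hypothetical names. Whether `remove` on the real map actually triggers extra I/O is not something this sketch verifies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class MergeWithoutRemoveSketch {
  // Merges base-file rows with log records without mutating logRecords.
  // Each base row is {key, value}; logRecords maps a key to its newer value.
  public static List<String> merge(Iterator<String[]> baseRows, Map<String, String> logRecords) {
    List<String> out = new ArrayList<>();
    // In-memory bookkeeping of keys already merged; the records map is only read.
    Set<String> mergedKeys = new HashSet<>();
    while (baseRows.hasNext()) {
      String[] row = baseRows.next();
      String logValue = logRecords.get(row[0]);
      if (logValue != null) {
        mergedKeys.add(row[0]);
        out.add(row[0] + "=" + logValue); // the log record wins over the base row
      } else {
        out.add(row[0] + "=" + row[1]);
      }
    }
    // Emit log-only records, i.e. keys that never appeared in the base file.
    for (Map.Entry<String, String> entry : logRecords.entrySet()) {
      if (!mergedKeys.contains(entry.getKey())) {
        out.add(entry.getKey() + "=" + entry.getValue());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<String[]> base = Arrays.asList(new String[]{"k1", "a"}, new String[]{"k2", "b"});
    Map<String, String> log = new LinkedHashMap<>();
    log.put("k2", "B");
    log.put("k3", "C");
    System.out.println(merge(base.iterator(), log));
  }
}
```

The trade-off is extra memory proportional to the number of updated keys, in exchange for never writing to the records map during the read.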

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, HoodieRealtimeConfig}
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],
+                         dataSchema: StructType,
+                         hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+           config: Configuration,
+           dataReadFunction: PartitionedFile => Iterator[Any],
+           dataSchema: StructType,
+           hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+    this(
+      sc,
+      sc.broadcast(new HadoopSerializableConfiguration(config))
+      .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+      dataReadFunction,
+      dataSchema,
+      hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+    mergeParquetPartition.split match {
+      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+        read(mergeParquetPartition.split.dataFile, dataReadFunction)
+      case unMergeSplit if unMergeSplit.skipMerge =>
+        unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case mergeSplit if !mergeSplit.skipMerge =>
+        mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case _ => throw new HoodieException("Unable to select an Iterator to read the Hudi MOR File Split")
+    }
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    hudiRealtimeFileSplits.zipWithIndex.map(file => HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+    broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+                   readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = {
+    val fileIterator = readFileFunction(partitionedFile)
+    val rows = fileIterator.flatMap(_ match {
+      case r: InternalRow => Seq(r)
+      case b: ColumnarBatch => b.rowIterator().asScala
+    })
+    rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                  readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var hudiLogRecordsIterator = hudiLogRecords.keySet().iterator().asScala
+
+      override def hasNext: Boolean = {
+        dataFileIterator.hasNext || hudiLogRecordsIterator.hasNext
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          dataFileIterator.next()
+        } else {
+          val curAvrokey = hudiLogRecordsIterator.next()
+          val curAvroRecord = hudiLogRecords.get(curAvrokey).getData.getInsertValue(logSchema).get()
+          converter.deserialize(curAvroRecord).asInstanceOf[InternalRow]
+        }
+      }
+    }
+
+  private def mergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var parquetFinished = false
+      private var logRecordToRead = hudiLogRecords.keySet()
+      private var hudiLogRecordsIterator: Iterator[String] = _
+
+      override def hasNext: Boolean = {
+        if (dataFileIterator.hasNext) {
+          true
+        } else {
+          if (!parquetFinished) {

Review comment:
       Let's not leak parquet into the naming within this class. We can keep it generic: base vs. log files.

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, HoodieRealtimeConfig}
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],
+                         dataSchema: StructType,
+                         hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+           config: Configuration,
+           dataReadFunction: PartitionedFile => Iterator[Any],
+           dataSchema: StructType,
+           hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+    this(
+      sc,
+      sc.broadcast(new HadoopSerializableConfiguration(config))
+      .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+      dataReadFunction,
+      dataSchema,
+      hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+    mergeParquetPartition.split match {
+      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+        read(mergeParquetPartition.split.dataFile, dataReadFunction)
+      case unMergeSplit if unMergeSplit.skipMerge =>
+        unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case mergeSplit if !mergeSplit.skipMerge =>
+        mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case _ => throw new HoodieException("Unable to select an Iterator to read the Hudi MOR File Split")
+    }
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    hudiRealtimeFileSplits.zipWithIndex.map(file => HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+    broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+                   readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = {
+    val fileIterator = readFileFunction(partitionedFile)
+    val rows = fileIterator.flatMap(_ match {
+      case r: InternalRow => Seq(r)
+      case b: ColumnarBatch => b.rowIterator().asScala
+    })
+    rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                  readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var hudiLogRecordsIterator = hudiLogRecords.keySet().iterator().asScala
+
+      override def hasNext: Boolean = {
+        dataFileIterator.hasNext || hudiLogRecordsIterator.hasNext
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          dataFileIterator.next()
+        } else {
+          val curAvrokey = hudiLogRecordsIterator.next()
+          val curAvroRecord = hudiLogRecords.get(curAvrokey).getData.getInsertValue(logSchema).get()
+          converter.deserialize(curAvroRecord).asInstanceOf[InternalRow]
+        }
+      }
+    }
+
+  private def mergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var parquetFinished = false
+      private var logRecordToRead = hudiLogRecords.keySet()
+      private var hudiLogRecordsIterator: Iterator[String] = _
+
+      override def hasNext: Boolean = {
+        if (dataFileIterator.hasNext) {
+          true
+        } else {
+          if (!parquetFinished) {
+            parquetFinished = true
+            hudiLogRecordsIterator = logRecordToRead.iterator().asScala
+          }
+          hudiLogRecordsIterator.hasNext
+        }
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          val curRow = dataFileIterator.next()
+          val curKey = curRow.getString(HOODIE_RECORD_KEY_COL_POS)
+          if (hudiLogRecords.keySet().contains(curKey)) {
+            logRecordToRead.remove(curKey)
+            mergeRowWithLog(curRow)
+          } else {
+            curRow
+          }
+        } else {
+          val curKey = hudiLogRecordsIterator.next()

Review comment:
       same comment here about double checking if this is actually ok 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+                                    logPaths: Option[List[String]],
+                                    latestCommit: String,
+                                    tablePath: String,
+                                    maxCompactionMemoryInBytes: Long,
+                                    skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+                        val optParams: Map[String, String],
+                        val userSchema: StructType,
+                        val globPaths: Seq[Path],
+                        val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{
+
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // use schema from latest metadata, if not present, read schema from the data file
+  private val latestSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+    AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  private val skipMerge = optParams.getOrElse(
+    DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY,
+    DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean
+  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(conf))
+  private val fileIndex = buildFileIndex()
+
+  override def schema: StructType = latestSchema
+
+  override def needConversion: Boolean = false
+
+  override def buildScan(): RDD[Row] = {
+    val parquetReaderFunction = new ParquetFileFormat().buildReaderWithPartitionValues(
+      sparkSession = sqlContext.sparkSession,
+      dataSchema = latestSchema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = latestSchema,

Review comment:
       wouldn't this read all the fields out? should we not pass `userSchema` here?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+                                    logPaths: Option[List[String]],
+                                    latestCommit: String,
+                                    tablePath: String,
+                                    maxCompactionMemoryInBytes: Long,
+                                    skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+                        val optParams: Map[String, String],
+                        val userSchema: StructType,
+                        val globPaths: Seq[Path],
+                        val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{

Review comment:
       can we file a JIRA for this? and is it possible to target this for 0.6.0 as well?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, HoodieRealtimeConfig}
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],
+                         dataSchema: StructType,
+                         hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+           config: Configuration,
+           dataReadFunction: PartitionedFile => Iterator[Any],
+           dataSchema: StructType,
+           hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+    this(
+      sc,
+      sc.broadcast(new HadoopSerializableConfiguration(config))
+      .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+      dataReadFunction,
+      dataSchema,
+      hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+    mergeParquetPartition.split match {
+      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+        read(mergeParquetPartition.split.dataFile, dataReadFunction)
+      case unMergeSplit if unMergeSplit.skipMerge =>
+        unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case mergeSplit if !mergeSplit.skipMerge =>
+        mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case _ => throw new HoodieException("Unable to select an Iterator to read the Hudi MOR File Split")
+    }
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    hudiRealtimeFileSplits.zipWithIndex.map(file => HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+    broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+                   readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = {
+    val fileIterator = readFileFunction(partitionedFile)
+    val rows = fileIterator.flatMap(_ match {
+      case r: InternalRow => Seq(r)
+      case b: ColumnarBatch => b.rowIterator().asScala
+    })
+    rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                  readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var hudiLogRecordsIterator = hudiLogRecords.keySet().iterator().asScala
+
+      override def hasNext: Boolean = {
+        dataFileIterator.hasNext || hudiLogRecordsIterator.hasNext
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          dataFileIterator.next()
+        } else {
+          val curAvrokey = hudiLogRecordsIterator.next()
+          val curAvroRecord = hudiLogRecords.get(curAvrokey).getData.getInsertValue(logSchema).get()
+          converter.deserialize(curAvroRecord).asInstanceOf[InternalRow]
+        }
+      }
+    }
+
+  private def mergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var parquetFinished = false
+      private var logRecordToRead = hudiLogRecords.keySet()
+      private var hudiLogRecordsIterator: Iterator[String] = _
+
+      override def hasNext: Boolean = {
+        if (dataFileIterator.hasNext) {
+          true
+        } else {
+          if (!parquetFinished) {
+            parquetFinished = true
+            hudiLogRecordsIterator = logRecordToRead.iterator().asScala
+          }
+          hudiLogRecordsIterator.hasNext
+        }
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          val curRow = dataFileIterator.next()
+          val curKey = curRow.getString(HOODIE_RECORD_KEY_COL_POS)
+          if (hudiLogRecords.keySet().contains(curKey)) {

Review comment:
       why not `containsKey`? `keySet()` may create a copy.
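
To illustrate the point, here is a minimal sketch that uses a plain `java.util.HashMap` as a stand-in for the log record map. The object and method names are hypothetical, and the concrete map type returned by `getRecords` is not shown in this diff, so whether its `keySet()` copies or returns a view is an assumption to verify.

```scala
import java.util.{HashMap => JHashMap}

// Hypothetical stand-in for the log record map; names are for illustration only.
object KeyLookupSketch {
  def sampleLog(): JHashMap[String, String] = {
    val m = new JHashMap[String, String]()
    m.put("key1", "updated-row-1")
    m.put("key2", "inserted-row-2")
    m
  }

  // Direct membership test on the map; no intermediate Set is requested.
  def hasLogRecord(log: java.util.Map[String, String], key: String): Boolean =
    log.containsKey(key)

  // For java.util.HashMap, keySet() is a live view: removing a key from the
  // view also removes the entry from the map. Returns the map size afterwards.
  def sizeAfterKeySetRemove(log: java.util.Map[String, String], key: String): Int = {
    log.keySet().remove(key)
    log.size()
  }

  def main(args: Array[String]): Unit = {
    val log = sampleLog()
    println(hasLogRecord(log, "key1"))
    println(sizeAfterKeySetRemove(log, "key1"))
    println(hasLogRecord(log, "key1"))
  }
}
```

If the map behaves like `HashMap`, a `remove` on the key set also drops the entry from the map itself, which matters when the record is looked up again for merging. If the map instead builds a new set on each `keySet()` call, every lookup pays for that allocation. `containsKey` sidesteps both cases.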

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, HoodieRealtimeConfig}
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],
+                         dataSchema: StructType,
+                         hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+           config: Configuration,
+           dataReadFunction: PartitionedFile => Iterator[Any],
+           dataSchema: StructType,
+           hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+    this(
+      sc,
+      sc.broadcast(new HadoopSerializableConfiguration(config))
+      .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+      dataReadFunction,
+      dataSchema,
+      hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+    mergeParquetPartition.split match {
+      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+        read(mergeParquetPartition.split.dataFile, dataReadFunction)
+      case unMergeSplit if unMergeSplit.skipMerge =>
+        unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case mergeSplit if !mergeSplit.skipMerge =>
+        mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case _ => throw new HoodieException("Unable to select an Iterator to read the Hudi MOR File Split")
+    }
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    hudiRealtimeFileSplits.zipWithIndex.map(file => HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+    broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+                   readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = {
+    val fileIterator = readFileFunction(partitionedFile)
+    val rows = fileIterator.flatMap(_ match {
+      case r: InternalRow => Seq(r)
+      case b: ColumnarBatch => b.rowIterator().asScala
+    })
+    rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                  readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var hudiLogRecordsIterator = hudiLogRecords.keySet().iterator().asScala
+
+      override def hasNext: Boolean = {
+        dataFileIterator.hasNext || hudiLogRecordsIterator.hasNext
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          dataFileIterator.next()
+        } else {
+          val curAvrokey = hudiLogRecordsIterator.next()
+          val curAvroRecord = hudiLogRecords.get(curAvrokey).getData.getInsertValue(logSchema).get()
+          converter.deserialize(curAvroRecord).asInstanceOf[InternalRow]
+        }
+      }
+    }
+
+  private def mergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var parquetFinished = false
+      private var logRecordToRead = hudiLogRecords.keySet()
+      private var hudiLogRecordsIterator: Iterator[String] = _
+
+      override def hasNext: Boolean = {
+        if (dataFileIterator.hasNext) {
+          true
+        } else {
+          if (!parquetFinished) {
+            parquetFinished = true
+            hudiLogRecordsIterator = logRecordToRead.iterator().asScala
+          }
+          hudiLogRecordsIterator.hasNext

Review comment:
       this seems to indicate that we will keep scanning the remaining entries in the log and hand them out if `dataFileIterator` runs out. We need to be careful about how it interplays with split generation. Specifically, it only works if each base file has only 1 split.
   ```
   HudiMergeOnReadFileSplit(partitionedFile, logPaths, latestCommit,
            metaClient.getBasePath, maxCompactionMemoryInBytes, skipMerge)
   ```
   
   Here `partitionedFile` has to be a single whole file and not an input split. Otherwise a log entry may belong to a different split, and the final query result will have duplicates.
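
The concern can be reproduced with a small plain-Scala simulation. This is only a sketch: `mergeSlice` and the sample data are hypothetical, no Hudi or Spark classes are involved, and it assumes every split of a base file would be paired with the full set of log records for that file.

```scala
// Plain-Scala simulation; no Hudi or Spark classes are used.
object SplitDuplicatesSketch {
  type Row = (String, String) // (record key, value)

  // Mirrors the iterator's behaviour for one slice of base-file rows:
  // rows whose key is in the log take the log value, then every log entry
  // not matched by this slice is appended.
  def mergeSlice(baseRows: Seq[Row], log: Map[String, String]): Seq[Row] = {
    val merged = baseRows.map { case (key, value) => key -> log.getOrElse(key, value) }
    val matched = baseRows.map(_._1).toSet
    val unmatched = log.toSeq.filterNot { case (key, _) => matched.contains(key) }
    merged ++ unmatched
  }

  val baseFile: Seq[Row] = Seq("a" -> "a0", "b" -> "b0", "c" -> "c0", "d" -> "d0")
  val log: Map[String, String] = Map("b" -> "b1", "e" -> "e1")

  // One split covering the whole base file.
  def readWholeFile(): Seq[Row] = mergeSlice(baseFile, log)

  // The same base file cut into two splits that each see the full log.
  def readTwoSplits(): Seq[Row] = {
    val (first, second) = baseFile.splitAt(2)
    mergeSlice(first, log) ++ mergeSlice(second, log)
  }

  def main(args: Array[String]): Unit = {
    println(readWholeFile().map(_._1).sorted)
    println(readTwoSplits().map(_._1).sorted)
  }
}
```

With one split per base file, each key comes out once. With two splits, the update for `b` is emitted again by the split that never saw `b` in its slice, and the new key `e` is emitted by both splits.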
   

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, HoodieRealtimeConfig}
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],

Review comment:
       rename to `baseFileReadFunction`

##########
File path: hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
##########
@@ -67,7 +68,7 @@ class TestDataSource {
     // Insert Operation
     val records = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("000", 100)).toList
     val inputDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2))
-    inputDF.write.format("hudi")
+    inputDF.write.format("org.apache.hudi")

Review comment:
       why change this? 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+                                    logPaths: Option[List[String]],
+                                    latestCommit: String,
+                                    tablePath: String,
+                                    maxCompactionMemoryInBytes: Long,
+                                    skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+                        val optParams: Map[String, String],
+                        val userSchema: StructType,
+                        val globPaths: Seq[Path],
+                        val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{
+
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // use schema from latest metadata, if not present, read schema from the data file
+  private val latestSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+    AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  private val skipMerge = optParams.getOrElse(
+    DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY,
+    DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean
+  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(conf))
+  private val fileIndex = buildFileIndex()
+
+  override def schema: StructType = latestSchema
+
+  override def needConversion: Boolean = false
+
+  override def buildScan(): RDD[Row] = {
+    val parquetReaderFunction = new ParquetFileFormat().buildReaderWithPartitionValues(
+      sparkSession = sqlContext.sparkSession,
+      dataSchema = latestSchema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = latestSchema,
+      filters = Seq.empty,

Review comment:
       if we did `PrunedFilteredScan` we can also pass in the filters? in any case, can we pass in the options?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, HoodieRealtimeConfig}
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],
+                         dataSchema: StructType,
+                         hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit])

Review comment:
       hudi vs hoodie 

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+                                    logPaths: Option[List[String]],
+                                    latestCommit: String,
+                                    tablePath: String,
+                                    maxCompactionMemoryInBytes: Long,
+                                    skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+                        val optParams: Map[String, String],
+                        val userSchema: StructType,
+                        val globPaths: Seq[Path],
+                        val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with TableScan with Logging{
+
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+  // use schema from latest metadata, if not present, read schema from the data file
+  private val latestSchema = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+    AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+  }
+
+  private val skipMerge = optParams.getOrElse(
+    DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY,
+    DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean
+  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new JobConf(conf))
+  private val fileIndex = buildFileIndex()
+
+  override def schema: StructType = latestSchema
+
+  override def needConversion: Boolean = false
+
+  override def buildScan(): RDD[Row] = {
+    val parquetReaderFunction = new ParquetFileFormat().buildReaderWithPartitionValues(
+      sparkSession = sqlContext.sparkSession,
+      dataSchema = latestSchema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = latestSchema,
+      filters = Seq.empty,
+      options = Map.empty,
+      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+    )
+    val rdd = new HudiMergeOnReadRDD(sqlContext.sparkContext,
+      sqlContext.sparkSession.sessionState.newHadoopConf(),
+      parquetReaderFunction, latestSchema, fileIndex)
+    rdd.asInstanceOf[RDD[Row]]
+  }
+
+  def buildFileIndex(): List[HudiMergeOnReadFileSplit] = {
+    val inMemoryFileIndex = HudiSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths)
+    val fileStatuses = inMemoryFileIndex.allFiles()
+    if (fileStatuses.isEmpty) {
+      throw new HoodieException("No files found for reading in user provided path.")
+    }
+
+    val fsView = new HoodieTableFileSystemView(metaClient,
+      metaClient.getActiveTimeline.getCommitsTimeline
+        .filterCompletedInstants, fileStatuses.toArray)
+    val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList
+    val latestCommit = fsView.getLastInstant.get().getTimestamp
+    val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala
+    val FileSplits = fileGroup.map(kv => {

Review comment:
       Rename this variable to camel case (`fileSplits`)?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, HoodieRealtimeConfig}
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],
+                         dataSchema: StructType,
+                         hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+           config: Configuration,
+           dataReadFunction: PartitionedFile => Iterator[Any],
+           dataSchema: StructType,
+           hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+    this(
+      sc,
+      sc.broadcast(new HadoopSerializableConfiguration(config))
+      .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+      dataReadFunction,
+      dataSchema,
+      hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+    mergeParquetPartition.split match {
+      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+        read(mergeParquetPartition.split.dataFile, dataReadFunction)
+      case unMergeSplit if unMergeSplit.skipMerge =>
+        unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case mergeSplit if !mergeSplit.skipMerge =>
+        mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case _ => throw new HoodieException("Unable to select an Iterator to read the Hudi MOR File Split")

Review comment:
       Include the `mergeParquetPartition.split` value in the error message?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HudiMergeOnReadRDD.scala
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner, LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.{HadoopSerializableConfiguration, HoodieRealtimeConfig}
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HudiMergeOnReadPartition(index: Int, split: HudiMergeOnReadFileSplit) extends Partition
+
+class HudiMergeOnReadRDD(sc: SparkContext,
+                         broadcastedConf: Broadcast[HadoopSerializableConfiguration],
+                         dataReadFunction: PartitionedFile => Iterator[Any],
+                         dataSchema: StructType,
+                         hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit])
+  extends RDD[InternalRow](sc, Nil) {
+
+  // Broadcast the hadoop Configuration to executors.
+  def this(sc: SparkContext,
+           config: Configuration,
+           dataReadFunction: PartitionedFile => Iterator[Any],
+           dataSchema: StructType,
+           hudiRealtimeFileSplits: List[HudiMergeOnReadFileSplit]) = {
+    this(
+      sc,
+      sc.broadcast(new HadoopSerializableConfiguration(config))
+      .asInstanceOf[Broadcast[HadoopSerializableConfiguration]],
+      dataReadFunction,
+      dataSchema,
+      hudiRealtimeFileSplits)
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    val mergeParquetPartition = split.asInstanceOf[HudiMergeOnReadPartition]
+    mergeParquetPartition.split match {
+      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+        read(mergeParquetPartition.split.dataFile, dataReadFunction)
+      case unMergeSplit if unMergeSplit.skipMerge =>
+        unMergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case mergeSplit if !mergeSplit.skipMerge =>
+        mergeFileIterator(mergeParquetPartition.split, dataReadFunction)
+      case _ => throw new HoodieException("Unable to select an Iterator to read the Hudi MOR File Split")
+    }
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    hudiRealtimeFileSplits.zipWithIndex.map(file => HudiMergeOnReadPartition(file._2, file._1)).toArray
+  }
+
+  private def getConfig(): Configuration = {
+    broadcastedConf.value.config
+  }
+
+  private def read(partitionedFile: PartitionedFile,
+                   readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = {
+    val fileIterator = readFileFunction(partitionedFile)
+    val rows = fileIterator.flatMap(_ match {
+      case r: InternalRow => Seq(r)
+      case b: ColumnarBatch => b.rowIterator().asScala
+    })
+    rows
+  }
+
+  private def unMergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                  readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var hudiLogRecordsIterator = hudiLogRecords.keySet().iterator().asScala
+
+      override def hasNext: Boolean = {
+        dataFileIterator.hasNext || hudiLogRecordsIterator.hasNext
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          dataFileIterator.next()
+        } else {
+          val curAvrokey = hudiLogRecordsIterator.next()
+          val curAvroRecord = hudiLogRecords.get(curAvrokey).getData.getInsertValue(logSchema).get()
+          converter.deserialize(curAvroRecord).asInstanceOf[InternalRow]
+        }
+      }
+    }
+
+  private def mergeFileIterator(split: HudiMergeOnReadFileSplit,
+                                readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val dataFileIterator = read(split.dataFile, readFileFunction)
+      private val logSchema = getLogAvroSchema(split)
+      private val sparkTypes = SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+      private val converter = new AvroDeserializer(logSchema, sparkTypes)
+      private val hudiLogRecords = scanLog(split, logSchema).getRecords
+      private var parquetFinished = false
+      private var logRecordToRead = hudiLogRecords.keySet()
+      private var hudiLogRecordsIterator: Iterator[String] = _
+
+      override def hasNext: Boolean = {
+        if (dataFileIterator.hasNext) {
+          true
+        } else {
+          if (!parquetFinished) {
+            parquetFinished = true
+            hudiLogRecordsIterator = logRecordToRead.iterator().asScala
+          }
+          hudiLogRecordsIterator.hasNext
+        }
+      }
+
+      override def next(): InternalRow = {
+        if (dataFileIterator.hasNext) {
+          val curRow = dataFileIterator.next()
+          val curKey = curRow.getString(HOODIE_RECORD_KEY_COL_POS)
+          if (hudiLogRecords.keySet().contains(curKey)) {
+            logRecordToRead.remove(curKey)
+            mergeRowWithLog(curRow)
+          } else {
+            curRow
+          }
+        } else {
+          val curKey = hudiLogRecordsIterator.next()
+          getAvroRecord(curKey)
+        }
+      }
+
+      private def getAvroRecord(curKey: String): InternalRow = {
+        val curAvroRecord = hudiLogRecords.get(curKey).getData.getInsertValue(logSchema).get()

Review comment:
       This implicitly assumes `OverwriteWithLatestAvroPayload`? Can we convert the parquet row to Avro as well and then perform the merge by calling the right API, `HoodieRecordPayload#combineAndGetUpdateValue()`? This is a correctness issue we need to resolve in this PR.
       Ideally, add a test case to go along with this as well.
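
       To illustrate the concern, here is a rough, self-contained sketch in plain Java. The `Payload` interface, `OverwritePayload`, `LatestTsPayload`, the `ts`/`value` fields and the `Map`-based rows are hypothetical stand-ins, not Hudi classes; the real `HoodieRecordPayload` methods work on Avro `IndexedRecord`s and take a `Schema`. It only shows why taking `getInsertValue()` from the log record can differ from letting the payload combine with the base row.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class PayloadMergeSketch {

  // Hypothetical stand-in for HoodieRecordPayload (the real one uses Avro records and a Schema).
  interface Payload {
    Optional<Map<String, Object>> getInsertValue();
    Optional<Map<String, Object>> combineAndGetUpdateValue(Map<String, Object> currentValue);
  }

  // Overwrite-style payload: the log record always replaces the base row.
  static class OverwritePayload implements Payload {
    private final Map<String, Object> record;
    OverwritePayload(Map<String, Object> record) { this.record = record; }
    @Override public Optional<Map<String, Object>> getInsertValue() { return Optional.of(record); }
    @Override public Optional<Map<String, Object>> combineAndGetUpdateValue(Map<String, Object> currentValue) {
      return getInsertValue();
    }
  }

  // Custom payload: keeps whichever side carries the larger (assumed) "ts" field.
  static class LatestTsPayload implements Payload {
    private final Map<String, Object> record;
    LatestTsPayload(Map<String, Object> record) { this.record = record; }
    @Override public Optional<Map<String, Object>> getInsertValue() { return Optional.of(record); }
    @Override public Optional<Map<String, Object>> combineAndGetUpdateValue(Map<String, Object> currentValue) {
      long currentTs = ((Number) currentValue.get("ts")).longValue();
      long incomingTs = ((Number) record.get("ts")).longValue();
      return Optional.of(incomingTs >= currentTs ? record : currentValue);
    }
  }

  // Mirrors the code under review: the base row is ignored, the log record's insert value is returned.
  static Optional<Map<String, Object>> mergeIgnoringPayload(Map<String, Object> baseRow, Payload logPayload) {
    return logPayload.getInsertValue();
  }

  // What the comment asks for: pass the base row to the payload and let it decide.
  static Optional<Map<String, Object>> mergeViaPayload(Map<String, Object> baseRow, Payload logPayload) {
    return logPayload.combineAndGetUpdateValue(baseRow);
  }

  // Small helper to build a row with a key, an ordering field and a value.
  static Map<String, Object> row(String key, long ts, String value) {
    Map<String, Object> r = new HashMap<>();
    r.put("key", key);
    r.put("ts", ts);
    r.put("value", value);
    return r;
  }

  public static void main(String[] args) {
    Map<String, Object> base = row("k1", 10L, "base");
    Payload staleLog = new LatestTsPayload(row("k1", 5L, "stale-log"));
    System.out.println(mergeIgnoringPayload(base, staleLog).get().get("value")); // stale-log
    System.out.println(mergeViaPayload(base, staleLog).get().get("value"));      // base
  }
}
```

       With an overwrite-style payload both paths return the log record, which is why the current code is only correct under that assumption.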




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org