Posted to commits@hudi.apache.org by vi...@apache.org on 2021/02/17 11:36:38 UTC

[hudi] branch master updated: [HUDI-1109] Support Spark Structured Streaming read from Hudi table (#2485)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3797207  [HUDI-1109] Support Spark Structured Streaming read from Hudi table (#2485)
3797207 is described below

commit 37972071ff4d1912ea58834061abf95fc499739d
Author: pengzhiwei <pe...@icloud.com>
AuthorDate: Wed Feb 17 19:36:29 2021 +0800

    [HUDI-1109] Support Spark Structured Streaming read from Hudi table (#2485)
---
 ...parkRowDeserializer.java => SparkRowSerDe.java} |   4 +-
 hudi-spark-datasource/hudi-spark/pom.xml           |  19 ++
 .../main/scala/org/apache/hudi/DefaultSource.scala |  37 +++-
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |  12 +-
 .../sql/hudi/streaming/HoodieSourceOffset.scala    |  69 ++++++++
 .../sql/hudi/streaming/HoodieStreamSource.scala    | 197 +++++++++++++++++++++
 .../hudi/functional/TestStreamingSource.scala      | 154 ++++++++++++++++
 ...2RowDeserializer.scala => Spark2RowSerDe.scala} |   9 +-
 ...3RowDeserializer.scala => Spark3RowSerDe.scala} |  10 +-
 pom.xml                                            |  21 +++
 10 files changed, 517 insertions(+), 15 deletions(-)
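
For context, a minimal usage sketch of the feature this commit adds (the table path and checkpoint location are placeholders, and an existing Hudi table is assumed): read the table as a Structured Streaming source and print each new commit's records to the console.

    import org.apache.spark.sql.SparkSession

    object HudiStreamReadSketch {
      def main(args: Array[String]): Unit = {
        // An active SparkSession; Kryo serialization matches what the tests in this commit configure.
        val spark = SparkSession.builder()
          .appName("hudi-stream-read")
          .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .getOrCreate()

        // Read the Hudi table as a streaming source; each completed commit becomes a micro-batch.
        val df = spark.readStream
          .format("org.apache.hudi")          // resolved to DefaultSource via DataSourceRegister
          .load("/tmp/hudi/source_table")     // placeholder base path of an existing Hudi table

        // Echo new records to the console; the checkpoint stores the HoodieSourceOffset JSON.
        df.writeStream
          .format("console")
          .option("checkpointLocation", "/tmp/hudi/checkpoints/console")  // placeholder
          .start()
          .awaitTermination()
      }
    }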

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowDeserializer.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowSerDe.java
similarity index 91%
rename from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowDeserializer.java
rename to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowSerDe.java
index 66b8b78..dce2d2f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowDeserializer.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkRowSerDe.java
@@ -23,6 +23,8 @@ import org.apache.spark.sql.catalyst.InternalRow;
 
 import java.io.Serializable;
 
-public interface SparkRowDeserializer extends Serializable {
+public interface SparkRowSerDe extends Serializable {
   Row deserializeRow(InternalRow internalRow);
+
+  InternalRow serializeRow(Row row);
 }
diff --git a/hudi-spark-datasource/hudi-spark/pom.xml b/hudi-spark-datasource/hudi-spark/pom.xml
index 4f56c7e9..d2c9485 100644
--- a/hudi-spark-datasource/hudi-spark/pom.xml
+++ b/hudi-spark-datasource/hudi-spark/pom.xml
@@ -266,6 +266,25 @@
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
     <!-- Spark (Packages) -->
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 27f922e..ef9bf8c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -22,12 +22,13 @@ import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY}
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.log4j.LogManager
 import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.execution.streaming.{Sink, Source}
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
@@ -44,6 +45,7 @@ class DefaultSource extends RelationProvider
   with CreatableRelationProvider
   with DataSourceRegister
   with StreamSinkProvider
+  with StreamSourceProvider
   with Serializable {
 
   SparkSession.getActiveSession.foreach { spark =>
@@ -191,4 +193,35 @@ class DefaultSource extends RelationProvider
         .resolveRelation()
     }
   }
+
+  override def sourceSchema(sqlContext: SQLContext,
+                            schema: Option[StructType],
+                            providerName: String,
+                            parameters: Map[String, String]): (String, StructType) = {
+    val path = parameters.get("path")
+    if (path.isEmpty || path.get == null) {
+      throw new HoodieException("'path' must be specified.")
+    }
+    val metaClient = new HoodieTableMetaClient(
+      sqlContext.sparkSession.sessionState.newHadoopConf(), path.get)
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val sqlSchema =
+      try {
+        val avroSchema = schemaResolver.getTableAvroSchema
+        AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
+      } catch {
+        case _: Exception =>
+          require(schema.isDefined, "Failed to resolve source schema")
+          schema.get
+      }
+    (shortName(), sqlSchema)
+  }
+
+  override def createSource(sqlContext: SQLContext,
+                            metadataPath: String,
+                            schema: Option[StructType],
+                            providerName: String,
+                            parameters: Map[String, String]): Source = {
+    new HoodieStreamSource(sqlContext, metadataPath, schema, parameters)
+  }
 }
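
The sourceSchema override above first tries to resolve the table's Avro schema and only falls back to a caller-supplied schema when resolution fails (e.g. no completed commit yet). A small sketch of that fallback path, assuming an active SparkSession named spark; paths and field names are placeholders.

    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Explicit schema for the stream; used only if TableSchemaResolver cannot
    // resolve the table schema (this is the Option[StructType] passed to sourceSchema).
    val userSchema = StructType(Seq(
      StructField("id", StringType),
      StructField("name", StringType),
      StructField("price", StringType),
      StructField("ts", StringType)))

    val df = spark.readStream
      .schema(userSchema)                // surfaces as the `schema` argument of sourceSchema
      .format("org.apache.hudi")
      .load("/tmp/hudi/source_table")    // placeholder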
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 02880f2..bd55930 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -21,7 +21,7 @@ package org.apache.hudi
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hudi.client.utils.SparkRowDeserializer
+import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.rdd.RDD
@@ -99,7 +99,7 @@ object HoodieSparkUtils {
     // Use the Avro schema to derive the StructType which has the correct nullability information
     val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
     val encoder = RowEncoder.apply(dataType).resolveAndBind()
-    val deserializer = HoodieSparkUtils.createDeserializer(encoder)
+    val deserializer = HoodieSparkUtils.createRowSerDe(encoder)
     df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row))
       .mapPartitions { records =>
         if (records.isEmpty) Iterator.empty
@@ -110,12 +110,12 @@ object HoodieSparkUtils {
       }
   }
 
-  def createDeserializer(encoder: ExpressionEncoder[Row]): SparkRowDeserializer = {
-    // TODO remove Spark2RowDeserializer if Spark 2.x support is dropped
+  def createRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe = {
+    // TODO remove Spark2RowSerDe if Spark 2.x support is dropped
     if (SPARK_VERSION.startsWith("2.")) {
-      new Spark2RowDeserializer(encoder)
+      new Spark2RowSerDe(encoder)
     } else {
-      new Spark3RowDeserializer(encoder)
+      new Spark3RowSerDe(encoder)
     }
   }
 }
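
createRowSerDe picks the Spark 2 or Spark 3 implementation based on SPARK_VERSION, and the renamed SparkRowSerDe now converts in both directions. A small round-trip sketch; the schema and values are placeholders.

    import org.apache.hudi.HoodieSparkUtils
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val schema = StructType(Seq(StructField("id", StringType), StructField("name", StringType)))
    val serDe = HoodieSparkUtils.createRowSerDe(RowEncoder(schema).resolveAndBind())

    val internalRow = serDe.serializeRow(Row("1", "a1"))  // Row -> InternalRow
    val row = serDe.deserializeRow(internalRow)           // InternalRow -> Row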
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
new file mode 100644
index 0000000..03e651e
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieSourceOffset.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+
+case class HoodieSourceOffset(commitTime: String) extends Offset {
+
+  override def json(): String = {
+    HoodieSourceOffset.toJson(this)
+  }
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case HoodieSourceOffset(otherCommitTime) =>
+        otherCommitTime == commitTime
+      case _ => false
+    }
+  }
+
+  override def hashCode(): Int = {
+    commitTime.hashCode
+  }
+}
+
+
+object HoodieSourceOffset {
+  val mapper = new ObjectMapper with ScalaObjectMapper
+  mapper.setSerializationInclusion(Include.NON_ABSENT)
+  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+  mapper.registerModule(DefaultScalaModule)
+
+  def toJson(offset: HoodieSourceOffset): String = {
+    mapper.writeValueAsString(offset)
+  }
+
+  def fromJson(json: String): HoodieSourceOffset = {
+    mapper.readValue[HoodieSourceOffset](json)
+  }
+
+  def apply(offset: Offset): HoodieSourceOffset = {
+    offset match {
+      case SerializedOffset(json) => fromJson(json)
+      case o: HoodieSourceOffset => o
+    }
+  }
+
+  val INIT_OFFSET = HoodieSourceOffset(HoodieTimeline.INIT_INSTANT_TS)
+}
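
Spark checkpoints offsets as JSON and hands them back as SerializedOffset instances; HoodieSourceOffset.apply above covers both cases. A round-trip sketch, with a placeholder commit time.

    import org.apache.spark.sql.execution.streaming.SerializedOffset
    import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset

    val offset = HoodieSourceOffset("20210217193629")          // placeholder commit instant
    val json = offset.json()                                   // e.g. {"commitTime":"20210217193629"}
    val restored = HoodieSourceOffset(SerializedOffset(json))  // equal to the original offset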
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
new file mode 100644
index 0000000..c17598a
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.streaming
+
+import java.io.{BufferedWriter, InputStream, OutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.util.Date
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils, IncrementalRelation, MergeOnReadIncrementalRelation}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.{FileIOUtils, TablePathUtils}
+import org.apache.spark.sql.hudi.streaming.HoodieStreamSource.VERSION
+import org.apache.spark.sql.hudi.streaming.HoodieSourceOffset.INIT_OFFSET
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, Offset, Source}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+/**
+  * The structured streaming source for Hudi, which lets a streaming job consume data from a Hudi table.
+  * @param sqlContext
+  * @param metadataPath
+  * @param schemaOption
+  * @param parameters
+  */
+class HoodieStreamSource(
+    sqlContext: SQLContext,
+    metadataPath: String,
+    schemaOption: Option[StructType],
+    parameters: Map[String, String])
+  extends Source with Logging with Serializable {
+
+  @transient private val hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+  private lazy val tablePath: Path = {
+    val path = new Path(parameters.getOrElse("path", throw new IllegalArgumentException("Missing 'path' option")))
+    val fs = path.getFileSystem(hadoopConf)
+    TablePathUtils.getTablePath(fs, path).get()
+  }
+  private lazy val metaClient = new HoodieTableMetaClient(hadoopConf, tablePath.toString)
+  private lazy val tableType = metaClient.getTableType
+
+  @transient private var lastOffset: HoodieSourceOffset = _
+  @transient private lazy val initialOffsets = {
+    val metadataLog =
+      new HDFSMetadataLog[HoodieSourceOffset](sqlContext.sparkSession, metadataPath) {
+        override def serialize(metadata: HoodieSourceOffset, out: OutputStream): Unit = {
+          val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
+          writer.write("v" + VERSION + "\n")
+          writer.write(metadata.json)
+          writer.flush()
+        }
+
+        /**
+          * Deserialize the init offset from the metadata file.
+          * The format in the metadata file is like this:
+          * ----------------------------------------------
+          * v1         -- The version info in the first line
+          * offsetJson -- The json string of HoodieSourceOffset in the rest of the file
+          * -----------------------------------------------
+          * @param in
+          * @return
+          */
+        override def deserialize(in: InputStream): HoodieSourceOffset = {
+          val content = FileIOUtils.readAsUTFString(in)
+          // Get version from the first line
+          val firstLineEnd = content.indexOf("\n")
+          if (firstLineEnd > 0) {
+            val version = getVersion(content.substring(0, firstLineEnd))
+            if (version > VERSION) {
+              throw new IllegalStateException(s"UnSupportVersion: max support version is: $VERSION" +
+                s" current version is: $version")
+            }
+            // Get the offset from the remainder of the file
+            HoodieSourceOffset.fromJson(content.substring(firstLineEnd + 1))
+          } else {
+            throw new IllegalStateException(s"Bad metadata format, failed to find the version line.")
+          }
+        }
+      }
+    metadataLog.get(0).getOrElse {
+      metadataLog.add(0, INIT_OFFSET)
+      INIT_OFFSET
+    }
+  }
+
+  private def getVersion(versionLine: String): Int = {
+    if (versionLine.startsWith("v")) {
+      versionLine.substring(1).toInt
+    } else {
+      throw new IllegalStateException(s"Illegal version line: $versionLine " +
+        s"in the streaming metadata path")
+    }
+  }
+
+  override def schema: StructType = {
+    schemaOption.getOrElse {
+      val schemaUtil = new TableSchemaResolver(metaClient)
+      SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+        .dataType.asInstanceOf[StructType]
+    }
+  }
+
+  /**
+    * Get the latest offset from the hoodie table.
+    * @return
+    */
+  override def getOffset: Option[Offset] = {
+    metaClient.reloadActiveTimeline()
+    val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
+    if (!activeInstants.empty()) {
+      val currentLatestCommitTime = activeInstants.lastInstant().get().getTimestamp
+      if (lastOffset == null || currentLatestCommitTime > lastOffset.commitTime) {
+        lastOffset = HoodieSourceOffset(currentLatestCommitTime)
+      }
+    } else { // if there are no active commits, use the init offset
+      lastOffset = initialOffsets
+    }
+    Some(lastOffset)
+  }
+
+  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    initialOffsets // touch the lazy val so the initial offset is persisted to the metadata log first
+
+    val startOffset = start.map(HoodieSourceOffset(_))
+      .getOrElse(initialOffsets)
+    val endOffset = HoodieSourceOffset(end)
+
+    if (startOffset == endOffset) {
+      sqlContext.internalCreateDataFrame(
+        sqlContext.sparkContext.emptyRDD[InternalRow].setName("empty"), schema, isStreaming = true)
+    } else {
+      // Consume the data between (startCommitTime, endCommitTime]
+      val incParams = parameters ++ Map(
+        DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY -> startCommitTime(startOffset),
+        DataSourceReadOptions.END_INSTANTTIME_OPT_KEY -> endOffset.commitTime
+      )
+
+      val rdd = tableType match {
+        case HoodieTableType.COPY_ON_WRITE =>
+          val serDe = HoodieSparkUtils.createRowSerDe(RowEncoder(schema))
+          new IncrementalRelation(sqlContext, incParams, schema, metaClient)
+            .buildScan()
+            .map(serDe.serializeRow)
+        case HoodieTableType.MERGE_ON_READ =>
+          val requiredColumns = schema.fields.map(_.name)
+          new MergeOnReadIncrementalRelation(sqlContext, incParams, schema, metaClient)
+            .buildScan(requiredColumns, Array.empty[Filter])
+            .asInstanceOf[RDD[InternalRow]]
+        case _ => throw new IllegalArgumentException(s"UnSupport tableType: $tableType")
+      }
+      sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
+    }
+  }
+
+  private def startCommitTime(startOffset: HoodieSourceOffset): String = {
+    startOffset match {
+      case INIT_OFFSET => startOffset.commitTime
+      case HoodieSourceOffset(commitTime) =>
+        val time = HoodieActiveTimeline.COMMIT_FORMATTER.parse(commitTime).getTime
+        // As we consume the data in the range (start, end], start itself is not included,
+        // so we shift the start commit time forward by one second here.
+        HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(time + 1000))
+      case _ => throw new IllegalStateException("Unknown offset type.")
+    }
+  }
+
+  override def stop(): Unit = {
+
+  }
+}
+
+object HoodieStreamSource {
+  val VERSION = 1
+}
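
Two conventions in the source above are worth spelling out: the offset metadata file is a version line followed by the offset JSON, and getBatch reads the half-open range (start, end], shifting the start instant forward by one second before handing it to the incremental relation. A small sketch of that instant arithmetic, with a placeholder commit time.

    import java.util.Date
    import org.apache.hudi.common.table.timeline.HoodieActiveTimeline

    // Offset metadata file layout written by the HDFSMetadataLog above:
    //   v1
    //   {"commitTime":"20210217193629"}

    // Exclusive-start arithmetic used by startCommitTime: one second past the last
    // consumed instant, in Hudi's yyyyMMddHHmmss commit-time format.
    val start = "20210217193629"                               // placeholder commit time
    val exclusiveStart = HoodieActiveTimeline.COMMIT_FORMATTER.format(
      new Date(HoodieActiveTimeline.COMMIT_FORMATTER.parse(start).getTime + 1000))
    // exclusiveStart == "20210217193630"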
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
new file mode 100644
index 0000000..a98152a
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
+import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM, INSERT_PARALLELISM, TABLE_NAME, UPSERT_PARALLELISM}
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.{Row, SaveMode}
+
+class TestStreamingSource extends StreamTest {
+
+  import testImplicits._
+  private val commonOptions = Map(
+    RECORDKEY_FIELD_OPT_KEY -> "id",
+    PRECOMBINE_FIELD_OPT_KEY -> "ts",
+    INSERT_PARALLELISM -> "4",
+    UPSERT_PARALLELISM -> "4",
+    DELETE_PARALLELISM -> "4"
+  )
+  private val columns = Seq("id", "name", "price", "ts")
+
+  override protected def sparkConf = {
+    super.sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+  }
+
+  test("test cow stream source") {
+    withTempDir { inputDir =>
+      val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream"
+      HoodieTableMetaClient.initTableType(spark.sessionState.newHadoopConf(), tablePath,
+        COPY_ON_WRITE, getTableName(tablePath), DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL)
+
+      addData(tablePath, Seq(("1", "a1", "10", "000")))
+      val df = spark.readStream
+        .format("org.apache.hudi")
+        .load(tablePath)
+        .select("id", "name", "price", "ts")
+
+      testStream(df)(
+        AssertOnQuery {q => q.processAllAvailable(); true },
+        CheckAnswerRows(Seq(Row("1", "a1", "10", "000")), lastOnly = true, isSorted = false),
+        StopStream,
+
+        addDataToQuery(tablePath, Seq(("1", "a1", "12", "000"))),
+        StartStream(),
+        AssertOnQuery {q => q.processAllAvailable(); true },
+        CheckAnswerRows(Seq(Row("1", "a1", "12", "000")), lastOnly = true, isSorted = false),
+
+        addDataToQuery(tablePath,
+          Seq(("2", "a2", "12", "000"),
+              ("3", "a3", "12", "000"),
+              ("4", "a4", "12", "000"))),
+        AssertOnQuery {q => q.processAllAvailable(); true },
+        CheckAnswerRows(
+          Seq(Row("2", "a2", "12", "000"),
+             Row("3", "a3", "12", "000"),
+             Row("4", "a4", "12", "000")),
+          lastOnly = true, isSorted = false),
+        StopStream,
+
+        addDataToQuery(tablePath, Seq(("5", "a5", "12", "000"))),
+        addDataToQuery(tablePath, Seq(("6", "a6", "12", "000"))),
+        addDataToQuery(tablePath, Seq(("5", "a5", "15", "000"))),
+        StartStream(),
+        AssertOnQuery {q => q.processAllAvailable(); true },
+        CheckAnswerRows(
+          Seq(Row("6", "a6", "12", "000"),
+            Row("5", "a5", "15", "000")),
+          lastOnly = true, isSorted = false)
+      )
+    }
+  }
+
+  test("test mor stream source") {
+    withTempDir { inputDir =>
+      val tablePath = s"${inputDir.getCanonicalPath}/test_mor_stream"
+      HoodieTableMetaClient.initTableType(spark.sessionState.newHadoopConf(), tablePath,
+        MERGE_ON_READ, getTableName(tablePath), DataSourceWriteOptions.DEFAULT_PAYLOAD_OPT_VAL)
+
+      addData(tablePath, Seq(("1", "a1", "10", "000")))
+      val df = spark.readStream
+        .format("org.apache.hudi")
+        .load(tablePath)
+        .select("id", "name", "price", "ts")
+
+      testStream(df)(
+        AssertOnQuery {q => q.processAllAvailable(); true },
+        CheckAnswerRows(Seq(Row("1", "a1", "10", "000")), lastOnly = true, isSorted = false),
+        StopStream,
+
+        addDataToQuery(tablePath,
+          Seq(("2", "a2", "12", "000"),
+            ("3", "a3", "12", "000"),
+            ("2", "a2", "10", "001"))),
+        StartStream(),
+        AssertOnQuery {q => q.processAllAvailable(); true },
+        CheckAnswerRows(
+          Seq(Row("3", "a3", "12", "000"),
+            Row("2", "a2", "10", "001")),
+          lastOnly = true, isSorted = false),
+        StopStream,
+
+        addDataToQuery(tablePath, Seq(("5", "a5", "12", "000"))),
+        addDataToQuery(tablePath, Seq(("6", "a6", "12", "000"))),
+        StartStream(),
+        AssertOnQuery {q => q.processAllAvailable(); true },
+        CheckAnswerRows(
+          Seq(Row("5", "a5", "12", "000"),
+            Row("6", "a6", "12", "000")),
+          lastOnly = true, isSorted = false)
+      )
+    }
+  }
+
+  private def addData(inputPath: String, rows: Seq[(String, String, String, String)]): Unit = {
+    rows.toDF(columns: _*)
+      .write
+      .format("org.apache.hudi")
+      .options(commonOptions)
+      .option(TABLE_NAME, getTableName(inputPath))
+      .mode(SaveMode.Append)
+      .save(inputPath)
+  }
+
+  private def addDataToQuery(inputPath: String,
+                             rows: Seq[(String, String, String, String)]): AssertOnQuery = {
+    AssertOnQuery { _ =>
+      addData(inputPath, rows)
+      true
+    }
+  }
+
+  private def getTableName(inputPath: String): String = {
+    val start = inputPath.lastIndexOf('/')
+    inputPath.substring(start + 1)
+  }
+}
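
The tests above check the source against in-memory answers; a common deployment instead reads one Hudi table and streams upserts into another. A minimal sketch of that pipeline, assuming an active SparkSession named spark; all paths, table names, and field names are placeholders.

    val stream = spark.readStream
      .format("org.apache.hudi")
      .load("/tmp/hudi/source_table")                          // placeholder

    stream.writeStream
      .format("org.apache.hudi")                               // Hudi streaming sink (StreamSinkProvider)
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.table.name", "target_table")             // placeholder
      .option("checkpointLocation", "/tmp/hudi/checkpoints/target_table")  // placeholder
      .outputMode("append")
      .start("/tmp/hudi/target_table")                         // placeholder
      .awaitTermination()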
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/hudi/Spark2RowDeserializer.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/hudi/Spark2RowSerDe.scala
similarity index 83%
rename from hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/hudi/Spark2RowDeserializer.scala
rename to hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/hudi/Spark2RowSerDe.scala
index 84fe4c3..ca04470 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/hudi/Spark2RowDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/hudi/Spark2RowSerDe.scala
@@ -17,14 +17,17 @@
 
 package org.apache.hudi
 
-import org.apache.hudi.client.utils.SparkRowDeserializer
-
+import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 
-class Spark2RowDeserializer(val encoder: ExpressionEncoder[Row]) extends SparkRowDeserializer {
+class Spark2RowSerDe(val encoder: ExpressionEncoder[Row]) extends SparkRowSerDe {
   def deserializeRow(internalRow: InternalRow): Row = {
     encoder.fromRow(internalRow)
   }
+
+  override def serializeRow(row: Row): InternalRow = {
+    encoder.toRow(row)
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3RowDeserializer.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3RowSerDe.scala
similarity index 79%
rename from hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3RowDeserializer.scala
rename to hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3RowSerDe.scala
index a060655..e3d809f 100644
--- a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3RowDeserializer.scala
+++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/hudi/Spark3RowSerDe.scala
@@ -17,17 +17,21 @@
 
 package org.apache.hudi
 
-import org.apache.hudi.client.utils.SparkRowDeserializer
-
+import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 
-class Spark3RowDeserializer(val encoder: ExpressionEncoder[Row]) extends SparkRowDeserializer {
+class Spark3RowSerDe(val encoder: ExpressionEncoder[Row]) extends SparkRowSerDe {
 
   private val deserializer: ExpressionEncoder.Deserializer[Row] = encoder.createDeserializer()
+  private val serializer: ExpressionEncoder.Serializer[Row] = encoder.createSerializer()
 
   def deserializeRow(internalRow: InternalRow): Row = {
     deserializer.apply(internalRow)
   }
+
+  override def serializeRow(row: Row): InternalRow = {
+    serializer.apply(row)
+  }
 }
diff --git a/pom.xml b/pom.xml
index 91780da..cd66d65 100644
--- a/pom.xml
+++ b/pom.xml
@@ -527,6 +527,27 @@
         <version>${spark.version}</version>
         <scope>provided</scope>
       </dependency>
+      <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-sql_${scala.binary.version}</artifactId>
+        <classifier>tests</classifier>
+        <version>${spark.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-core_${scala.binary.version}</artifactId>
+        <classifier>tests</classifier>
+        <version>${spark.version}</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+        <classifier>tests</classifier>
+        <version>${spark.version}</version>
+        <scope>test</scope>
+      </dependency>
 
       <!-- Spark (Packages) -->
       <dependency>