You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/08/07 07:28:29 UTC

[hudi] branch master updated: [HUDI-69] Support Spark Datasource for MOR table - RDD approach (#1848)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f74a84  [HUDI-69] Support Spark Datasource for MOR table - RDD approach (#1848)
4f74a84 is described below

commit 4f74a84607d46249e9bb6e1397246f8dc076b390
Author: Gary Li <ya...@gmail.com>
AuthorDate: Fri Aug 7 00:28:14 2020 -0700

    [HUDI-69] Support Spark Datasource for MOR table - RDD approach (#1848)
    
    - This PR implements Spark Datasource for MOR table in the RDD approach.
    - Implemented SnapshotRelation
    - Implemented HudiMergeOnReadRDD
    - Implemented separate iterators to handle the merged and unmerged record readers.
    - Added TestMORDataSource to verify this feature.
    - Cleaned up test file names and added tests for mixed query types
    - We can now revert the change made in DefaultSource
    
    Co-authored-by: Vinoth Chandar <vc...@confluent.io>
---
 .../hudi/testutils/HoodieClientTestBase.java       |   2 +-
 .../hudi/common/table/log/LogReaderUtils.java      |   8 +-
 .../common/testutils/HoodieTestDataGenerator.java  |  15 +
 .../realtime/AbstractRealtimeRecordReader.java     |   9 -
 .../realtime/RealtimeCompactedRecordReader.java    |   2 +-
 .../realtime/RealtimeUnmergedRecordReader.java     |  11 +-
 .../utils/HoodieRealtimeInputFormatUtils.java      |  18 +-
 .../utils/HoodieRealtimeRecordReaderUtils.java     |  13 +
 .../org/apache/hudi/AvroConversionUtils.scala      |  14 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   8 +
 .../main/scala/org/apache/hudi/DefaultSource.scala |  56 +--
 ...ptyRelation.scala => HoodieEmptyRelation.scala} |   4 +-
 .../org/apache/hudi/HoodieMergeOnReadRDD.scala     | 274 +++++++++++++++
 ...HudiSparkUtils.scala => HoodieSparkUtils.scala} |   4 +-
 .../org/apache/hudi/IncrementalRelation.scala      |   7 +-
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  | 151 ++++++++
 .../java/org/apache/hudi/client/TestBootstrap.java |  10 +-
 ...SparkUtils.scala => TestHoodieSparkUtils.scala} |  15 +-
 .../apache/hudi/functional/TestCOWDataSource.scala | 197 +++++++++++
 .../apache/hudi/functional/TestDataSource.scala    | 337 ------------------
 .../apache/hudi/functional/TestMORDataSource.scala | 391 +++++++++++++++++++++
 .../hudi/functional/TestStructuredStreaming.scala  | 180 ++++++++++
 22 files changed, 1317 insertions(+), 409 deletions(-)

diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 667181c..203cc54 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -71,7 +71,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 public class HoodieClientTestBase extends HoodieClientTestHarness {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieClientTestBase.class);
+  protected static final Logger LOG = LogManager.getLogger(HoodieClientTestBase.class);
 
   @BeforeEach
   public void setUp() throws Exception {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
index 0662e68..ffc4b85 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
@@ -29,9 +29,9 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 
 import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
 
 import java.io.IOException;
 import java.util.List;
@@ -62,15 +62,15 @@ public class LogReaderUtils {
     return writerSchema;
   }
 
-  public static Schema readLatestSchemaFromLogFiles(String basePath, List<String> deltaFilePaths, JobConf jobConf)
+  public static Schema readLatestSchemaFromLogFiles(String basePath, List<String> deltaFilePaths, Configuration config)
       throws IOException {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, basePath);
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(config, basePath);
     List<String> deltaPaths = deltaFilePaths.stream().map(s -> new HoodieLogFile(new Path(s)))
         .sorted(HoodieLogFile.getReverseLogFileComparator()).map(s -> s.getPath().toString())
         .collect(Collectors.toList());
     if (deltaPaths.size() > 0) {
       for (String logPath : deltaPaths) {
-        FileSystem fs = FSUtils.getFs(logPath, jobConf);
+        FileSystem fs = FSUtils.getFs(logPath, config);
         Schema schemaFromLogFile =
             readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), new Path(logPath));
         if (schemaFromLogFile != null) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 6b6074b..90b15d0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -634,6 +634,10 @@ public class HoodieTestDataGenerator {
     return generateUniqueUpdatesStream(instantTime, n, TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList());
   }
 
+  public List<HoodieRecord> generateUniqueUpdatesAsPerSchema(String instantTime, Integer n, String schemaStr) {
+    return generateUniqueUpdatesStream(instantTime, n, schemaStr).collect(Collectors.toList());
+  }
+
   /**
    * Generates deduped delete of keys previously inserted, randomly distributed across the keys above.
    *
@@ -745,6 +749,17 @@ public class HoodieTestDataGenerator {
     return result.stream();
   }
 
+  /**
+   * Generates deduped delete records for previously inserted keys, randomly distributed across the keys above.
+   *
+   * @param instantTime Commit Timestamp
+   * @param n          Number of unique records
+   * @return List of hoodie records for delete
+   */
+  public List<HoodieRecord> generateUniqueDeleteRecords(String instantTime, Integer n) {
+    return generateUniqueDeleteRecordStream(instantTime, n).collect(Collectors.toList());
+  }
+
   public boolean deleteExistingKeyIfPresent(HoodieKey key) {
     Map<Integer, KeyPartition> existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
     Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index 65c416c..050b91a 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.LogReaderUtils;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.InputSplitUtils;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 
 import org.apache.avro.Schema;
@@ -148,12 +147,4 @@ public abstract class AbstractRealtimeRecordReader {
   public Schema getHiveSchema() {
     return hiveSchema;
   }
-
-  public long getMaxCompactionMemoryInBytes() {
-    // jobConf.getMemoryForMapTask() returns in MB
-    return (long) Math
-        .ceil(Double.parseDouble(jobConf.get(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP,
-            HoodieRealtimeConfig.DEFAULT_COMPACTION_MEMORY_FRACTION))
-            * jobConf.getMemoryForMapTask() * 1024 * 1024L);
-  }
 }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 78925c3..042199f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -69,7 +69,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         split.getDeltaLogPaths(),
         usesCustomPayload ? getWriterSchema() : getReaderSchema(),
         split.getMaxCommitTime(),
-        getMaxCompactionMemoryInBytes(),
+        HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf),
         Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
         false,
         jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index c06bff2..76de84b 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -63,7 +63,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
    * clients to consume.
    *
    * @param split File split
-   * @param job Job Configuration
+   * @param jobConf Job Configuration
    * @param realReader Parquet Reader
    */
   public RealtimeUnmergedRecordReader(RealtimeSplit split, JobConf job,
@@ -72,14 +72,15 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
     this.parquetReader = new SafeParquetRecordReaderWrapper(realReader);
     // Iterator for consuming records from parquet file
     this.parquetRecordsIterator = new RecordReaderValueIterator<>(this.parquetReader);
-    this.executor = new BoundedInMemoryExecutor<>(getMaxCompactionMemoryInBytes(), getParallelProducers(),
+    this.executor = new BoundedInMemoryExecutor<>(
+        HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), getParallelProducers(),
         Option.empty(), x -> x, new DefaultSizeEstimator<>());
     // Consumer of this record reader
     this.iterator = this.executor.getQueue().iterator();
-    this.logRecordScanner = new HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), jobConf),
+    this.logRecordScanner = new HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), this.jobConf),
         split.getBasePath(), split.getDeltaLogPaths(), getReaderSchema(), split.getMaxCommitTime(),
-        Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
-        false, jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> {
+        Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
+        false, this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> {
           // convert Hoodie log record to Hadoop AvroWritable and buffer
           GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get();
           ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema());
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index cc46d96..346d7a0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.hadoop.utils;
 
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -34,7 +35,6 @@ import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
 import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
@@ -119,15 +119,15 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
   }
 
   // Return parquet file with a list of log files in the same file group.
-  public static Map<String, List<String>> groupLogsByBaseFile(Configuration conf, Stream<FileStatus> fileStatuses) {
-    Map<Path, List<FileStatus>> partitionsToParquetSplits =
-        fileStatuses.collect(Collectors.groupingBy(file -> file.getPath().getParent()));
+  public static Map<HoodieBaseFile, List<String>> groupLogsByBaseFile(Configuration conf, List<HoodieBaseFile> fileStatuses) {
+    Map<Path, List<HoodieBaseFile>> partitionsToParquetSplits =
+        fileStatuses.stream().collect(Collectors.groupingBy(file -> file.getFileStatus().getPath().getParent()));
     // TODO(vc): Should we handle also non-hoodie splits here?
     Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());
 
     // for all unique split parents, obtain all delta files based on delta commit timeline,
     // grouped on file id
-    Map<String, List<String>> resultMap = new HashMap<>();
+    Map<HoodieBaseFile, List<String>> resultMap = new HashMap<>();
     partitionsToParquetSplits.keySet().forEach(partitionPath -> {
       // for each partition path obtain the data & log file groupings, then map back to inputsplits
       HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
@@ -144,15 +144,15 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
             .orElse(Stream.empty());
 
         // subgroup splits again by file id & match with log files.
-        Map<String, List<FileStatus>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
-            .collect(Collectors.groupingBy(file -> FSUtils.getFileId(file.getPath().getName())));
+        Map<String, List<HoodieBaseFile>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
+            .collect(Collectors.groupingBy(file -> FSUtils.getFileId(file.getFileStatus().getPath().getName())));
         latestFileSlices.forEach(fileSlice -> {
-          List<FileStatus> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
+          List<HoodieBaseFile> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
           dataFileSplits.forEach(split -> {
             try {
               List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
                   .map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
-              resultMap.put(split.getPath().toString(), logFilePaths);
+              resultMap.put(split, logFilePaths);
             } catch (Exception e) {
               throw new HoodieException("Error creating hoodie real time split ", e);
             }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index e925c18..871f722 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.avro.LogicalTypes;
@@ -43,6 +44,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -70,6 +72,17 @@ public class HoodieRealtimeRecordReaderUtils {
   }
 
   /**
+   * Computes the max compaction memory in bytes: the configured compaction memory fraction of the JobConf's map task memory.
+   */
+  public static long getMaxCompactionMemoryInBytes(JobConf jobConf) {
+    // jobConf.getMemoryForMapTask() returns in MB
+    return (long) Math
+        .ceil(Double.parseDouble(jobConf.get(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP,
+            HoodieRealtimeConfig.DEFAULT_COMPACTION_MEMORY_FRACTION))
+            * jobConf.getMemoryForMapTask() * 1024 * 1024L);
+  }
+
+  /**
    * Prints a JSON representation of the ArrayWritable for easier debuggability.
    */
   public static String arrayWritableToString(ArrayWritable writable) {
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index bdb8955..e6d6c55 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.hudi
 
-import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
 import org.apache.hudi.common.model.HoodieKey
 import org.apache.avro.Schema
 import org.apache.spark.rdd.RDD
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
+import scala.collection.JavaConverters._
 
 object AvroConversionUtils {
 
@@ -78,4 +79,15 @@ object AvroConversionUtils {
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
     SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
   }
+
+  def buildAvroRecordBySchema(record: IndexedRecord,
+                              requiredSchema: Schema,
+                              requiredPos: List[Int],
+                              recordBuilder: GenericRecordBuilder): GenericRecord = {
+    val requiredFields = requiredSchema.getFields.asScala
+    assert(requiredFields.length == requiredPos.length)
+    val positionIterator = requiredPos.iterator
+    requiredFields.foreach(f => recordBuilder.set(f, record.get(positionIterator.next())))
+    recordBuilder.build()
+  }
 }
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 4d94463..917bfed 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -51,6 +51,14 @@ object DataSourceReadOptions {
   val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental"
   val DEFAULT_QUERY_TYPE_OPT_VAL: String = QUERY_TYPE_SNAPSHOT_OPT_VAL
 
+  /**
+   * For snapshot queries on merge-on-read tables. Use this key to select the merge type: "skip_merge" or "payload_combine" (the default).
+   */
+  val REALTIME_MERGE_OPT_KEY = "hoodie.datasource.merge.type"
+  val REALTIME_SKIP_MERGE_OPT_VAL = "skip_merge"
+  val REALTIME_PAYLOAD_COMBINE_OPT_VAL = "payload_combine"
+  val DEFAULT_REALTIME_MERGE_OPT_VAL = REALTIME_PAYLOAD_COMBINE_OPT_VAL
+
   @Deprecated
   val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type"
   @Deprecated
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index e26c1c8..10be305 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -18,8 +18,9 @@
 package org.apache.hudi
 
 import org.apache.hudi.DataSourceReadOptions._
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.log4j.LogManager
@@ -60,26 +61,20 @@ class DefaultSource extends RelationProvider
       throw new HoodieException("'path' must be specified.")
     }
 
+    val fs = FSUtils.getFs(path.get, sqlContext.sparkContext.hadoopConfiguration)
+    val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(Seq(path.get), fs)
+    val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
     if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
-      // this is just effectively RO view only, where `path` can contain a mix of
-      // non-hoodie/hoodie path files. set the path filter up
-      sqlContext.sparkContext.hadoopConfiguration.setClass(
-        "mapreduce.input.pathFilter.class",
-        classOf[HoodieROTablePathFilter],
-        classOf[org.apache.hadoop.fs.PathFilter])
-
-      log.info("Constructing hoodie (as parquet) data source with options :" + parameters)
-      log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. " +
-        "Please query the Hive table registered using Spark SQL.")
-      // simply return as a regular parquet relation
-      DataSource.apply(
-        sparkSession = sqlContext.sparkSession,
-        userSpecifiedSchema = Option(schema),
-        className = "parquet",
-        options = parameters)
-        .resolveRelation()
+      val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
+      if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
+        new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient)
+      } else {
+        getBaseFileOnlyView(sqlContext, parameters, schema)
+      }
+    } else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
+      getBaseFileOnlyView(sqlContext, parameters, schema)
     } else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
-      new IncrementalRelation(sqlContext, path.get, optParams, schema)
+      new IncrementalRelation(sqlContext, tablePath, optParams, schema)
     } else {
       throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
     }
@@ -107,7 +102,7 @@ class DefaultSource extends RelationProvider
                               df: DataFrame): BaseRelation = {
     val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
     HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
-    new HudiEmptyRelation(sqlContext, df.schema)
+    new HoodieEmptyRelation(sqlContext, df.schema)
   }
 
   override def createSink(sqlContext: SQLContext,
@@ -123,4 +118,25 @@ class DefaultSource extends RelationProvider
   }
 
   override def shortName(): String = "hudi"
+
+  private def getBaseFileOnlyView(sqlContext: SQLContext,
+                                   optParams: Map[String, String],
+                                   schema: StructType): BaseRelation = {
+    log.warn("Loading Base File Only View.")
+    // this is just effectively RO view only, where `path` can contain a mix of
+    // non-hoodie/hoodie path files. set the path filter up
+    sqlContext.sparkContext.hadoopConfiguration.setClass(
+      "mapreduce.input.pathFilter.class",
+      classOf[HoodieROTablePathFilter],
+      classOf[org.apache.hadoop.fs.PathFilter])
+
+    log.info("Constructing hoodie (as parquet) data source with options :" + optParams)
+    // simply return as a regular parquet relation
+    DataSource.apply(
+      sparkSession = sqlContext.sparkSession,
+      userSpecifiedSchema = Option(schema),
+      className = "parquet",
+      options = optParams)
+      .resolveRelation()
+  }
 }
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HudiEmptyRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala
similarity index 90%
rename from hudi-spark/src/main/scala/org/apache/hudi/HudiEmptyRelation.scala
rename to hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala
index 8ddbe46..4642993 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HudiEmptyRelation.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala
@@ -28,8 +28,8 @@ import org.apache.spark.sql.types.StructType
   * @param sqlContext Spark SQL Context
   * @param userSchema Users data schema
   */
-class HudiEmptyRelation(val sqlContext: SQLContext,
-                        val userSchema: StructType) extends BaseRelation {
+class HoodieEmptyRelation(val sqlContext: SQLContext,
+                          val userSchema: StructType) extends BaseRelation {
 
   override def schema: StructType = userSchema
 }
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
new file mode 100644
index 0000000..f272084
--- /dev/null
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.Try
+
+case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition // one RDD partition: its index plus the single file split it reads
+
+class HoodieMergeOnReadRDD(@transient sc: SparkContext,
+                           @transient config: Configuration,
+                           fullSchemaFileReader: PartitionedFile => Iterator[Any],
+                           requiredSchemaFileReader: PartitionedFile => Iterator[Any],
+                           tableState: HoodieMergeOnReadTableState)
+  extends RDD[InternalRow](sc, Nil) {
+
+  private val confBroadcast = sc.broadcast(new SerializableWritable(config))
+
+  /**
+   * Computes the rows of one partition, picking the read path from the wrapped file split:
+   *  - no log paths: the base file is read with the required-schema reader only;
+   *  - merge type REALTIME_SKIP_MERGE_OPT_VAL: base file rows (required-schema reader) are
+   *    followed by the log records, without merging;
+   *  - merge type REALTIME_PAYLOAD_COMBINE_OPT_VAL: base file rows (full-schema reader) are
+   *    merged with the log records.
+   *
+   * @param split   partition to read; cast to [[HoodieMergeOnReadPartition]]
+   * @param context Spark task context, not used by this method
+   * @return iterator over the rows of the partition
+   * @throws HoodieException if the split has log paths but its merge type matches neither option
+   */
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    val mergeParquetPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
+    mergeParquetPartition.split match {
+      case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+        read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader)
+      case skipMergeSplit if skipMergeSplit.mergeType
+        .equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
+        skipMergeFileIterator(
+          skipMergeSplit,
+          read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader),
+          getConfig
+        )
+      case payloadCombineSplit if payloadCombineSplit.mergeType
+        .equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
+        payloadCombineFileIterator(
+          payloadCombineSplit,
+          read(mergeParquetPartition.split.dataFile, fullSchemaFileReader),
+          getConfig
+        )
+      // every field is followed by a separator so the values do not run together in the message
+      case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " +
+        s"file path: ${mergeParquetPartition.split.dataFile.filePath}, " +
+        s"log paths: ${mergeParquetPartition.split.logPaths.toString}, " +
+        s"hoodie table path: ${mergeParquetPartition.split.tablePath}, " +
+        s"spark partition Index: ${mergeParquetPartition.index}, " +
+        s"merge type: ${mergeParquetPartition.split.mergeType}")
+    }
+  }
+
+  /**
+   * Creates one [[HoodieMergeOnReadPartition]] per file split of the table state; the partition
+   * index is the position of the split in `tableState.hoodieRealtimeFileSplits`.
+   */
+  override protected def getPartitions: Array[Partition] = {
+    val indexedSplits = tableState.hoodieRealtimeFileSplits.zipWithIndex
+    indexedSplits
+      .map { case (fileSplit, position) => HoodieMergeOnReadPartition(position, fileSplit) }
+      .toArray[Partition]
+  }
+
+  /**
+   * Returns a new Hadoop [[Configuration]] copied from the broadcast one. The copy is created
+   * while holding `HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK`.
+   */
+  private def getConfig: Configuration = {
+    // unwrap the Broadcast first, then the SerializableWritable, before taking the lock
+    val broadcastConf = confBroadcast.value.value
+    HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK.synchronized(new Configuration(broadcastConf))
+  }
+
+  /**
+   * Reads a file with the given reader function and flattens the result into rows: an
+   * [[InternalRow]] is passed through, a [[ColumnarBatch]] is expanded through its row iterator.
+   * Any other element type fails with a MatchError.
+   *
+   * @param partitionedFile  file to read
+   * @param readFileFunction reader producing rows and/or columnar batches for the file
+   * @return iterator over the rows of the file
+   */
+  private def read(partitionedFile: PartitionedFile,
+                   readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] =
+    readFileFunction(partitionedFile).flatMap {
+      case row: InternalRow => Iterator.single(row)
+      case batch: ColumnarBatch => batch.rowIterator().asScala
+    }
+
+  /**
+   * Builds an iterator that first emits every base file row unchanged and then every record
+   * found in the split's log files, without merging records that share a key. Log records
+   * whose payload yields no insert value (deletes) are skipped.
+   *
+   * `hasNext` is idempotent: the next row is loaded at most once and cached until `next()`
+   * hands it out, so repeated `hasNext` calls never drop a row and `next()` may be called
+   * without a preceding `hasNext`.
+   *
+   * @param split            file split whose log files are scanned
+   * @param baseFileIterator rows of the base file (compute passes the required-schema reader output)
+   * @param config           Hadoop configuration used to scan the log files
+   * @return iterator over the base file rows followed by the log records
+   */
+  private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
+                                  baseFileIterator: Iterator[InternalRow],
+                                  config: Configuration): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
+      private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
+      // position, within the table Avro schema, of each required field
+      private val requiredFieldPosition =
+        tableState.requiredStructSchema
+          .map(f => tableAvroSchema.getField(f.name).pos()).toList
+      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
+      private val deserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
+      private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
+      private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
+      private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
+
+      private var recordToLoad: InternalRow = _
+      // true while recordToLoad holds a row that next() has not handed out yet
+      private var recordLoaded = false
+
+      override def hasNext: Boolean = {
+        if (!recordLoaded) {
+          recordLoaded = loadNextRecord()
+        }
+        recordLoaded
+      }
+
+      override def next(): InternalRow = {
+        if (!hasNext) {
+          throw new NoSuchElementException("No more records to read from the Hoodie MOR file split")
+        }
+        recordLoaded = false
+        recordToLoad
+      }
+
+      /** Advances to the next row and stores it in recordToLoad; returns false when exhausted. */
+      @scala.annotation.tailrec
+      private def loadNextRecord(): Boolean = {
+        if (baseFileIterator.hasNext) {
+          recordToLoad = baseFileIterator.next()
+          true
+        } else if (logRecordsKeyIterator.hasNext) {
+          val curAvrokey = logRecordsKeyIterator.next()
+          val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema)
+          if (!curAvroRecord.isPresent) {
+            // delete record found, skipping
+            loadNextRecord()
+          } else {
+            val requiredAvroRecord = AvroConversionUtils
+              .buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, requiredFieldPosition, recordBuilder)
+            recordToLoad = unsafeProjection(deserializer.deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
+            true
+          }
+        } else {
+          false
+        }
+      }
+    }
+
+  /**
+   * Builds an iterator that merges base file rows with the records of the split's log files.
+   *
+   * While the base file has rows: a row whose record key (column HOODIE_RECORD_KEY_COL_POS) is
+   * present in the log records is combined with the log record through the payload's
+   * `combineAndGetUpdateValue`; an absent result means the row was deleted and it is skipped.
+   * A row without a log record is projected to the required schema as-is. Once the base file
+   * is exhausted, log records whose key was not merged are emitted through `getInsertValue`,
+   * skipping those without an insert value.
+   *
+   * `hasNext` is idempotent: the next row is loaded at most once and cached until `next()`
+   * hands it out, so repeated `hasNext` calls never drop a row and `next()` may be called
+   * without a preceding `hasNext`.
+   *
+   * @param split            file split whose log files are scanned
+   * @param baseFileIterator rows of the base file (compute passes the full-schema reader output)
+   * @param config           Hadoop configuration used to scan the log files
+   * @return iterator over the merged rows, in the required schema
+   */
+  private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
+                                baseFileIterator: Iterator[InternalRow],
+                                config: Configuration): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
+      private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
+      // position, within the table Avro schema, of each required field
+      private val requiredFieldPosition =
+        tableState.requiredStructSchema
+          .map(f => tableAvroSchema.getField(f.name).pos()).toList
+      private val serializer = new AvroSerializer(tableState.tableStructSchema, tableAvroSchema, false)
+      private val requiredDeserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
+      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
+      private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
+      private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
+      private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
+      // keys already merged with a base file row; skipped when the log records are drained
+      private val keyToSkip = mutable.Set.empty[String]
+
+      private var recordToLoad: InternalRow = _
+      // true while recordToLoad holds a row that next() has not handed out yet
+      private var recordLoaded = false
+
+      override def hasNext: Boolean = {
+        if (!recordLoaded) {
+          recordLoaded = loadNextRecord()
+        }
+        recordLoaded
+      }
+
+      override def next(): InternalRow = {
+        if (!hasNext) {
+          throw new NoSuchElementException("No more records to read from the Hoodie MOR file split")
+        }
+        recordLoaded = false
+        recordToLoad
+      }
+
+      /** Advances to the next row and stores it in recordToLoad; returns false when exhausted. */
+      @scala.annotation.tailrec
+      private def loadNextRecord(): Boolean = {
+        if (baseFileIterator.hasNext) {
+          val curRow = baseFileIterator.next()
+          val curKey = curRow.getString(HOODIE_RECORD_KEY_COL_POS)
+          if (logRecords.containsKey(curKey)) {
+            // duplicate key found, merging
+            keyToSkip.add(curKey)
+            val mergedAvroRecord = mergeRowWithLog(curRow, curKey)
+            if (!mergedAvroRecord.isPresent) {
+              // deleted
+              loadNextRecord()
+            } else {
+              // load merged record as InternalRow with required schema
+              val requiredAvroRecord = AvroConversionUtils
+                .buildAvroRecordBySchema(
+                  mergedAvroRecord.get(),
+                  requiredAvroSchema,
+                  requiredFieldPosition,
+                  recordBuilder
+                )
+              recordToLoad = unsafeProjection(requiredDeserializer
+                .deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
+              true
+            }
+          } else {
+            // No merge needed, load current row with required schema
+            recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow))
+            true
+          }
+        } else if (logRecordsKeyIterator.hasNext) {
+          val curKey = logRecordsKeyIterator.next()
+          if (keyToSkip.contains(curKey)) {
+            // already emitted as part of a merge with a base file row
+            loadNextRecord()
+          } else {
+            val insertAvroRecord =
+              logRecords.get(curKey).getData.getInsertValue(tableAvroSchema)
+            if (!insertAvroRecord.isPresent) {
+              // stand alone delete record, skipping
+              loadNextRecord()
+            } else {
+              val requiredAvroRecord = AvroConversionUtils
+                .buildAvroRecordBySchema(
+                  insertAvroRecord.get(),
+                  requiredAvroSchema,
+                  requiredFieldPosition,
+                  recordBuilder
+                )
+              recordToLoad = unsafeProjection(requiredDeserializer
+                .deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
+              true
+            }
+          }
+        } else {
+          false
+        }
+      }
+
+      /** Copies the required fields of a full-schema row into a new row with the required schema. */
+      private def createRowWithRequiredSchema(row: InternalRow): InternalRow = {
+        val rowToReturn = new SpecificInternalRow(tableState.requiredStructSchema)
+        val posIterator = requiredFieldPosition.iterator
+        var curIndex = 0
+        tableState.requiredStructSchema.foreach(
+          f => {
+            val curPos = posIterator.next()
+            val curField = row.get(curPos, f.dataType)
+            rowToReturn.update(curIndex, curField)
+            curIndex = curIndex + 1
+          }
+        )
+        rowToReturn
+      }
+
+      /** Serializes the base file row to Avro and combines it with the log record of the same key. */
+      private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
+        val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
+        logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema)
+      }
+    }
+}
+
+private object HoodieMergeOnReadRDD {
+  /** Lock object; the RDD's getConfig synchronizes on it while copying the Hadoop configuration. */
+  val CONFIG_INSTANTIATION_LOCK = new Object()
+
+  /**
+   * Creates a [[HoodieMergedLogRecordScanner]] over the log files of the given split.
+   *
+   * @param split     file split providing the table path, log paths, latest commit and memory
+   *                  budget; `split.logPaths` must be defined, otherwise `.get` throws
+   * @param logSchema Avro schema handed to the scanner
+   * @param config    Hadoop configuration used to resolve the file system and the scanner settings
+   * @return the scanner for the split's log files
+   */
+  def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
+    val fs = FSUtils.getFs(split.tablePath, config)
+    val logFilePaths = split.logPaths.get.asJava
+    // a configured value that cannot be parsed as a boolean falls back to false
+    val lazyBlockReadEnabled = Try(
+      config.get(
+        HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+        HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED
+      ).toBoolean
+    ).getOrElse(false)
+    val maxStreamBufferSize = config.getInt(
+      HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+      HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
+    val spillableMapBasePath = config.get(
+      HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
+      HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
+    // NOTE(review): the literal `false` is a positional flag whose meaning is not visible here;
+    // confirm against the HoodieMergedLogRecordScanner constructor.
+    new HoodieMergedLogRecordScanner(
+      fs,
+      split.tablePath,
+      logFilePaths,
+      logSchema,
+      split.latestCommit,
+      split.maxCompactionMemoryInBytes,
+      lazyBlockReadEnabled,
+      false,
+      maxStreamBufferSize,
+      spillableMapBasePath)
+  }
+}
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
similarity index 96%
rename from hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala
rename to hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 861de14..26babd8 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -27,9 +27,9 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import scala.collection.JavaConverters._
 
 
-object HudiSparkUtils {
+object HoodieSparkUtils {
 
-  def getHudiMetadataSchema: StructType = {
+  def getMetaSchema: StructType = {
     StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
       StructField(col, StringType, nullable = true)
     }))
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 436895b..338a54e 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -17,14 +17,14 @@
 
 package org.apache.hudi
 
-import org.apache.hadoop.fs.GlobPattern
-import org.apache.hadoop.fs.Path
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.table.HoodieTable
+
+import org.apache.hadoop.fs.GlobPattern
 import org.apache.log4j.LogManager
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources.{BaseRelation, TableScan}
@@ -47,7 +47,8 @@ class IncrementalRelation(val sqlContext: SQLContext,
 
   private val log = LogManager.getLogger(classOf[IncrementalRelation])
 
-  private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
+  private val metaClient =
+    new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
   // MOR tables not supported yet
   if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
     throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables")
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
new file mode 100644
index 0000000..c1a6acd
--- /dev/null
+++ b/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HoodieMergeOnReadFileSplit(dataFile: PartitionedFile, // base file, wrapped as a PartitionedFile spanning offset 0 to the file length
+                                      logPaths: Option[List[String]], // log files grouped with the base file; empty Option when there are none
+                                      latestCommit: String, // timestamp of the file system view's last instant
+                                      tablePath: String, // table base path, taken from metaClient.getBasePath
+                                      maxCompactionMemoryInBytes: Long, // result of getMaxCompactionMemoryInBytes(jobConf)
+                                      mergeType: String) // value of DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, or its default
+
+case class HoodieMergeOnReadTableState(tableStructSchema: StructType, // full table schema as a Spark StructType
+                                       requiredStructSchema: StructType, // fields of the table schema requested by the scan
+                                       tableAvroSchema: String, // table Avro schema serialized with toString
+                                       requiredAvroSchema: String, // Avro schema converted from requiredStructSchema, serialized with toString
+                                       hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit]) // file splits to read; the RDD creates one partition per split
+
+class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
+                                  val optParams: Map[String, String],
+                                  val userSchema: StructType,
+                                  val globPaths: Seq[Path],
+                                  val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with PrunedFilteredScan with Logging {
+
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+  private val jobConf = new JobConf(conf)
+  // use schema from latest metadata, if not present, read schema from the data file
+  private val schemaUtil = new TableSchemaResolver(metaClient)
+  private val tableAvroSchema = schemaUtil.getTableAvroSchema
+  private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+  private val mergeType = optParams.getOrElse(
+    DataSourceReadOptions.REALTIME_MERGE_OPT_KEY,
+    DataSourceReadOptions.DEFAULT_REALTIME_MERGE_OPT_VAL)
+  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
+  private val fileIndex = buildFileIndex()
+
+  override def schema: StructType = tableStructSchema
+
+  override def needConversion: Boolean = false
+
+  /**
+   * Builds the scan for the requested columns as a [[HoodieMergeOnReadRDD]].
+   *
+   * Two parquet readers are prepared: one reading the full table schema without filters, and
+   * one reading only the required columns with the given filters. The resulting RDD holds
+   * InternalRow values and is cast to RDD[Row]; `needConversion` is false for this relation.
+   *
+   * @param requiredColumns names of the columns to read; names absent from the table schema are ignored
+   * @param filters         filters handed to the required-schema parquet reader
+   * @return the RDD reading the file index of this relation
+   */
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
+    log.debug(s" buildScan filters = ${filters.mkString(",")}")
+    // keep the requested columns that exist in the table schema, in the requested order
+    val requiredStructSchema = StructType(
+      requiredColumns.toSeq.flatMap(name => tableStructSchema.find(_.name == name)))
+    val requiredAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
+      requiredStructSchema, tableAvroSchema.getName, tableAvroSchema.getNamespace)
+    val hoodieTableState = HoodieMergeOnReadTableState(
+      tableStructSchema,
+      requiredStructSchema,
+      tableAvroSchema.toString,
+      requiredAvroSchema.toString,
+      fileIndex
+    )
+
+    // both readers differ only in the schema they project and the filters they receive
+    def parquetReader(projectedSchema: StructType, readerFilters: Seq[Filter]) =
+      new ParquetFileFormat().buildReaderWithPartitionValues(
+        sparkSession = sqlContext.sparkSession,
+        dataSchema = tableStructSchema,
+        partitionSchema = StructType(Nil),
+        requiredSchema = projectedSchema,
+        filters = readerFilters,
+        options = optParams,
+        hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+      )
+
+    val fullSchemaParquetReader = parquetReader(tableStructSchema, Seq())
+    val requiredSchemaParquetReader = parquetReader(requiredStructSchema, filters)
+
+    // Follow the implementation of Spark internal HadoopRDD to handle the broadcast configuration.
+    FileSystem.getLocal(jobConf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    new HoodieMergeOnReadRDD(
+      sqlContext.sparkContext,
+      jobConf,
+      fullSchemaParquetReader,
+      requiredSchemaParquetReader,
+      hoodieTableState
+    ).asInstanceOf[RDD[Row]]
+  }
+
+  /**
+   * Lists the files under the user provided paths and turns the latest base files, grouped
+   * with their log files, into one [[HoodieMergeOnReadFileSplit]] per base file.
+   *
+   * @return the file splits to read
+   * @throws HoodieException if no file is found under the provided paths
+   */
+  def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
+    val fileStatuses = HoodieSparkUtils
+      .createInMemoryFileIndex(sqlContext.sparkSession, globPaths)
+      .allFiles()
+    if (fileStatuses.isEmpty) {
+      throw new HoodieException("No files found for reading in user provided path.")
+    }
+
+    val completedCommits = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
+    val fsView = new HoodieTableFileSystemView(metaClient, completedCommits, fileStatuses.toArray)
+    val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList
+    val latestCommit = fsView.getLastInstant.get().getTimestamp
+    HoodieRealtimeInputFormatUtils
+      .groupLogsByBaseFile(conf, latestFiles.asJava)
+      .asScala
+      .map { case (baseFile, logFiles) =>
+        // a base file without log files gets an empty Option
+        val logPaths = if (logFiles.isEmpty) None else Some(logFiles.asScala.toList)
+        // the split covers the whole base file: offset 0 up to the file length
+        val partitionedFile = PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen)
+        HoodieMergeOnReadFileSplit(partitionedFile, logPaths, latestCommit,
+          metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
+      }
+      .toList
+  }
+}
diff --git a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
index 0aa8ca4..4e1984c 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
+++ b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
@@ -130,14 +130,7 @@ public class TestBootstrap extends HoodieClientTestBase {
   public void setUp() throws Exception {
     bootstrapBasePath = tmpFolder.toAbsolutePath().toString() + "/data";
     initPath();
-    spark = SparkSession.builder()
-        .appName("Bootstrap test")
-        .master("local[2]")
-        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-        .getOrCreate();
-    jsc = new JavaSparkContext(spark.sparkContext());
-    sqlContext = spark.sqlContext();
-    hadoopConf = spark.sparkContext().hadoopConfiguration();
+    initSparkContexts();
     initTestDataGenerator();
     initMetaClient();
     // initialize parquet input format
@@ -146,6 +139,7 @@ public class TestBootstrap extends HoodieClientTestBase {
 
   @AfterEach
   public void tearDown() throws IOException {
+    cleanupSparkContexts();
     cleanupClients();
     cleanupTestDataGenerator();
   }
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestHudiSparkUtils.scala b/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
similarity index 89%
rename from hudi-spark/src/test/scala/org/apache/hudi/TestHudiSparkUtils.scala
rename to hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
index 6b1a178..d5da676 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/TestHudiSparkUtils.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.io.TempDir
 
-class TestHudiSparkUtils {
+class TestHoodieSparkUtils {
 
   @Test
   def testGlobPaths(@TempDir tempDir: File): Unit = {
@@ -48,29 +48,29 @@ class TestHudiSparkUtils {
     files.foreach(file => new File(file.toUri).createNewFile())
 
     var paths = Seq(tempDir.getAbsolutePath + "/*")
-    var globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
+    var globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
       new Path(paths.head).getFileSystem(new Configuration()))
     assertEquals(folders.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString))
 
     paths = Seq(tempDir.getAbsolutePath + "/*/*")
-    globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
+    globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
       new Path(paths.head).getFileSystem(new Configuration()))
     assertEquals(files.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString))
 
     paths = Seq(tempDir.getAbsolutePath + "/folder1/*")
-    globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
+    globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
       new Path(paths.head).getFileSystem(new Configuration()))
     assertEquals(Seq(files(0), files(1)).sortWith(_.toString < _.toString),
       globbedPaths.sortWith(_.toString < _.toString))
 
     paths = Seq(tempDir.getAbsolutePath + "/folder2/*")
-    globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
+    globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
       new Path(paths.head).getFileSystem(new Configuration()))
     assertEquals(Seq(files(2), files(3)).sortWith(_.toString < _.toString),
       globbedPaths.sortWith(_.toString < _.toString))
 
     paths = Seq(tempDir.getAbsolutePath + "/folder1/*", tempDir.getAbsolutePath + "/folder2/*")
-    globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
+    globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
       new Path(paths.head).getFileSystem(new Configuration()))
     assertEquals(files.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString))
   }
@@ -98,8 +98,9 @@ class TestHudiSparkUtils {
     folders.foreach(folder => new File(folder.toUri).mkdir())
     files.foreach(file => new File(file.toUri).createNewFile())
 
-    val index = HudiSparkUtils.createInMemoryFileIndex(spark, Seq(folders(0), folders(1)))
+    val index = HoodieSparkUtils.createInMemoryFileIndex(spark, Seq(folders(0), folders(1)))
     val indexedFilePaths = index.allFiles().map(fs => fs.getPath)
     assertEquals(files.sortWith(_.toString < _.toString), indexedFilePaths.sortWith(_.toString < _.toString))
+    spark.stop()
   }
 }
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
new file mode 100644
index 0000000..c1e45c3
--- /dev/null
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.log4j.LogManager
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions.col
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Basic tests on the spark datasource for COW table.
+ */
+class TestCOWDataSource extends HoodieClientTestBase {
+  private val log = LogManager.getLogger(getClass)
+  var spark: SparkSession = null
+  val commonOpts = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
+    HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
+  )
+
+  @BeforeEach override def setUp() {
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+    initTestDataGenerator()
+    initFileSystem()
+  }
+
+  @AfterEach override def tearDown() = {
+    cleanupSparkContexts()
+    cleanupTestDataGenerator()
+    cleanupFileSystem()
+  }
+
+  @Test def testShortNameStorage() {
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+  }
+
+  @Test def testCopyOnWriteStorage() {
+    // Insert Operation
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+    val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+
+    // Snapshot query
+    val snapshotDF1 = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
+    assertEquals(100, snapshotDF1.count())
+
+    val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
+
+    // Upsert Operation
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+    assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size())
+
+    // Snapshot Query
+    val snapshotDF2 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(100, snapshotDF2.count()) // still 100, since we only updated
+
+    // Read Incremental Query
+    // we have 2 commits, try pulling the first commit (which is not the latest)
+    val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").get(0)
+    val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit)
+      .load(basePath)
+    assertEquals(100, hoodieIncViewDF1.count()) // 100 initial inserts must be pulled
+    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
+    assertEquals(1, countsPerCommit.length)
+    assertEquals(firstCommit, countsPerCommit(0).get(0))
+
+    // Upsert an empty dataFrame
+    val emptyRecords = recordsToStrings(dataGen.generateUpdates("002", 0)).toList
+    val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
+    emptyDF.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    // pull the latest commit
+    val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+      .load(basePath)
+
+    assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // one record per distinct updated key must be pulled
+    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
+    assertEquals(1, countsPerCommit.length)
+    assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+
+    // pull the latest commit within certain partitions
+    val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+      .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2016/*/*/*")
+      .load(basePath)
+    assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(), hoodieIncViewDF3.count())
+
+    val timeTravelDF = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit)
+      .load(basePath)
+    assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled
+  }
+
+  @Test def testDropInsertDup(): Unit = {
+    val insert1Cnt = 10
+    val insert2DupKeyCnt = 9
+    val insert2NewKeyCnt = 2
+
+    val totalUniqueKeyToGenerate = insert1Cnt + insert2NewKeyCnt
+    val allRecords =  dataGen.generateInserts("001", totalUniqueKeyToGenerate)
+    val inserts1 = allRecords.subList(0, insert1Cnt)
+    val inserts2New = dataGen.generateSameKeyInserts("002", allRecords.subList(insert1Cnt, insert1Cnt + insert2NewKeyCnt))
+    val inserts2Dup = dataGen.generateSameKeyInserts("002", inserts1.subList(0, insert2DupKeyCnt))
+
+    val records1 = recordsToStrings(inserts1).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    val hoodieROViewDF1 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(insert1Cnt, hoodieROViewDF1.count())
+
+    val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+    val records2 = recordsToStrings(inserts2Dup ++ inserts2New).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY, "true")
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*/*/*")
+    assertEquals(hoodieROViewDF2.count(), totalUniqueKeyToGenerate)
+
+    val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+      .load(basePath)
+    assertEquals(hoodieIncViewDF2.count(), insert2NewKeyCnt)
+  }
+}
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
deleted file mode 100644
index 48582c1..0000000
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.functional
-
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator
-import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
-import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.exception.TableNotFoundException
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
-import org.apache.log4j.LogManager
-import org.apache.spark.sql._
-import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.io.TempDir
-import org.junit.jupiter.api.{BeforeEach, Test}
-
-import scala.collection.JavaConversions._
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-
-/**
- * Basic tests on the spark datasource
- */
-class TestDataSource {
-  private val log = LogManager.getLogger(getClass)
-
-  var spark: SparkSession = null
-  var dataGen: HoodieTestDataGenerator = null
-  val commonOpts = Map(
-    "hoodie.insert.shuffle.parallelism" -> "4",
-    "hoodie.upsert.shuffle.parallelism" -> "4",
-    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
-    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
-    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
-    HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
-  )
-  var basePath: String = null
-  var fs: FileSystem = null
-
-  @BeforeEach def initialize(@TempDir tempDir: java.nio.file.Path) {
-    spark = SparkSession.builder
-      .appName("Hoodie Datasource test")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .getOrCreate
-    dataGen = new HoodieTestDataGenerator()
-    basePath = tempDir.toAbsolutePath.toString
-    fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
-  }
-
-  @Test def testShortNameStorage() {
-    // Insert Operation
-    val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
-    val inputDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2))
-    inputDF.write.format("hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
-  }
-
-  @Test def testCopyOnWriteStorage() {
-    // Insert Operation
-    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
-    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
-    inputDF1.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
-    val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-
-    // Read RO View
-    val hoodieROViewDF1 = spark.read.format("org.apache.hudi")
-      .load(basePath + "/*/*/*/*");
-    assertEquals(100, hoodieROViewDF1.count())
-
-    val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
-    val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
-    val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
-
-    // Upsert Operation
-    inputDF2.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .mode(SaveMode.Append)
-      .save(basePath)
-
-    val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-    assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size())
-
-    // Read RO View
-    val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
-      .load(basePath + "/*/*/*/*");
-    assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated
-
-    // Read Incremental View
-    // we have 2 commits, try pulling the first commit (which is not the latest)
-    val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").get(0);
-    val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
-      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit)
-      .load(basePath);
-    assertEquals(100, hoodieIncViewDF1.count()) // 100 initial inserts must be pulled
-    var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
-    assertEquals(1, countsPerCommit.length)
-    assertEquals(firstCommit, countsPerCommit(0).get(0))
-
-    // Upsert an empty dataFrame
-    val emptyRecords = recordsToStrings(dataGen.generateUpdates("002", 0)).toList
-    val emptyDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
-    emptyDF.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .mode(SaveMode.Append)
-      .save(basePath)
-
-    // pull the latest commit
-    val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
-      .load(basePath);
-
-    assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled
-    countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
-    assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
-
-    // pull the latest commit within certain partitions
-    val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
-      .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2016/*/*/*")
-      .load(basePath);
-    assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(), hoodieIncViewDF3.count())
-  }
-
-  @Test def testMergeOnReadStorage() {
-    // Bulk Insert Operation
-    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
-    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
-    inputDF1.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
-      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-
-    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
-
-    // Read RO View
-    val hoodieROViewDF1 = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
-    assertEquals(100, hoodieROViewDF1.count()) // still 100, since we only updated
-  }
-
-  @Test def testDropInsertDup(): Unit = {
-    val insert1Cnt = 10
-    val insert2DupKeyCnt = 9
-    val insert2NewKeyCnt = 2
-
-    val totalUniqueKeyToGenerate = insert1Cnt + insert2NewKeyCnt
-    val allRecords =  dataGen.generateInserts("001", totalUniqueKeyToGenerate)
-    val inserts1 = allRecords.subList(0, insert1Cnt)
-    val inserts2New = dataGen.generateSameKeyInserts("002", allRecords.subList(insert1Cnt, insert1Cnt + insert2NewKeyCnt))
-    val inserts2Dup = dataGen.generateSameKeyInserts("002", inserts1.subList(0, insert2DupKeyCnt))
-
-    val records1 = recordsToStrings(inserts1).toList
-    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
-    inputDF1.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
-      .mode(SaveMode.Overwrite)
-      .save(basePath)
-    val hoodieROViewDF1 = spark.read.format("org.apache.hudi")
-      .load(basePath + "/*/*/*/*")
-    assertEquals(insert1Cnt, hoodieROViewDF1.count())
-
-    val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-    val records2 = recordsToStrings(inserts2Dup ++ inserts2New).toList
-    val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
-    inputDF2.write.format("org.apache.hudi")
-      .options(commonOpts)
-      .option(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY, "true")
-      .mode(SaveMode.Append)
-      .save(basePath)
-    val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
-      .load(basePath + "/*/*/*/*")
-    assertEquals(hoodieROViewDF2.count(), totalUniqueKeyToGenerate)
-
-    val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
-      .load(basePath)
-    assertEquals(hoodieIncViewDF2.count(), insert2NewKeyCnt)
-  }
-
-  @Test
-  def testStructuredStreaming(): Unit = {
-    fs.delete(new Path(basePath), true)
-    val sourcePath = basePath + "/source"
-    val destPath = basePath + "/dest"
-    fs.mkdirs(new Path(sourcePath))
-
-    // First chunk of data
-    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
-    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
-
-    // Second chunk of data
-    val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
-    val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
-    val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
-
-    // define the source of streaming
-    val streamingInput =
-      spark.readStream
-      .schema(inputDF1.schema)
-      .json(sourcePath)
-
-    val f1 = Future {
-      println("streaming starting")
-    //'writeStream' can be called only on streaming Dataset/DataFrame
-      streamingInput
-        .writeStream
-        .format("org.apache.hudi")
-        .options(commonOpts)
-        .trigger(new ProcessingTime(100))
-        .option("checkpointLocation", basePath + "/checkpoint")
-        .outputMode(OutputMode.Append)
-        .start(destPath)
-        .awaitTermination(10000)
-      println("streaming ends")
-    }
-
-    val f2 = Future {
-      inputDF1.write.mode(SaveMode.Append).json(sourcePath)
-      // wait for spark streaming to process one microbatch
-      val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5);
-      assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
-      val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, destPath)
-      // Read RO View
-      val hoodieROViewDF1 = spark.read.format("org.apache.hudi")
-        .load(destPath + "/*/*/*/*")
-      assert(hoodieROViewDF1.count() == 100)
-
-      inputDF2.write.mode(SaveMode.Append).json(sourcePath)
-      // wait for spark streaming to process one microbatch
-      waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5);
-      val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, destPath)
-      assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
-      // Read RO View
-      val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
-        .load(destPath + "/*/*/*/*")
-      assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated
-
-
-      // Read Incremental View
-      // we have 2 commits, try pulling the first commit (which is not the latest)
-      val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").get(0)
-      val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
-        .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-        .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
-        .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit)
-        .load(destPath)
-      assertEquals(100, hoodieIncViewDF1.count())
-      // 100 initial inserts must be pulled
-      var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
-      assertEquals(1, countsPerCommit.length)
-      assertEquals(firstCommit, countsPerCommit(0).get(0))
-
-      // pull the latest commit
-      val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
-        .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
-        .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
-        .load(destPath)
-
-      assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled
-      countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
-      assertEquals(1, countsPerCommit.length)
-      assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
-    }
-    Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
-  }
-
-  @throws[InterruptedException]
-  private def waitTillAtleastNCommits(fs: FileSystem, tablePath: String,
-                               numCommits: Int, timeoutSecs: Int, sleepSecsAfterEachRun: Int): Int = {
-    val beginTime = System.currentTimeMillis
-    var currTime = beginTime
-    val timeoutMsecs = timeoutSecs * 1000
-    var numInstants = 0
-    var success: Boolean = false
-    while ({!success && (currTime - beginTime) < timeoutMsecs}) try {
-      val timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath)
-      log.info("Timeline :" + timeline.getInstants.toArray)
-      if (timeline.countInstants >= numCommits) {
-        numInstants = timeline.countInstants
-        success = true
-      }
-      val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath, true)
-    } catch {
-      case te: TableNotFoundException =>
-        log.info("Got table not found exception. Retrying")
-    } finally {
-      Thread.sleep(sleepSecsAfterEachRun * 1000)
-      currTime = System.currentTimeMillis
-    }
-    if (!success) {
-      throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath)
-    }
-    numInstants
-  }
-}
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
new file mode 100644
index 0000000..5938ee5
--- /dev/null
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.log4j.LogManager
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions._
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Tests on Spark DataSource for MOR table.
+ */
+class TestMORDataSource extends HoodieClientTestBase {
+
+  var spark: SparkSession = null
+  private val log = LogManager.getLogger(classOf[TestMORDataSource])
+  val commonOpts = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
+    HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
+  )
+
+  @BeforeEach override def setUp() {
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+    initTestDataGenerator()
+    initFileSystem()
+  }
+
+  @AfterEach override def tearDown() = {
+    cleanupSparkContexts()
+    cleanupTestDataGenerator()
+    cleanupFileSystem()
+  }
+
+  @Test def testMergeOnReadStorage() {
+
+    val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+    // Insert Operation
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+    val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+    // Read RO View
+    val hudiRODF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(100, hudiRODF1.count()) // 100 records from the initial insert
+    val insertCommitTime = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+    val insertCommitTimes = hudiRODF1.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList
+    assertEquals(List(insertCommitTime), insertCommitTimes)
+
+    // Upsert operation
+    val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
+    val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    // Read Snapshot query
+    val updateCommitTime = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+    val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    val updateCommitTimes = hudiSnapshotDF2.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList
+    assertEquals(List(updateCommitTime), updateCommitTimes)
+  }
+
+  @Test def testCount() {
+    // First Operation:
+    // Producing parquet files to three default partitions.
+    // SNAPSHOT view on MOR table with parquet files only.
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+    val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(100, hudiSnapshotDF1.count()) // 100 records from the initial insert
+
+    // Second Operation:
+    // Upsert the update to the default partitions with duplicate records. Produced a log file for each parquet.
+    // SNAPSHOT view should read the log files only with the latest commit time.
+    val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
+    val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(100, hudiSnapshotDF2.count()) // still 100, since we only updated
+    val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString
+    val commit2Time = hudiSnapshotDF2.select("_hoodie_commit_time").head().get(0).toString
+    assertEquals(hudiSnapshotDF2.select("_hoodie_commit_time").distinct().count(), 1)
+    assertTrue(commit2Time > commit1Time)
+    assertEquals(100, hudiSnapshotDF2.join(hudiSnapshotDF1, Seq("_hoodie_record_key"), "left").count())
+
+    // Unmerge
+    val hudiSnapshotSkipMergeDF2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(200, hudiSnapshotSkipMergeDF2.count())
+    assertEquals(100, hudiSnapshotSkipMergeDF2.select("_hoodie_record_key").distinct().count())
+    assertEquals(200, hudiSnapshotSkipMergeDF2.join(hudiSnapshotDF2, Seq("_hoodie_record_key"), "left").count())
+
+    // Test Read Optimized Query on MOR table
+    val hudiRODF2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(100, hudiRODF2.count())
+
+    // Third Operation:
+    // Upsert another update to the default partitions with 50 duplicate records. Produced the second log file for each parquet.
+    // SNAPSHOT view should read the latest log files.
+    val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003", 50)).toList
+    val inputDF3: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records3, 2))
+    inputDF3.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val hudiSnapshotDF3 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    // still 100, because we only updated the existing records
+    assertEquals(100, hudiSnapshotDF3.count())
+
+    // 50 from commit2, 50 from commit3
+    assertEquals(hudiSnapshotDF3.select("_hoodie_commit_time").distinct().count(), 2)
+    assertEquals(50, hudiSnapshotDF3.filter(col("_hoodie_commit_time") > commit2Time).count())
+    assertEquals(50,
+      hudiSnapshotDF3.join(hudiSnapshotDF2, Seq("_hoodie_record_key", "_hoodie_commit_time"), "inner").count())
+
+    // Fourth Operation:
+    // Insert records to a new partition. Produced a new parquet file.
+    // SNAPSHOT view should read the latest log files from the default partition and parquet from the new partition.
+    val partitionPaths = new Array[String](1)
+    partitionPaths.update(0, "2020/01/10")
+    val newDataGen = new HoodieTestDataGenerator(partitionPaths)
+    val records4 = recordsToStrings(newDataGen.generateInserts("004", 100)).toList
+    val inputDF4: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records4, 2))
+    inputDF4.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val hudiSnapshotDF4 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    // 200, because we insert 100 records to a new partition
+    assertEquals(200, hudiSnapshotDF4.count())
+    assertEquals(100,
+      hudiSnapshotDF1.join(hudiSnapshotDF4, Seq("_hoodie_record_key"), "inner").count())
+
+    // Fifth Operation:
+    // Upsert records to the new partition. Produced a newer version of parquet file.
+    // SNAPSHOT view should read the latest log files from the default partition
+    // and the latest parquet from the new partition.
+    val records5 = recordsToStrings(newDataGen.generateUpdates("005", 100)).toList
+    val inputDF5: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records5, 2))
+    inputDF5.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val hudiSnapshotDF5 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(200, hudiSnapshotDF5.count())
+  }
+
+  @Test
+  def testPayloadDelete(): Unit = {
+    // Scenario: SNAPSHOT reads of a MOR table while delete records are upserted in two batches.
+    // First Operation: produce parquet files in the three default partitions (100 inserts).
+    // SNAPSHOT view on MOR table with parquet files only.
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+    val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(100, hudiSnapshotDF1.count()) // all 100 freshly inserted records are visible
+
+    // Second Operation:
+    // Upsert 50 delete records
+    // Snapshot view should only read 50 records
+    val records2 = recordsToStrings(dataGen.generateUniqueDeleteRecords("002", 50)).toList
+    val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(50, hudiSnapshotDF2.count()) // 50 records were deleted
+    assertEquals(1, hudiSnapshotDF2.select("_hoodie_commit_time").distinct().count())
+    val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString
+    val commit2Time = hudiSnapshotDF2.select("_hoodie_commit_time").head().get(0).toString
+    assertEquals(commit1Time, commit2Time) // surviving rows still carry the first commit time
+    assertEquals(50, hudiSnapshotDF2.join(hudiSnapshotDF1, Seq("_hoodie_record_key"), "left").count())
+
+    // unmerged query (REALTIME_SKIP_MERGE): the deletes are not applied, so all 100 rows come back
+    val hudiSnapshotDF2Unmerge = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(100, hudiSnapshotDF2Unmerge.count())
+
+    // Third Operation:
+    // Upsert 50 delete records to delete the rest
+    // Snapshot view should read 0 records
+    val records3 = recordsToStrings(dataGen.generateUniqueDeleteRecords("003", 50)).toList
+    val inputDF3: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records3, 2))
+    inputDF3.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val hudiSnapshotDF3 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+    assertEquals(0, hudiSnapshotDF3.count()) // 100 records were deleted, 0 record to load
+  }
+
+  @Test
+  def testPrunedFiltered(): Unit = {
+    // Step 1:
+    // Insert 100 records into a MOR table (parquet files only, three default partitions)
+    // and read them back through the SNAPSHOT view.
+    val globPath = basePath + "/*/*/*/*"
+    val expectedColumns = "amount,currency,tip_history,_hoodie_commit_seqno"
+    val insertJson = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+    val insertDF = spark.read.json(spark.sparkContext.parallelize(insertJson, 2))
+    insertDF.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    val snapshotAfterInsert = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(globPath)
+    assertEquals(100, snapshotAfterInsert.count())
+    // nested columns are selected in an order that differs from the table schema
+    val prunedAfterInsert = snapshotAfterInsert
+      .select("fare.amount", "fare.currency", "tip_history", "_hoodie_commit_seqno")
+      .orderBy(desc("_hoodie_commit_seqno"))
+    assertEquals(expectedColumns, prunedAfterInsert.columns.mkString(","))
+
+    // Step 2:
+    // Upsert 50 update records.
+    // Snapshot view should still read 100 records.
+    val updateJson = recordsToStrings(dataGen.generateUniqueUpdates("002", 50)).toList
+    val updateDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(updateJson, 2))
+    updateDF.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val snapshotAfterUpdate = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(globPath)
+
+    val firstCommitTime = snapshotAfterInsert.select("_hoodie_commit_time").head().get(0).toString
+
+    // keep only rows committed after the first commit, i.e. the 50 updated records
+    val newerRows = snapshotAfterUpdate.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history")
+      .filter(col("_hoodie_commit_time") > firstCommitTime)
+    assertEquals(50, newerRows.count())
+
+    // same out-of-order nested projection, now against the snapshot taken after the update
+    val prunedAfterUpdate = snapshotAfterUpdate
+      .select("fare.amount", "fare.currency", "tip_history", "_hoodie_commit_seqno")
+      .orderBy(desc("_hoodie_commit_seqno"))
+    assertEquals(expectedColumns, prunedAfterUpdate.columns.mkString(","))
+
+    // column values must come back with the expected types
+    val latestRow = snapshotAfterUpdate
+      .select("begin_lat", "current_date", "fare.currency", "tip_history", "nation")
+      .orderBy(desc("_hoodie_commit_time"))
+      .head()
+    assertEquals(latestRow.getDouble(0), latestRow.get(0))
+    assertEquals(latestRow.getLong(1), latestRow.get(1))
+    assertEquals(latestRow.getString(2), latestRow.get(2))
+    assertEquals(latestRow.getSeq(3), latestRow.get(3))
+    assertEquals(latestRow.getStruct(4), latestRow.get(4))
+
+    // make sure show() works on both snapshots
+    snapshotAfterInsert.show(1)
+    snapshotAfterUpdate.show(1)
+  }
+
+  @Test
+  def testVectorizedReader(): Unit = {
+    val vectorizedReaderKey = "spark.sql.parquet.enableVectorizedReader"
+    spark.conf.set(vectorizedReaderKey, true)
+    assertTrue(spark.conf.get(vectorizedReaderKey).toBoolean)
+    // The vectorized reader is only triggered for an AtomicType schema,
+    // i.e. one without null, UDT, array, struct or map columns.
+    val shortTripSchema = HoodieTestDataGenerator.SHORT_TRIP_SCHEMA
+    val globPath = basePath + "/*/*/*/*"
+    val insertJson = recordsToStrings(dataGen.generateInsertsAsPerSchema("001", 100, shortTripSchema)).toList
+    val insertDF = spark.read.json(spark.sparkContext.parallelize(insertJson, 2))
+    insertDF.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    val snapshotAfterInsert = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(globPath)
+    assertEquals(100, snapshotAfterInsert.count())
+
+    val updateJson = recordsToStrings(dataGen.generateUniqueUpdatesAsPerSchema("002", 50, shortTripSchema)).toList
+    val updateDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(updateJson, 2))
+    updateDF.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val snapshotAfterUpdate = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(globPath)
+    assertEquals(100, snapshotAfterUpdate.count())
+
+    // column values must come back with the expected types
+    val typedRow = snapshotAfterUpdate
+      .select("fare", "driver", "_hoodie_is_deleted").head()
+    assertEquals(typedRow.getDouble(0), typedRow.get(0))
+    assertEquals(typedRow.getString(1), typedRow.get(1))
+    assertEquals(typedRow.getBoolean(2), typedRow.get(2))
+
+    // make sure show() works on both snapshots
+    snapshotAfterInsert.show(1)
+    snapshotAfterUpdate.show(1)
+  }
+}
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
new file mode 100644
index 0000000..011573b
--- /dev/null
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.TableNotFoundException
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.log4j.LogManager
+import org.apache.spark.sql._
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.JavaConversions._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+/**
+ * Basic test of the structured streaming sink: JSON files are streamed into a Hudi table and read back.
+ */
+class TestStructuredStreaming extends HoodieClientTestBase {
+  private val log = LogManager.getLogger(getClass)
+  var spark: SparkSession = null
+  val commonOpts = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
+    HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
+  )
+
+  @BeforeEach override def setUp(): Unit = {
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+    initTestDataGenerator()
+    initFileSystem()
+  }
+
+  @AfterEach override def tearDown(): Unit = {
+    cleanupSparkContexts()
+    cleanupTestDataGenerator()
+    cleanupFileSystem()
+  }
+
+  @Test
+  def testStructuredStreaming(): Unit = {
+    fs.delete(new Path(basePath), true)
+    val sourcePath = basePath + "/source"
+    val destPath = basePath + "/dest"
+    fs.mkdirs(new Path(sourcePath))
+
+    // First chunk of data
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+    // Second chunk of data
+    val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
+
+    // define the source of streaming
+    val streamingInput =
+      spark.readStream
+        .schema(inputDF1.schema)
+        .json(sourcePath)
+
+    // f1 runs the streaming sink while f2 concurrently feeds the source directory and verifies the table
+    val f1 = Future {
+      println("streaming starting")
+      //'writeStream' can be called only on streaming Dataset/DataFrame
+      streamingInput
+        .writeStream
+        .format("org.apache.hudi")
+        .options(commonOpts)
+        .trigger(new ProcessingTime(100))
+        .option("checkpointLocation", basePath + "/checkpoint")
+        .outputMode(OutputMode.Append)
+        .start(destPath)
+        .awaitTermination(10000) // waits at most 10 s (argument is in milliseconds)
+      println("streaming ends")
+    }
+
+    val f2 = Future {
+      inputDF1.write.mode(SaveMode.Append).json(sourcePath)
+      // wait for spark streaming to process one microbatch
+      val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5)
+      assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
+      val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, destPath)
+      // Read RO View
+      val hoodieROViewDF1 = spark.read.format("org.apache.hudi")
+        .load(destPath + "/*/*/*/*")
+      assertEquals(100, hoodieROViewDF1.count())
+
+      inputDF2.write.mode(SaveMode.Append).json(sourcePath)
+      // wait for spark streaming to process one microbatch
+      waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
+      val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, destPath)
+      assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
+      // Read RO View
+      val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
+        .load(destPath + "/*/*/*/*")
+      assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated
+
+      // Read Incremental View
+      // we have 2 commits, try pulling the first commit (which is not the latest)
+      val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").get(0)
+      val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
+        .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+        .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+        .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit)
+        .load(destPath)
+      assertEquals(100, hoodieIncViewDF1.count())
+      // 100 initial inserts must be pulled
+      var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
+      assertEquals(1, countsPerCommit.length)
+      assertEquals(firstCommit, countsPerCommit(0).get(0))
+
+      // pull the latest commit
+      val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
+        .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+        .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+        .load(destPath)
+
+      assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // one row per distinct key in the second batch
+      countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
+      assertEquals(1, countsPerCommit.length)
+      assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+    }
+    Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
+  }
+
+  /** Polls the completed timeline until it holds numCommits instants; IllegalStateException after timeoutSecs. */
+  @throws[InterruptedException]
+  private def waitTillAtleastNCommits(fs: FileSystem, tablePath: String,
+                                      numCommits: Int, timeoutSecs: Int, sleepSecsAfterEachRun: Int) = {
+    val beginTime = System.currentTimeMillis
+    var currTime = beginTime
+    val timeoutMsecs = timeoutSecs * 1000
+    var numInstants = 0
+    var success = false
+    while ({!success && (currTime - beginTime) < timeoutMsecs}) try {
+      val timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath)
+      // mkString renders the instants themselves; concatenating the raw array only logs its identity
+      log.info("Timeline :" + timeline.getInstants.toArray().mkString(","))
+      if (timeline.countInstants >= numCommits) {
+        numInstants = timeline.countInstants
+        success = true
+      }
+      val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath, true) // NOTE(review): unused - confirm, then remove
+    } catch {
+      case _: TableNotFoundException =>
+        log.info("Got table not found exception. Retrying")
+    } finally {
+      Thread.sleep(sleepSecsAfterEachRun * 1000) // runs on every pass, including the successful one
+      currTime = System.currentTimeMillis
+    }
+    if (!success) throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath)
+    numInstants
+  }
+}