Posted to commits@hudi.apache.org by da...@apache.org on 2022/07/12 06:49:49 UTC

[hudi] branch master updated: [HUDI-4298] When reading the mor table with QUERY_TYPE_SNAPSHOT,Unabl… (#5937)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 994c561488 [HUDI-4298] When reading the mor table with QUERY_TYPE_SNAPSHOT,Unabl… (#5937)
994c561488 is described below

commit 994c56148859a8b7d6b6b5c068a7715167e9bb70
Author: HunterXHunter <13...@qq.com>
AuthorDate: Tue Jul 12 14:49:44 2022 +0800

    [HUDI-4298] When reading the mor table with QUERY_TYPE_SNAPSHOT,Unabl… (#5937)
    
    * [HUDI-4298] Add test case for reading mor table
    
    Signed-off-by: LinMingQiang <13...@qq.com>
---
 .../hudi/common/testutils/HoodieTestUtils.java     |   4 +
 .../apache/hudi/sink/ITTestDataStreamWrite.java    |   6 +-
 .../org/apache/hudi/sink/TestWriteCopyOnWrite.java |   6 +-
 .../org/apache/hudi/sink/TestWriteMergeOnRead.java |  57 ++++++++
 .../sink/compact/ITTestHoodieFlinkCompactor.java   |   6 +-
 .../org/apache/hudi/sink/utils/TestWriteBase.java  |  19 +--
 .../test/java/org/apache/hudi/utils/TestData.java  | 146 +++++++++++++++------
 7 files changed, 179 insertions(+), 65 deletions(-)
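
The MergeOnRead tests added below exercise the existing fluent TestHarness from TestWriteBase end to end. A condensed sketch of that flow, assuming the conf, tempFile and EXPECTED1 members inherited from the test base classes; the class and method names here are hypothetical and not part of the commit:

    package org.apache.hudi.sink;

    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.utils.TestData;

    import org.junit.jupiter.api.Test;

    public class MergeOnReadHarnessSketch extends TestWriteMergeOnRead {
      @Test
      public void sketchSnapshotRead() throws Exception {
        conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
        conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
        conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 2);
        TestHarness.instance().preparePipeline(tempFile, conf)
            .consume(TestData.DATA_SET_INSERT)  // buffer one batch of input rows
            .emptyEventBuffer()                 // clear events left from the previous step
            .checkpoint(1)                      // trigger checkpoint 1 on the write function
            .assertNextEvent()                  // the write task sends its metadata event
            .checkpointComplete(1)              // the coordinator commits the instant
            .checkWrittenData(EXPECTED1, 4)     // with this patch, MOR data is verified by merging base and log files
            .end();
      }
    }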

diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 4e507436b9..6a2bffd34d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -66,6 +66,10 @@ public class HoodieTestUtils {
     return init(getDefaultHadoopConf(), basePath, tableType);
   }
 
+  public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, Properties properties) throws IOException {
+    return init(getDefaultHadoopConf(), basePath, tableType, properties);
+  }
+
   public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType, String bootstrapBasePath, boolean bootstrapIndexEnable) throws IOException {
     Properties props = new Properties();
     props.setProperty(HoodieTableConfig.BOOTSTRAP_BASE_PATH.key(), bootstrapBasePath);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 9ed0dfb807..3d96c1cafa 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -263,7 +263,7 @@ public class ITTestDataStreamWrite extends TestLogger {
       client.getJobExecutionResult().get();
     }
 
-    TestData.checkWrittenFullData(tempFile, expected);
+    TestData.checkWrittenDataCOW(tempFile, expected);
   }
 
   private void testWriteToHoodieWithCluster(
@@ -327,7 +327,7 @@ public class ITTestDataStreamWrite extends TestLogger {
     // wait for the streaming job to finish
     client.getJobExecutionResult().get();
 
-    TestData.checkWrittenFullData(tempFile, expected);
+    TestData.checkWrittenDataCOW(tempFile, expected);
   }
 
   public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
@@ -449,7 +449,7 @@ public class ITTestDataStreamWrite extends TestLogger {
     builder.sink(dataStream, false);
 
     execute(execEnv, true, "Api_Sink_Test");
-    TestData.checkWrittenFullData(tempFile, EXPECTED);
+    TestData.checkWrittenDataCOW(tempFile, EXPECTED);
   }
 
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index e67d2ab35c..9a6aaf01e6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -250,7 +250,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .checkpoint(2)
         .assertNextEvent()
         .checkpointComplete(2)
-        .checkWrittenFullData(EXPECTED5)
+        .checkWrittenDataCOW(EXPECTED5)
         .end();
   }
 
@@ -282,7 +282,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .checkpoint(2)
         .handleEvents(2)
         .checkpointComplete(2)
-        .checkWrittenFullData(EXPECTED5)
+        .checkWrittenDataCOW(EXPECTED5)
         .end();
   }
 
@@ -305,7 +305,7 @@ public class TestWriteCopyOnWrite extends TestWriteBase {
         .checkpoint(2)
         .handleEvents(1)
         .checkpointComplete(2)
-        .checkWrittenFullData(EXPECTED5)
+        .checkWrittenDataCOW(EXPECTED5)
         .end();
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index aa31d859bb..1fe48e6700 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -18,12 +18,15 @@
 
 package org.apache.hudi.sink;
 
+import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.utils.TestData;
 
 import org.apache.flink.configuration.Configuration;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -84,6 +87,60 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
     validateIndexLoaded();
   }
 
+  @Test
+  public void testEventTimeAvroPayloadMergeRead() throws Exception {
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.set(FlinkOptions.PATH, tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+    conf.set(FlinkOptions.OPERATION, "upsert");
+    conf.set(FlinkOptions.CHANGELOG_ENABLED, false);
+    conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, 2);
+    conf.set(FlinkOptions.PRE_COMBINE, true);
+    conf.set(FlinkOptions.PRECOMBINE_FIELD, "ts");
+    conf.set(FlinkOptions.PAYLOAD_CLASS_NAME, EventTimeAvroPayload.class.getName());
+    HashMap<String, String> mergedExpected = new HashMap<>(EXPECTED1);
+    mergedExpected.put("par1", "[id1,par1,id1,Danny,22,4,par1, id2,par1,id2,Stephen,33,2,par1]");
+    TestHarness.instance().preparePipeline(tempFile, conf)
+            .consume(TestData.DATA_SET_INSERT)
+            .emptyEventBuffer()
+            .checkpoint(1)
+            .assertNextEvent()
+            .checkpointComplete(1)
+            .checkWrittenData(EXPECTED1, 4)
+            .consume(TestData.DATA_SET_DISORDER_INSERT)
+            .emptyEventBuffer()
+            .checkpoint(2)
+            .assertNextEvent()
+            .checkpointComplete(2)
+            .checkWrittenData(mergedExpected, 4)
+            .consume(TestData.DATA_SET_SINGLE_INSERT)
+            .emptyEventBuffer()
+            .checkpoint(3)
+            .assertNextEvent()
+            .checkpointComplete(3)
+            .checkWrittenData(mergedExpected, 4)
+            .end();
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {1, 2})
+  public void testOnlyBaseFileOrOnlyLogFileRead(int compactionDeltaCommits) throws Exception {
+    conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
+    conf.set(FlinkOptions.PATH, tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+    conf.set(FlinkOptions.OPERATION, "upsert");
+    conf.set(FlinkOptions.CHANGELOG_ENABLED, false);
+    conf.set(FlinkOptions.COMPACTION_DELTA_COMMITS, compactionDeltaCommits);
+    TestHarness.instance().preparePipeline(tempFile, conf)
+            .consume(TestData.DATA_SET_INSERT)
+            .emptyEventBuffer()
+            .checkpoint(1)
+            .assertNextEvent()
+            .checkpointComplete(1)
+            .checkWrittenData(EXPECTED1, 4)
+            .end();
+  }
+
   @Override
   public void testInsertClustering() {
     // insert clustering is only valid for cow table.
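
The testEventTimeAvroPayloadMergeRead case above hinges on EventTimeAvroPayload keeping the record with the greatest precombine field ("ts") rather than the last-arriving one, which is why the disordered batch collapses to the age-22/ts-4 row in mergedExpected. A standalone illustration of that selection rule on the same id1 rows, in plain Java rather than the Hudi payload API:

    import java.util.Arrays;
    import java.util.List;

    public class EventTimeMergeSketch {
      static final class Row {
        final String key; final String name; final int age; final long ts;
        Row(String key, String name, int age, long ts) {
          this.key = key; this.name = name; this.age = age; this.ts = ts;
        }
        @Override public String toString() {
          return key + "," + name + "," + age + "," + ts;
        }
      }

      public static void main(String[] args) {
        // Mirrors DATA_SET_DISORDER_INSERT for key id1: ts values arrive as 3, 4, 2, 1.
        List<Row> disordered = Arrays.asList(
            new Row("id1", "Danny", 23, 3L),
            new Row("id1", "Danny", 22, 4L),
            new Row("id1", "Danny", 23, 2L),
            new Row("id1", "Danny", 23, 1L));
        // Event-time merging reduced to its essence: keep the row with the largest "ts".
        Row winner = disordered.stream()
            .max((a, b) -> Long.compare(a.ts, b.ts))
            .get();
        System.out.println(winner);  // id1,Danny,22,4 -- the row asserted in mergedExpected
      }
    }
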
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index 341a157e86..7e3b055add 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -161,7 +161,7 @@ public class ITTestHoodieFlinkCompactor {
 
     env.execute("flink_hudi_compaction");
     writeClient.close();
-    TestData.checkWrittenFullData(tempFile, EXPECTED1);
+    TestData.checkWrittenDataCOW(tempFile, EXPECTED1);
   }
 
   @ParameterizedTest
@@ -202,7 +202,7 @@ public class ITTestHoodieFlinkCompactor {
 
     asyncCompactionService.shutDown();
 
-    TestData.checkWrittenFullData(tempFile, EXPECTED2);
+    TestData.checkWrittenDataCOW(tempFile, EXPECTED2);
   }
 
   @ParameterizedTest
@@ -281,7 +281,7 @@ public class ITTestHoodieFlinkCompactor {
 
     env.execute("flink_hudi_compaction");
     writeClient.close();
-    TestData.checkWrittenFullData(tempFile, EXPECTED3);
+    TestData.checkWrittenDataCOW(tempFile, EXPECTED3);
   }
 
   private String scheduleCompactionPlan(HoodieTableMetaClient metaClient, HoodieFlinkWriteClient<?> writeClient) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index b752258219..b6ae0767d6 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -22,20 +22,14 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
-import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestData;
 import org.apache.hudi.utils.TestUtils;
 
-import org.apache.avro.Schema;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.table.data.RowData;
@@ -338,9 +332,7 @@ public class TestWriteBase {
     public TestHarness checkWrittenData(
         Map<String, String> expected,
         int partitions) throws Exception {
-      if (OptionsResolver.isCowTable(conf)
-          || conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)
-          || OptionsResolver.isAppendMode(conf)) {
+      if (OptionsResolver.isCowTable(conf)) {
         TestData.checkWrittenData(this.baseFile, expected, partitions);
       } else {
         checkWrittenDataMor(baseFile, expected, partitions);
@@ -349,15 +341,12 @@ public class TestWriteBase {
     }
 
     private void checkWrittenDataMor(File baseFile, Map<String, String> expected, int partitions) throws Exception {
-      HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(basePath, HadoopConfigurations.getHadoopConf(conf));
-      Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
-      String latestInstant = lastCompleteInstant();
       FileSystem fs = FSUtils.getFs(basePath, new org.apache.hadoop.conf.Configuration());
-      TestData.checkWrittenDataMOR(fs, latestInstant, baseFile, expected, partitions, schema);
+      TestData.checkWrittenDataMOR(fs, baseFile, expected, partitions);
     }
 
-    public TestHarness checkWrittenFullData(Map<String, List<String>> expected) throws IOException {
-      TestData.checkWrittenFullData(this.baseFile, expected);
+    public TestHarness checkWrittenDataCOW(Map<String, List<String>> expected) throws IOException {
+      TestData.checkWrittenDataCOW(this.baseFile, expected);
       return this;
     }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index d03e1b2eb5..d0cf143318 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -18,13 +18,22 @@
 
 package org.apache.hudi.utils;
 
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
@@ -49,7 +58,6 @@ import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.Strings;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.hadoop.ParquetReader;
 
@@ -60,15 +68,19 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.function.Predicate;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.function.Predicate;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static junit.framework.TestCase.assertEquals;
+import static org.apache.hudi.common.table.HoodieTableConfig.HOODIE_PROPERTIES_FILE;
+import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
+import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -278,6 +290,17 @@ public class TestData {
       insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
           TimestampData.fromEpochMillis(1), StringData.fromString("par1")));
 
+  public static List<RowData> DATA_SET_DISORDER_INSERT = Arrays.asList(
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(3), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 22,
+          TimestampData.fromEpochMillis(4), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(2), StringData.fromString("par1")),
+      insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23,
+          TimestampData.fromEpochMillis(1), StringData.fromString("par1"))
+  );
+
   public static List<RowData> DATA_SET_DISORDER_UPDATE_DELETE = Arrays.asList(
       // DISORDER UPDATE
       updateAfterRow(StringData.fromString("id1"), StringData.fromString("Danny"), 21,
@@ -598,14 +621,14 @@ public class TestData {
    * @param basePath The file base to check, should be a directory
    * @param expected The expected results mapping, the key should be the partition path
    */
-  public static void checkWrittenFullData(
+  public static void checkWrittenDataCOW(
       File basePath,
       Map<String, List<String>> expected) throws IOException {
 
     // 1. init flink table
     HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath.getAbsolutePath());
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath.getAbsolutePath()).build();
-    HoodieFlinkTable table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
+    HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
 
     // 2. check each partition data
     expected.forEach((partition, partitionDataSet) -> {
@@ -638,49 +661,90 @@ public class TestData {
    *
    * <p>Note: Replace it with the Flink reader when it is supported.
    *
-   * @param fs            The file system
-   * @param latestInstant The latest committed instant of current table
-   * @param baseFile      The file base to check, should be a directory
-   * @param expected      The expected results mapping, the key should be the partition path
-   * @param partitions    The expected partition number
-   * @param schema        The read schema
+   * @param fs         The file system
+   * @param baseFile   The file base to check, should be a directory
+   * @param expected   The expected results mapping, the key should be the partition path
+   * @param partitions The expected partition number
    */
   public static void checkWrittenDataMOR(
       FileSystem fs,
-      String latestInstant,
       File baseFile,
       Map<String, String> expected,
-      int partitions,
-      Schema schema) {
+      int partitions) throws Exception {
     assert baseFile.isDirectory() : "Base path should be a directory";
-    FileFilter partitionFilter = file -> !file.getName().startsWith(".");
-    File[] partitionDirs = baseFile.listFiles(partitionFilter);
+    String basePath = baseFile.getAbsolutePath();
+    File hoodiePropertiesFile = new File(baseFile + "/" + METAFOLDER_NAME + "/" + HOODIE_PROPERTIES_FILE);
+    assert hoodiePropertiesFile.exists();
+    // 1. init flink table
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+            .fromFile(hoodiePropertiesFile)
+            .withPath(basePath).build();
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ, config.getProps());
+    HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient);
+    Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+
+    String latestInstant = metaClient.getActiveTimeline().filterCompletedInstants()
+        .lastInstant().map(HoodieInstant::getTimestamp).orElse(null);
+    assertNotNull(latestInstant, "No completed commit under table path " + basePath);
+
+    File[] partitionDirs = baseFile.listFiles(file -> !file.getName().startsWith(".") && file.isDirectory());
     assertNotNull(partitionDirs);
-    assertThat(partitionDirs.length, is(partitions));
+    assertThat("The partitions number should be: " + partitions, partitionDirs.length, is(partitions));
+
+    // 2. check each partition data
+    final int[] requiredPos = IntStream.range(0, schema.getFields().size()).toArray();
     for (File partitionDir : partitionDirs) {
-      File[] dataFiles = partitionDir.listFiles(file ->
-          file.getName().contains(".log.") && !file.getName().startsWith(".."));
-      assertNotNull(dataFiles);
-      List<String> logPaths = Arrays.stream(dataFiles)
-          .sorted((f1, f2) -> HoodieLogFile.getLogFileComparator()
-              .compare(new HoodieLogFile(f1.getPath()), new HoodieLogFile(f2.getPath())))
-          .map(File::getAbsolutePath).collect(Collectors.toList());
-      HoodieMergedLogRecordScanner scanner = getScanner(fs, baseFile.getPath(), logPaths, schema, latestInstant);
-      List<String> readBuffer = scanner.getRecords().values().stream()
-          .map(hoodieRecord -> {
-            try {
-              // in case it is a delete
-              GenericRecord record = (GenericRecord) hoodieRecord.getData()
-                  .getInsertValue(schema, new Properties())
-                  .orElse(null);
-              return record == null ? (String) null : filterOutVariables(record);
-            } catch (IOException e) {
-              throw new RuntimeException(e);
+      List<String> readBuffer = new ArrayList<>();
+      List<FileSlice> fileSlices = table.getSliceView().getLatestMergedFileSlicesBeforeOrOn(partitionDir.getName(), latestInstant).collect(Collectors.toList());
+      for (FileSlice fileSlice : fileSlices) {
+        HoodieMergedLogRecordScanner scanner = null;
+        List<String> logPaths = fileSlice.getLogFiles()
+            .sorted(HoodieLogFile.getLogFileComparator())
+            .map(logFile -> logFile.getPath().toString())
+            .collect(Collectors.toList());
+        if (logPaths.size() > 0) {
+          scanner = getScanner(fs, basePath, logPaths, schema, latestInstant);
+        }
+        String baseFilePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+        Set<String> keyToSkip = new HashSet<>();
+        if (baseFilePath != null) {
+          // read the base file first
+          GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
+          ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path(baseFilePath)).build();
+          GenericRecord currentRecord = reader.read();
+          while (currentRecord != null) {
+            String curKey = currentRecord.get(HOODIE_RECORD_KEY_COL_POS).toString();
+            if (scanner != null && scanner.getRecords().containsKey(curKey)) {
+              keyToSkip.add(curKey);
+              // merge row with log.
+              final HoodieAvroRecord<?> record = (HoodieAvroRecord<?>) scanner.getRecords().get(curKey);
+              Option<IndexedRecord> combineResult = record.getData().combineAndGetUpdateValue(currentRecord, schema, config.getProps());
+              if (combineResult.isPresent()) {
+                GenericRecord avroRecord = buildAvroRecordBySchema(combineResult.get(), schema, requiredPos, recordBuilder);
+                readBuffer.add(filterOutVariables(avroRecord));
+              }
+            } else {
+              readBuffer.add(filterOutVariables(currentRecord));
             }
-          })
-          .filter(Objects::nonNull)
-          .sorted(Comparator.naturalOrder())
-          .collect(Collectors.toList());
+            currentRecord = reader.read();
+          }
+        }
+        // read the remaining log data.
+        if (scanner != null) {
+          for (String curKey : scanner.getRecords().keySet()) {
+            if (!keyToSkip.contains(curKey)) {
+              Option<GenericRecord> record = (Option<GenericRecord>) scanner.getRecords()
+                      .get(curKey).getData()
+                      .getInsertValue(schema, config.getProps());
+              if (record.isPresent()) {
+                readBuffer.add(filterOutVariables(record.get()));
+              }
+            }
+          }
+        }
+      }
+      // Ensure that the written and read sequences are consistent.
+      readBuffer.sort(String::compareTo);
       assertThat(readBuffer.toString(), is(expected.get(partitionDir.getName())));
     }
   }
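
With this rewrite, callers of checkWrittenDataMOR pass only the file system, the table base path, the expected rows and the partition count; the table schema and the latest completed instant are resolved internally, and the COW-side helper is now named checkWrittenDataCOW. A minimal sketch of how a test might call the two helpers after this change; the class name and expected values are placeholders, not data from the commit:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hudi.common.fs.FSUtils;
    import org.apache.hudi.utils.TestData;

    import java.io.File;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    public class WrittenDataCheckSketch {
      public static void verify(File tablePath) throws Exception {
        // COW check: each partition maps to the list of expected row strings.
        Map<String, List<String>> expectedCow = Collections.singletonMap(
            "par1", Collections.singletonList("id1,Danny,23,1,par1"));  // placeholder rows
        TestData.checkWrittenDataCOW(tablePath, expectedCow);

        // MOR check: the helper now loads the table schema and the latest completed
        // instant itself, then merges each base file with its log files before comparing.
        FileSystem fs = FSUtils.getFs(tablePath.getAbsolutePath(), new org.apache.hadoop.conf.Configuration());
        Map<String, String> expectedMor = Collections.singletonMap(
            "par1", "[id1,par1,id1,Danny,23,1,par1]");  // placeholder rows
        TestData.checkWrittenDataMOR(fs, tablePath, expectedMor, 1);
      }
    }
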
@@ -722,7 +786,7 @@ public class TestData {
     fields.add(genericRecord.get("age").toString());
     fields.add(genericRecord.get("ts").toString());
     fields.add(genericRecord.get("partition").toString());
-    return Strings.join(fields, ",");
+    return String.join(",", fields);
   }
 
   public static BinaryRowData insertRow(Object... fields) {