You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/10/09 02:22:28 UTC

[hudi] branch master updated: [HUDI-995] Migrate HoodieTestUtils APIs to HoodieTestTable (#2143)

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d1d91d  [HUDI-995] Migrate HoodieTestUtils APIs to HoodieTestTable (#2143)
1d1d91d is described below

commit 1d1d91d444b6af2b24b17d94068512a930877a98
Author: Raymond Xu <27...@users.noreply.github.com>
AuthorDate: Thu Oct 8 19:21:27 2020 -0700

    [HUDI-995] Migrate HoodieTestUtils APIs to HoodieTestTable (#2143)
    
    * [HUDI-995] Migrate HoodieTestUtils APIs to HoodieTestTable
    
    Remove the following APIs from `HoodieTestUtils`:
    - listAllDataFilesAndLogFilesInPath
    - listAllLogFilesInPath
    - listAllDataFilesInPath
    - writeRecordsToLogFiles
    - createCleanFiles
    - createPendingCleanFiles
    
    Migrate the callers to `HoodieTestTable` and `HoodieWriteableTestTable`, using these newly added APIs:
    - listAllBaseAndLogFiles
    - listAllLogFiles
    - listAllBaseFiles
    - withLogAppends
    - addClean
    - addInflightClean
    
    Also add related APIs to `FileCreateUtils`:
    - createCleanFile
    - createRequestedCleanFile
    - createInflightCleanFile
---
 .../org/apache/hudi/index/TestHoodieIndex.java     |   8 +-
 .../hudi/index/bloom/TestHoodieBloomIndex.java     |  16 +-
 .../index/bloom/TestHoodieGlobalBloomIndex.java    |  10 +-
 .../hudi/io/TestHoodieKeyLocationFetchHandle.java  |   2 +-
 .../hudi/io/TestHoodieTimelineArchiveLog.java      |  65 +++++---
 .../java/org/apache/hudi/table/TestCleaner.java    |   4 +-
 .../hudi/table/TestHoodieMergeOnReadTable.java     |  51 +++---
 .../table/action/compact/CompactionTestBase.java   |  10 +-
 .../table/action/compact/TestHoodieCompactor.java  |   5 +-
 .../rollback/TestMarkerBasedRollbackStrategy.java  |  27 ++--
 .../hudi/testutils/HoodieWriteableTestTable.java   |  63 +++++++-
 .../hudi/common/testutils/FileCreateUtils.java     |  20 ++-
 .../common/testutils/HoodieCommonTestHarness.java  |   4 +-
 .../hudi/common/testutils/HoodieTestTable.java     |  56 ++++++-
 .../hudi/common/testutils/HoodieTestUtils.java     | 171 ++-------------------
 15 files changed, 257 insertions(+), 255 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index 9de36c6..d3b5867 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -281,9 +281,9 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
 
     // We create three parquet file, each having one record. (two different partitions)
     HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(hoodieTable, SCHEMA);
-    String fileId1 = testTable.addCommit("001").withInserts(p1, record1);
-    String fileId2 = testTable.addCommit("002").withInserts(p1, record2);
-    String fileId3 = testTable.addCommit("003").withInserts(p2, record4);
+    String fileId1 = testTable.addCommit("001").getFileIdWithInserts(p1, record1);
+    String fileId2 = testTable.addCommit("002").getFileIdWithInserts(p1, record2);
+    String fileId3 = testTable.addCommit("003").getFileIdWithInserts(p2, record4);
 
     // We do the tag again
     metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -375,7 +375,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
             incomingPayloadSamePartition);
 
     // We have some records to be tagged (two different partitions)
-    testTable.addCommit("1000").withInserts(p1, originalRecord);
+    testTable.addCommit("1000").getFileIdWithInserts(p1, originalRecord);
 
     // test against incoming record with a different partition
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 2d091a0..39d9b64 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -223,7 +223,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
     filter.add(record3.getRecordKey());
     HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(metaClient, SCHEMA, filter);
-    String fileId = testTable.addCommit("000").withInserts(partition, record1, record2);
+    String fileId = testTable.addCommit("000").getFileIdWithInserts(partition, record1, record2);
     String filename = testTable.getBaseFileNameById(fileId);
 
     // The bloom filter contains 3 records
@@ -310,9 +310,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     }
 
     // We create three parquet file, each having one record. (two different partitions)
-    String fileId1 = testTable.addCommit("001").withInserts("2016/01/31", record1);
-    String fileId2 = testTable.addCommit("002").withInserts("2016/01/31", record2);
-    String fileId3 = testTable.addCommit("003").withInserts("2015/01/31", record4);
+    String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1);
+    String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2);
+    String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4);
 
     // We do the tag again
     taggedRecordRDD = bloomIndex.tagLocation(recordRDD, context, HoodieSparkTable.create(config, context, metaClient));
@@ -380,9 +380,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     }
 
     // We create three parquet file, each having one record. (two different partitions)
-    String fileId1 = testTable.addCommit("001").withInserts("2016/01/31", record1);
-    String fileId2 = testTable.addCommit("002").withInserts("2016/01/31", record2);
-    String fileId3 = testTable.addCommit("003").withInserts("2015/01/31", record4);
+    String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1);
+    String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2);
+    String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4);
 
     // We do the tag again
     metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -433,7 +433,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
         BloomFilterTypeCode.SIMPLE.name());
     filter.add(record2.getRecordKey());
     HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(metaClient, SCHEMA, filter);
-    String fileId = testTable.addCommit("000").withInserts("2016/01/31", record1);
+    String fileId = testTable.addCommit("000").getFileIdWithInserts("2016/01/31", record1);
     assertTrue(filter.mightContain(record1.getRecordKey()));
     assertTrue(filter.mightContain(record2.getRecordKey()));
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 2f68a03..e6fc3be 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -217,10 +217,10 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record5));
 
     // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
-    String fileId1 = testTable.addCommit("1000").withInserts("2016/04/01", record1);
-    String fileId2 = testTable.addCommit("2000").withInserts("2015/03/12");
-    String fileId3 = testTable.addCommit("3000").withInserts("2015/03/12", record2);
-    String fileId4 = testTable.addCommit("4000").withInserts("2015/03/12", record4);
+    String fileId1 = testTable.addCommit("1000").getFileIdWithInserts("2016/04/01", record1);
+    String fileId2 = testTable.addCommit("2000").getFileIdWithInserts("2015/03/12");
+    String fileId3 = testTable.addCommit("3000").getFileIdWithInserts("2015/03/12", record2);
+    String fileId4 = testTable.addCommit("4000").getFileIdWithInserts("2015/03/12", record4);
 
     // partitions will NOT be respected by this loadInvolvedFiles(...) call
     JavaRDD<HoodieRecord> taggedRecordRDD = index.tagLocation(recordRDD, context, hoodieTable);
@@ -299,7 +299,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
             new HoodieKey(incomingPayloadSamePartition.getRowKey(), incomingPayloadSamePartition.getPartitionPath()),
             incomingPayloadSamePartition);
 
-    testTable.addCommit("1000").withInserts(p1, originalRecord);
+    testTable.addCommit("1000").getFileIdWithInserts(p1, originalRecord);
 
     // test against incoming record with a different partition
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
index 38cd19c..3a7d468 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
@@ -130,7 +130,7 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {
 
       for (List<HoodieRecord> recordsPerSlice : recordsForFileSlices) {
         String instantTime = makeNewCommitTime();
-        String fileId = testTable.addCommit(instantTime).withInserts(entry.getKey(), recordsPerSlice.toArray(new HoodieRecord[0]));
+        String fileId = testTable.addCommit(instantTime).getFileIdWithInserts(entry.getKey(), recordsPerSlice.toArray(new HoodieRecord[0]));
         Tuple2<String, String> fileIdInstantTimePair = new Tuple2<>(fileId, instantTime);
         List<Tuple2<HoodieKey, HoodieRecordLocation>> expectedEntries = new ArrayList<>();
         for (HoodieRecord record : recordsPerSlice) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
index 88f755a..f2427cd 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
@@ -18,11 +18,13 @@
 
 package org.apache.hudi.io;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -32,15 +34,21 @@ import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.HoodieTimelineArchiveLog;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -48,11 +56,14 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -146,13 +157,14 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
 
     assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
 
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "100", wrapperFs.getConf());
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "101", wrapperFs.getConf());
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "102", wrapperFs.getConf());
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "103", wrapperFs.getConf());
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "104", wrapperFs.getConf());
-    HoodieTestUtils.createCleanFiles(metaClient, basePath, "105", wrapperFs.getConf());
-    HoodieTestUtils.createPendingCleanFiles(metaClient, "106", "107");
+    createCleanMetadata("100", false);
+    createCleanMetadata("101", false);
+    createCleanMetadata("102", false);
+    createCleanMetadata("103", false);
+    createCleanMetadata("104", false);
+    createCleanMetadata("105", false);
+    createCleanMetadata("106", true);
+    createCleanMetadata("107", true);
 
     // reload the timeline and get all the commmits before archive
     timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
@@ -227,7 +239,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     int numCommits = 4;
     int commitInstant = 100;
     for (int i = 0; i < numCommits; i++) {
-      createReplaceMetadata(commitInstant);
+      createReplaceMetadata(String.valueOf(commitInstant));
       commitInstant += 100;
     }
 
@@ -478,17 +490,34 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     assertEquals(expectedCommitMetadata.getOperationType(), WriteOperationType.INSERT.toString());
   }
 
-  private void createReplaceMetadata(int commitInstant) throws Exception {
-    String commitTime = "" + commitInstant;
-    String fileId1 = "file-" + commitInstant + "-1";
-    String fileId2 = "file-" + commitInstant + "-2";
+  private void createReplaceMetadata(String instantTime) throws Exception {
+    String fileId1 = "file-" + instantTime + "-1";
+    String fileId2 = "file-" + instantTime + "-2";
 
     // create replace instant to mark fileId1 as deleted
     HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
     replaceMetadata.addReplaceFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1);
     replaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
-    HoodieTestTable testTable =  HoodieTestTable.of(metaClient);
-    testTable.addReplaceCommit(commitTime, replaceMetadata);
-    testTable.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+    HoodieTestTable.of(metaClient)
+        .addReplaceCommit(instantTime, replaceMetadata)
+        .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+  }
+
+  private void createCleanMetadata(String instantTime, boolean inflightOnly) throws IOException {
+    HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
+        CleanPlanV2MigrationHandler.VERSION, new HashMap<>());
+    if (inflightOnly) {
+      HoodieTestTable.of(metaClient).addInflightClean(instantTime, cleanerPlan);
+    } else {
+      HoodieCleanStat cleanStats = new HoodieCleanStat(
+          HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
+          HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
+          Collections.emptyList(),
+          Collections.emptyList(),
+          Collections.emptyList(),
+          instantTime);
+      HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
+      HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata);
+    }
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 730e1ef..152a981 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -982,7 +982,7 @@ public class TestCleaner extends HoodieClientTestBase {
     HoodieTestTable testTable = HoodieTestTable.of(metaClient)
         .addRequestedCommit("000")
         .withMarkerFiles("default", 10, IOType.MERGE);
-    final int numTempFilesBefore = testTable.listAllFilesInTempFolder().size();
+    final int numTempFilesBefore = testTable.listAllFilesInTempFolder().length;
     assertEquals(10, numTempFilesBefore, "Some marker files are created.");
 
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
@@ -992,7 +992,7 @@ public class TestCleaner extends HoodieClientTestBase {
         new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "000"), Option.empty());
     metaClient.reloadActiveTimeline();
     table.rollback(context, "001", new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000"), true);
-    final int numTempFilesAfter = testTable.listAllFilesInTempFolder().size();
+    final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length;
     assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index e7db7ad..8b47fa3 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.Transformations;
 import org.apache.hudi.common.util.Option;
@@ -63,6 +64,7 @@ import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExec
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
 import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
+import org.apache.hudi.testutils.HoodieWriteableTestTable;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 
 import org.apache.avro.generic.GenericRecord;
@@ -159,7 +161,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       client.compact(compactionCommitTime);
 
       HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
-      FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
       tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
       HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
@@ -207,7 +209,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       client.compact(compactionCommitTime);
 
       HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
-      FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
       tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
       HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
@@ -377,7 +379,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
-      FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
       tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());
@@ -418,7 +420,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
-      allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      allFiles = listAllBaseFilesInPath(hoodieTable);
       tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
       dataFilesToRead = tableView.getLatestBaseFiles();
       assertTrue(dataFilesToRead.findAny().isPresent());
@@ -476,7 +478,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
-      FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
       tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
 
       final String absentCommit = newCommitTime;
@@ -524,7 +526,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
-      FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
       tableView =
           getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
@@ -558,7 +560,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
         // Test failed delta commit rollback
         secondClient.rollback(commitTime1);
-        allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+        allFiles = listAllBaseFilesInPath(hoodieTable);
         // After rollback, there should be no base file with the failed commit time
         List<String> remainingFiles = Arrays.stream(allFiles).filter(file -> file.getPath().getName()
             .contains(commitTime1)).map(fileStatus -> fileStatus.getPath().toString()).collect(Collectors.toList());
@@ -593,7 +595,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
         // Test successful delta commit rollback
         thirdClient.rollback(commitTime2);
-        allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+        allFiles = listAllBaseFilesInPath(hoodieTable);
         // After rollback, there should be no parquet file with the failed commit time
         assertEquals(0, Arrays.stream(allFiles)
             .filter(file -> file.getPath().getName().contains(commitTime2)).count());
@@ -624,16 +626,16 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
         String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString();
         thirdClient.compact(compactionInstantTime);
 
-        allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+        allFiles = listAllBaseFilesInPath(hoodieTable);
         metaClient = HoodieTableMetaClient.reload(metaClient);
         tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
 
         final String compactedCommitTime = metaClient.getActiveTimeline().reload().lastInstant().get().getTimestamp();
-        assertTrue(Arrays.stream(listAllDataFilesInPath(hoodieTable, cfg.getBasePath()))
+        assertTrue(Arrays.stream(listAllBaseFilesInPath(hoodieTable))
                 .anyMatch(file -> compactedCommitTime.equals(new HoodieBaseFile(file).getCommitTime())));
         thirdClient.rollbackInflightCompaction(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactedCommitTime),
                 hoodieTable);
-        allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+        allFiles = listAllBaseFilesInPath(hoodieTable);
         metaClient = HoodieTableMetaClient.reload(metaClient);
         tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
 
@@ -680,7 +682,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
-      FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
       tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());
@@ -759,7 +761,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       JavaRDD<WriteStatus> ws = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime);
       client.commitCompaction(compactionInstantTime, ws, Option.empty());
 
-      allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      allFiles = listAllBaseFilesInPath(hoodieTable);
       metaClient = HoodieTableMetaClient.reload(metaClient);
       tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
 
@@ -787,7 +789,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       client.restoreToInstant("000");
 
       metaClient = HoodieTableMetaClient.reload(metaClient);
-      allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      allFiles = listAllBaseFilesInPath(hoodieTable);
       tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       dataFilesToRead = tableView.getLatestBaseFiles();
       assertFalse(dataFilesToRead.findAny().isPresent());
@@ -842,7 +844,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
-      FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
       BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
           metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
@@ -876,7 +878,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
-      allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      allFiles = listAllBaseFilesInPath(hoodieTable);
       roView = getHoodieTableFileSystemView(metaClient,
           hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
       dataFilesToRead = roView.getLatestBaseFiles();
@@ -919,15 +921,14 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       updatedRecords = readClient.tagLocation(updatedRecordsRDD).collect();
 
       // Write them to corresponding avro logfiles
-      HoodieTestUtils.writeRecordsToLogFiles(metaClient.getFs(), metaClient.getBasePath(),
-          HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
-
-      // Verify that all data file has one log file
       metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
+      HoodieWriteableTestTable.of(table, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)
+          .withLogAppends(updatedRecords);
       // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
       ((SyncableFileSystemView) (table.getSliceView())).reset();
 
+      // Verify that all data file has one log file
       for (String partitionPath : dataGen.getPartitionPaths()) {
         List<FileSlice> groupedLogFiles =
             table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
@@ -1400,7 +1401,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
       assertFalse(commit.isPresent());
 
-      FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+      FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
       BaseFileOnlyView roView =
           getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
       Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
@@ -1499,7 +1500,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().lastInstant();
     assertFalse(commit.isPresent());
 
-    FileStatus[] allFiles = listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
     BaseFileOnlyView roView =
         getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
     Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
@@ -1533,7 +1534,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
     assertFalse(commit.isPresent());
     HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context, metaClient);
-    return listAllDataFilesInPath(hoodieTable, cfg.getBasePath());
+    return listAllBaseFilesInPath(hoodieTable);
   }
 
   private FileStatus[] getROSnapshotFiles(String partitionPath)
@@ -1598,8 +1599,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     assertEquals(expectedCommitsSet, actualCommits);
   }
 
-  private FileStatus[] listAllDataFilesInPath(HoodieTable table, String basePath) throws IOException {
-    return HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), basePath, table.getBaseFileExtension());
+  private FileStatus[] listAllBaseFilesInPath(HoodieTable table) throws IOException {
+    return HoodieTestTable.of(table.getMetaClient()).listAllBaseFiles(table.getBaseFileExtension());
   }
 
   private FileStatus[] listStatus(JobConf jobConf, boolean realtime) throws IOException {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
index d1d31f8..6992a82 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/CompactionTestBase.java
@@ -35,7 +35,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
@@ -130,7 +130,7 @@ public class CompactionTestBase extends HoodieClientTestBase {
       assertNoWriteErrors(statusList);
       metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
       HoodieTable hoodieTable = getHoodieTable(metaClient, cfg);
-      List<HoodieBaseFile> dataFilesToRead = getCurrentLatestDataFiles(hoodieTable, cfg);
+      List<HoodieBaseFile> dataFilesToRead = getCurrentLatestBaseFiles(hoodieTable);
       assertTrue(dataFilesToRead.stream().findAny().isPresent(),
           "should list the parquet files we wrote in the delta commit");
       validateDeltaCommit(firstInstant, fgIdToCompactionOperation, cfg);
@@ -225,10 +225,10 @@ public class CompactionTestBase extends HoodieClientTestBase {
     return statusList;
   }
 
-  protected List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
-    FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
+  protected List<HoodieBaseFile> getCurrentLatestBaseFiles(HoodieTable table) throws IOException {
+    FileStatus[] allBaseFiles = HoodieTestTable.of(table.getMetaClient()).listAllBaseFiles();
     HoodieTableFileSystemView view =
-        getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
+        getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allBaseFiles);
     return view.getLatestBaseFiles().collect(Collectors.toList());
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 7655a75..2e6cea7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -44,6 +44,7 @@ import org.apache.hudi.index.bloom.SparkHoodieBloomIndex;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.HoodieWriteableTestTable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
@@ -154,8 +155,8 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
       updatedRecords = ((JavaRDD<HoodieRecord>)index.tagLocation(updatedRecordsRDD, context, table)).collect();
 
       // Write them to corresponding avro logfiles. Also, set the state transition properly.
-      HoodieTestUtils.writeRecordsToLogFiles(fs, metaClient.getBasePath(),
-          HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, updatedRecords);
+      HoodieWriteableTestTable.of(table, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)
+          .withLogAppends(updatedRecords);
       metaClient.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
           HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
       writeClient.commit(newCommitTime, jsc.emptyRDD(), Option.empty());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
index 4ab189a..191e90f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMarkerBasedRollbackStrategy.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -75,11 +76,11 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
     // then: ensure files are deleted correctly, non-existent files reported as failed deletes
     assertEquals(2, stats.size());
 
-    List<FileStatus> partAFiles = testTable.listAllFiles("partA");
-    List<FileStatus> partBFiles = testTable.listAllFiles("partB");
+    FileStatus[] partAFiles = testTable.listAllFilesInPartition("partA");
+    FileStatus[] partBFiles = testTable.listAllFilesInPartition("partB");
 
-    assertEquals(0, partBFiles.size());
-    assertEquals(1, partAFiles.size());
+    assertEquals(0, partBFiles.length);
+    assertEquals(1, partAFiles.length);
     assertEquals(2, stats.stream().mapToInt(r -> r.getSuccessDeleteFiles().size()).sum());
     assertEquals(1, stats.stream().mapToInt(r -> r.getFailedDeleteFiles().size()).sum());
   }
@@ -108,15 +109,15 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase {
     // then: ensure files are deleted, rollback block is appended (even if append does not exist)
     assertEquals(2, stats.size());
     // will have the log file
-    List<FileStatus> partBFiles = testTable.listAllFiles("partB");
-    assertEquals(1, partBFiles.size());
-    assertTrue(partBFiles.get(0).getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()));
-    assertTrue(partBFiles.get(0).getLen() > 0);
-
-    List<FileStatus> partAFiles = testTable.listAllFiles("partA");
-    assertEquals(3, partAFiles.size());
-    assertEquals(2, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count());
-    assertEquals(1, partAFiles.stream().filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count());
+    FileStatus[] partBFiles = testTable.listAllFilesInPartition("partB");
+    assertEquals(1, partBFiles.length);
+    assertTrue(partBFiles[0].getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()));
+    assertTrue(partBFiles[0].getLen() > 0);
+
+    FileStatus[] partAFiles = testTable.listAllFilesInPartition("partA");
+    assertEquals(3, partAFiles.length);
+    assertEquals(2, Stream.of(partAFiles).filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).count());
+    assertEquals(1, Stream.of(partAFiles).filter(s -> s.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())).filter(f -> f.getLen() > 0).count());
 
     // only partB/f1_001 will be deleted
     assertEquals(1, stats.stream().mapToInt(r -> r.getSuccessDeleteFiles().size()).sum());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index c2faa83..e167a0f 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -25,8 +25,13 @@ import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.config.HoodieStorageConfig;
@@ -36,19 +41,29 @@ import org.apache.hudi.table.HoodieTable;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
+import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
 
 public class HoodieWriteableTestTable extends HoodieTestTable {
+  private static final Logger LOG = LogManager.getLogger(HoodieWriteableTestTable.class);
 
   private final Schema schema;
   private final BloomFilter filter;
@@ -89,11 +104,15 @@ public class HoodieWriteableTestTable extends HoodieTestTable {
     return (HoodieWriteableTestTable) super.forCommit(instantTime);
   }
 
-  public String withInserts(String partition) throws Exception {
-    return withInserts(partition, new HoodieRecord[0]);
+  public String getFileIdWithInserts(String partition) throws Exception {
+    return getFileIdWithInserts(partition, new HoodieRecord[0]);
   }
 
-  public String withInserts(String partition, HoodieRecord... records) throws Exception {
+  public String getFileIdWithInserts(String partition, HoodieRecord... records) throws Exception {
+    return getFileIdWithInserts(partition, Arrays.asList(records));
+  }
+
+  public String getFileIdWithInserts(String partition, List<HoodieRecord> records) throws Exception {
     String fileId = UUID.randomUUID().toString();
     withInserts(partition, fileId, records);
     return fileId;
@@ -104,6 +123,10 @@ public class HoodieWriteableTestTable extends HoodieTestTable {
   }
 
   public HoodieWriteableTestTable withInserts(String partition, String fileId, HoodieRecord... records) throws Exception {
+    return withInserts(partition, fileId, Arrays.asList(records));
+  }
+
+  public HoodieWriteableTestTable withInserts(String partition, String fileId, List<HoodieRecord> records) throws Exception {
     FileCreateUtils.createPartitionMetaFile(basePath, partition);
     String fileName = baseFileName(currentInstantTime, fileId);
 
@@ -128,4 +151,38 @@ public class HoodieWriteableTestTable extends HoodieTestTable {
 
     return this;
   }
+
+  public HoodieWriteableTestTable withLogAppends(HoodieRecord... records) throws Exception {
+    return withLogAppends(Arrays.asList(records));
+  }
+
+  public HoodieWriteableTestTable withLogAppends(List<HoodieRecord> records) throws Exception {
+    for (List<HoodieRecord> groupedRecords: records.stream()
+        .collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation)).values()) {
+      appendRecordsToLogFile(groupedRecords);
+    }
+    return this;
+  }
+
+  private void appendRecordsToLogFile(List<HoodieRecord> groupedRecords) throws Exception {
+    String partitionPath = groupedRecords.get(0).getPartitionPath();
+    HoodieRecordLocation location = groupedRecords.get(0).getCurrentLocation();
+    try (HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath))
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
+        .overBaseCommit(location.getInstantTime()).withFs(fs).build()) {
+      Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
+      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+      logWriter.appendBlock(new HoodieAvroDataBlock(groupedRecords.stream().map(r -> {
+        try {
+          GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema).get();
+          HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), r.getPartitionPath(), "");
+          return (IndexedRecord) val;
+        } catch (IOException e) {
+          LOG.warn("Failed to convert record " + r.toString(), e);
+          return null;
+        }
+      }).collect(Collectors.toList()), header));
+    }
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 16d1ff9..bca91f8 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -19,7 +19,8 @@
 
 package org.apache.hudi.common.testutils;
 
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
@@ -31,6 +32,8 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.exception.HoodieException;
 
+import org.apache.hadoop.fs.FileSystem;
+
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.charset.StandardCharsets;
@@ -40,6 +43,9 @@ import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanMetadata;
+import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCleanerPlan;
+
 public class FileCreateUtils {
 
   private static final String WRITE_TOKEN = "1-0-1";
@@ -122,6 +128,18 @@ public class FileCreateUtils {
     createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_REPLACE_COMMIT_EXTENSION);
   }
 
+  public static void createCleanFile(String basePath, String instantTime, HoodieCleanMetadata metadata) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.CLEAN_EXTENSION, serializeCleanMetadata(metadata).get());
+  }
+
+  public static void createRequestedCleanFile(String basePath, String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_CLEAN_EXTENSION, serializeCleanerPlan(cleanerPlan).get());
+  }
+
+  public static void createInflightCleanFile(String basePath, String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException {
+    createMetaFile(basePath, instantTime, HoodieTimeline.INFLIGHT_CLEAN_EXTENSION, serializeCleanerPlan(cleanerPlan).get());
+  }
+
   private static void createAuxiliaryMetaFile(String basePath, String instantTime, String suffix) throws IOException {
     Path parentPath = Paths.get(basePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME);
     Files.createDirectories(parentPath);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index 0aad0c2..96a00da 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -23,8 +23,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
-
 import org.apache.hudi.exception.HoodieIOException;
+
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
@@ -88,7 +88,7 @@ public class HoodieCommonTestHarness {
     try {
       return new HoodieTableFileSystemView(metaClient,
               metaClient.getActiveTimeline(),
-              HoodieTestUtils.listAllDataFilesAndLogFilesInPath(metaClient.getFs(), metaClient.getBasePath())
+              HoodieTestTable.of(metaClient).listAllBaseAndLogFiles()
       );
     } catch (IOException ioe) {
       throw new HoodieIOException("Error getting file system view", ioe);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 06a4aa4..9cacf1f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -19,6 +19,9 @@
 
 package org.apache.hudi.common.testutils;
 
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -41,18 +44,22 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import static java.time.temporal.ChronoUnit.SECONDS;
 import static org.apache.hudi.common.table.timeline.HoodieActiveTimeline.COMMIT_FORMATTER;
 import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;
+import static org.apache.hudi.common.testutils.FileCreateUtils.createCleanFile;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createDeltaCommit;
+import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCleanFile;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightCompaction;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightDeltaCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createInflightReplaceCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createMarkerFile;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createReplaceCommit;
+import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCleanFile;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCommit;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedCompaction;
 import static org.apache.hudi.common.testutils.FileCreateUtils.createRequestedDeltaCommit;
@@ -158,6 +165,23 @@ public class HoodieTestTable {
     return this;
   }
 
+  public HoodieTestTable addInflightClean(String instantTime, HoodieCleanerPlan cleanerPlan) throws IOException {
+    createRequestedCleanFile(basePath, instantTime, cleanerPlan);
+    createInflightCleanFile(basePath, instantTime, cleanerPlan);
+    currentInstantTime = instantTime;
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    return this;
+  }
+
+  public HoodieTestTable addClean(String instantTime, HoodieCleanerPlan cleanerPlan, HoodieCleanMetadata metadata) throws IOException {
+    createRequestedCleanFile(basePath, instantTime, cleanerPlan);
+    createInflightCleanFile(basePath, instantTime, cleanerPlan);
+    createCleanFile(basePath, instantTime, metadata);
+    currentInstantTime = instantTime;
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    return this;
+  }
+
   public HoodieTestTable addRequestedCompaction(String instantTime) throws IOException {
     createRequestedCompaction(basePath, instantTime);
     currentInstantTime = instantTime;
@@ -348,12 +372,36 @@ public class HoodieTestTable {
     return baseFileName(currentInstantTime, fileId);
   }
 
-  public List<FileStatus> listAllFiles(String partitionPath) throws IOException {
-    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString()));
+  public FileStatus[] listAllBaseFiles() throws IOException {
+    return listAllBaseFiles(HoodieFileFormat.PARQUET.getFileExtension());
+  }
+
+  public FileStatus[] listAllBaseFiles(String fileExtension) throws IOException {
+    return FileSystemTestUtils.listRecursive(fs, new Path(basePath)).stream()
+        .filter(status -> status.getPath().getName().endsWith(fileExtension))
+        .toArray(FileStatus[]::new);
+  }
+
+  public FileStatus[] listAllLogFiles() throws IOException {
+    return listAllLogFiles(HoodieFileFormat.HOODIE_LOG.getFileExtension());
+  }
+
+  public FileStatus[] listAllLogFiles(String fileExtension) throws IOException {
+    return FileSystemTestUtils.listRecursive(fs, new Path(basePath)).stream()
+        .filter(status -> status.getPath().getName().contains(fileExtension))
+        .toArray(FileStatus[]::new);
+  }
+
+  public FileStatus[] listAllBaseAndLogFiles() throws IOException {
+    return Stream.concat(Stream.of(listAllBaseFiles()), Stream.of(listAllLogFiles())).toArray(FileStatus[]::new);
+  }
+
+  public FileStatus[] listAllFilesInPartition(String partitionPath) throws IOException {
+    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, partitionPath).toString())).toArray(new FileStatus[0]);
   }
 
-  public List<FileStatus> listAllFilesInTempFolder() throws IOException {
-    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString()));
+  public FileStatus[] listAllFilesInTempFolder() throws IOException {
+    return FileSystemTestUtils.listRecursive(fs, new Path(Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME).toString())).toArray(new FileStatus[0]);
   }
 
   public static class HoodieTestTableException extends RuntimeException {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 6547f80..1f86a59 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -18,74 +18,43 @@
 
 package org.apache.hudi.common.testutils;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.avro.model.HoodieActionInstant;
-import org.apache.hudi.avro.model.HoodieCleanMetadata;
-import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
-import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieAvroPayload;
-import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
-import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
-import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
-import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieIOException;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
 import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.junit.jupiter.api.Assertions.fail;
 
 /**
  * A utility class for testing.
@@ -96,7 +65,6 @@ public class HoodieTestUtils {
   public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
   public static final int DEFAULT_LOG_VERSION = 1;
   public static final String[] DEFAULT_PARTITION_PATHS = {"2016/03/15", "2015/03/16", "2015/03/17"};
-  private static Random rand = new Random(46474747);
 
   public static Configuration getDefaultHadoopConf() {
     return new Configuration();
@@ -171,35 +139,6 @@ public class HoodieTestUtils {
     }
   }
 
-  public static void createPendingCleanFiles(HoodieTableMetaClient metaClient, String... instantTimes) {
-    for (String instantTime : instantTimes) {
-      Arrays.asList(HoodieTimeline.makeRequestedCleanerFileName(instantTime),
-          HoodieTimeline.makeInflightCleanerFileName(instantTime))
-          .forEach(f -> {
-            FSDataOutputStream os = null;
-            try {
-              Path commitFile = new Path(Paths
-                  .get(metaClient.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME, f).toString());
-              os = metaClient.getFs().create(commitFile, true);
-              // Write empty clean metadata
-              os.write(TimelineMetadataUtils.serializeCleanerPlan(
-                  new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(),
-                      CleanPlanV2MigrationHandler.VERSION, new HashMap<>())).get());
-            } catch (IOException ioe) {
-              throw new HoodieIOException(ioe.getMessage(), ioe);
-            } finally {
-              if (null != os) {
-                try {
-                  os.close();
-                } catch (IOException e) {
-                  throw new HoodieIOException(e.getMessage(), e);
-                }
-              }
-            }
-          });
-    }
-  }
-
   /**
    * @deprecated Use {@link HoodieTestTable} instead.
    */
@@ -243,26 +182,6 @@ public class HoodieTestUtils {
         TimelineMetadataUtils.serializeCompactionPlan(plan));
   }
 
-  public static void createCleanFiles(HoodieTableMetaClient metaClient, String basePath,
-      String instantTime, Configuration configuration)
-      throws IOException {
-    createPendingCleanFiles(metaClient, instantTime);
-    Path commitFile = new Path(
-        basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + HoodieTimeline.makeCleanerFileName(instantTime));
-    FileSystem fs = FSUtils.getFs(basePath, configuration);
-    try (FSDataOutputStream os = fs.create(commitFile, true)) {
-      HoodieCleanStat cleanStats = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
-          DEFAULT_PARTITION_PATHS[rand.nextInt(DEFAULT_PARTITION_PATHS.length)], new ArrayList<>(), new ArrayList<>(),
-          new ArrayList<>(), instantTime);
-      // Create the clean metadata
-
-      HoodieCleanMetadata cleanMetadata =
-          CleanerUtils.convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats));
-      // Write empty clean metadata
-      os.write(TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata).get());
-    }
-  }
-
   public static <T extends Serializable> T serializeDeserialize(T object, Class<T> clazz) {
     // Using Kyro as the default serializer in Spark Jobs
     Kryo kryo = new Kryo();
@@ -279,78 +198,6 @@ public class HoodieTestUtils {
     return deseralizedObject;
   }
 
-  /**
-   * @deprecated Use {@link HoodieTestTable} instead.
-   */
-  public static void writeRecordsToLogFiles(FileSystem fs, String basePath, Schema schema,
-      List<HoodieRecord> updatedRecords) {
-    Map<HoodieRecordLocation, List<HoodieRecord>> groupedUpdated =
-        updatedRecords.stream().collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation));
-
-    groupedUpdated.forEach((location, value) -> {
-      String partitionPath = value.get(0).getPartitionPath();
-
-      Writer logWriter;
-      try {
-        logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath))
-          .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
-          .overBaseCommit(location.getInstantTime()).withFs(fs).build();
-
-        Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
-        header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-        logWriter.appendBlock(new HoodieAvroDataBlock(value.stream().map(r -> {
-          try {
-            GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema).get();
-            HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), r.getPartitionPath(), "");
-            return (IndexedRecord) val;
-          } catch (IOException e) {
-            return null;
-          }
-        }).collect(Collectors.toList()), header));
-        logWriter.close();
-      } catch (Exception e) {
-        fail(e.toString());
-      }
-    });
-  }
-
-  // TODO: should be removed
-  public static FileStatus[] listAllDataFilesInPath(FileSystem fs, String basePath) throws IOException {
-    return listAllDataFilesInPath(fs, basePath, HoodieFileFormat.PARQUET.getFileExtension());
-  }
-
-  public static FileStatus[] listAllDataFilesInPath(FileSystem fs, String basePath, String datafileExtension)
-      throws IOException {
-    RemoteIterator<LocatedFileStatus> itr = fs.listFiles(new Path(basePath), true);
-    List<FileStatus> returns = new ArrayList<>();
-    while (itr.hasNext()) {
-      LocatedFileStatus status = itr.next();
-      if (status.getPath().getName().contains(datafileExtension) && !status.getPath().getName().contains(".marker")) {
-        returns.add(status);
-      }
-    }
-    return returns.toArray(new FileStatus[returns.size()]);
-  }
-
-  public static FileStatus[] listAllLogFilesInPath(FileSystem fs, String basePath)
-      throws IOException {
-    RemoteIterator<LocatedFileStatus> itr = fs.listFiles(new Path(basePath), true);
-    List<FileStatus> returns = new ArrayList<>();
-    while (itr.hasNext()) {
-      LocatedFileStatus status = itr.next();
-      if (status.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
-        returns.add(status);
-      }
-    }
-    return returns.toArray(new FileStatus[returns.size()]);
-  }
-
-  public static FileStatus[] listAllDataFilesAndLogFilesInPath(FileSystem fs, String basePath) throws IOException {
-    return Stream.concat(Arrays.stream(listAllDataFilesInPath(fs, basePath)), Arrays.stream(listAllLogFilesInPath(fs, basePath)))
-            .toArray(FileStatus[]::new);
-  }
-
   public static List<HoodieWriteStat> generateFakeHoodieWriteStat(int limit) {
     List<HoodieWriteStat> writeStatList = new ArrayList<>();
     for (int i = 0; i < limit; i++) {