Posted to commits@hudi.apache.org by vi...@apache.org on 2020/07/15 00:29:00 UTC

[hudi] branch master updated: [HUDI-996] Add functional test in hudi-client (#1824)

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b399b4a  [HUDI-996] Add functional test in hudi-client (#1824)
b399b4a is described below

commit b399b4ad43b8caa69f129e756e51ad4bc8c81de2
Author: Raymond Xu <27...@users.noreply.github.com>
AuthorDate: Tue Jul 14 17:28:50 2020 -0700

    [HUDI-996] Add functional test in hudi-client (#1824)
    
    - Add functional test suite in hudi-client
    - Tag TestHBaseIndex as functional
---
 hudi-client/pom.xml                                | 18 +++++
 ...rovider.java => ClientFunctionalTestSuite.java} | 20 +++---
 .../apache/hudi/index/hbase/TestHBaseIndex.java    | 80 ++++++++++------------
 .../hudi/testutils/FunctionalTestHarness.java      | 52 ++++++++++++--
 .../testutils/{ => providers}/DFSProvider.java     |  2 +-
 .../HoodieMetaClientProvider.java}                 | 22 +++---
 .../HoodieWriteClientProvider.java}                | 16 ++---
 .../testutils/{ => providers}/SparkProvider.java   |  2 +-
 8 files changed, 131 insertions(+), 81 deletions(-)
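
Before the full diff, here is a minimal illustrative sketch (not the committed code) of the JUnit 5 tag-plus-platform-suite mechanism this commit introduces: a test class opts in by carrying the "functional" tag, and a suite class on the JUnit Platform scans a package and keeps only tagged classes. Class names other than the annotations are hypothetical; package names and annotations are taken from the diff below.

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.platform.suite.api.IncludeTags;
import org.junit.platform.suite.api.SelectPackages;
import org.junit.runner.RunWith;

// A test class joins the functional suite simply by carrying the "functional" tag;
// in the diff below, TestHBaseIndex (under org.apache.hudi.index) gets this tag.
@Tag("functional")
class SampleFunctionalTest {
  @Test
  void taggedTest() {
    // real tests exercise Spark/HDFS/HBase mini-clusters via FunctionalTestHarness
  }
}

// The suite runs on the JUnit Platform, scans org.apache.hudi.index,
// and includes only classes tagged "functional", mirroring ClientFunctionalTestSuite.
@RunWith(JUnitPlatform.class)
@SelectPackages("org.apache.hudi.index")
@IncludeTags("functional")
class FunctionalSuiteSketch {
}

Untagged unit tests in the same packages are ignored by the suite, which is what lets the build separate fast unit tests from the slower functional ones.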

diff --git a/hudi-client/pom.xml b/hudi-client/pom.xml
index 326cf83..a390923 100644
--- a/hudi-client/pom.xml
+++ b/hudi-client/pom.xml
@@ -268,5 +268,23 @@
       <artifactId>mockito-junit-jupiter</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-runner</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-suite-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.junit.platform</groupId>
+      <artifactId>junit-platform-commons</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java b/hudi-client/src/test/java/org/apache/hudi/ClientFunctionalTestSuite.java
similarity index 70%
copy from hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
copy to hudi-client/src/test/java/org/apache/hudi/ClientFunctionalTestSuite.java
index 16cc471..4e62618 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
+++ b/hudi-client/src/test/java/org/apache/hudi/ClientFunctionalTestSuite.java
@@ -17,18 +17,16 @@
  * under the License.
  */
 
-package org.apache.hudi.testutils;
+package org.apache.hudi;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.platform.suite.api.IncludeTags;
+import org.junit.platform.suite.api.SelectPackages;
+import org.junit.runner.RunWith;
 
-public interface DFSProvider {
-
-  MiniDFSCluster dfsCluster();
-
-  DistributedFileSystem dfs();
-
-  Path dfsBasePath();
+@RunWith(JUnitPlatform.class)
+@SelectPackages("org.apache.hudi.index")
+@IncludeTags("functional")
+public class ClientFunctionalTestSuite {
 
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
index 6b6f11f..b3d2f5a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
@@ -31,7 +31,7 @@ import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.FunctionalTestHarness;
 import org.apache.hudi.testutils.HoodieTestDataGenerator;
 
 import org.apache.hadoop.conf.Configuration;
@@ -47,10 +47,10 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
 
@@ -76,12 +76,17 @@ import static org.mockito.Mockito.when;
  * {@link MethodOrderer.Alphanumeric} to make sure the tests run in order. Please alter the order of tests running carefully.
  */
 @TestMethodOrder(MethodOrderer.Alphanumeric.class)
-public class TestHBaseIndex extends HoodieClientTestHarness {
+@Tag("functional")
+public class TestHBaseIndex extends FunctionalTestHarness {
 
   private static final String TABLE_NAME = "test_table";
   private static HBaseTestingUtility utility;
   private static Configuration hbaseConfig;
 
+  private Configuration hadoopConf;
+  private HoodieTestDataGenerator dataGen;
+  private HoodieTableMetaClient metaClient;
+
   @AfterAll
   public static void clean() throws Exception {
     if (utility != null) {
@@ -104,21 +109,10 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
 
   @BeforeEach
   public void setUp() throws Exception {
-    // Initialize a local spark env
-    initSparkContexts("TestHBaseIndex");
+    hadoopConf = jsc().hadoopConfiguration();
     hadoopConf.addResource(utility.getConfiguration());
-
-    // Create a temp folder as the base path
-    initPath();
-    initTestDataGenerator();
-    initMetaClient();
-  }
-
-  @AfterEach
-  public void tearDown() throws Exception {
-    cleanupSparkContexts();
-    cleanupTestDataGenerator();
-    cleanupClients();
+    metaClient = getHoodieMetaClient(hadoopConf, basePath());
+    dataGen = new HoodieTestDataGenerator();
   }
 
   @Test
@@ -126,7 +120,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     final String newCommitTime = "001";
     final int numRecords = 10;
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
 
     // Load to memory
     HoodieWriteConfig config = getConfig();
@@ -136,7 +130,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
       HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
       // Test tagLocation without any entries in index
-      JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+      JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc(), hoodieTable);
       assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
 
       // Insert 200 records
@@ -146,7 +140,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
 
       // Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
       // commit
-      JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable);
+      JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable);
       assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());
 
       // Now commit this & update location of records inserted and validate no errors
@@ -154,7 +148,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
       // Now tagLocation for these records, hbaseIndex should tag them correctly
       metaClient = HoodieTableMetaClient.reload(metaClient);
       hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
-      List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+      List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
       assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
       assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
       assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null
@@ -167,7 +161,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     final String newCommitTime = "001";
     final int numRecords = 10;
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
 
     // Load to memory
     HoodieWriteConfig config = getConfig();
@@ -178,7 +172,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
     JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
-    index.tagLocation(writeRecords, jsc, hoodieTable);
+    index.tagLocation(writeRecords, jsc(), hoodieTable);
 
     // Duplicate upsert and ensure correctness is maintained
     // We are trying to approximately imitate the case when the RDD is recomputed. For RDD creating, driver code is not
@@ -194,7 +188,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     // Now tagLocation for these records, hbaseIndex should tag them correctly
     metaClient = HoodieTableMetaClient.reload(metaClient);
     hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
-    List<HoodieRecord> taggedRecords = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+    List<HoodieRecord> taggedRecords = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
     assertEquals(numRecords, taggedRecords.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
     assertEquals(numRecords, taggedRecords.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
     assertEquals(numRecords, taggedRecords.stream().filter(record -> (record.getCurrentLocation() != null
@@ -211,7 +205,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     final String newCommitTime = writeClient.startCommit();
     final int numRecords = 10;
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
     metaClient = HoodieTableMetaClient.reload(metaClient);
 
     // Insert 200 records
@@ -222,7 +216,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     writeClient.commit(newCommitTime, writeStatues);
     HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
     // Now tagLocation for these records, hbaseIndex should tag them
-    List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+    List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
     assertEquals(numRecords, records2.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
 
     // check tagged records are tagged with correct fileIds
@@ -238,7 +232,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
     // Now tagLocation for these records, hbaseIndex should not tag them since it was a rolled
     // back commit
-    List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+    List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
     assertEquals(0, records3.stream().filter(HoodieRecord::isCurrentLocationKnown).count());
     assertEquals(0, records3.stream().filter(record -> record.getCurrentLocation() != null).count());
   }
@@ -262,7 +256,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     // start a commit and generate test data
     String newCommitTime = writeClient.startCommit();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
@@ -271,7 +265,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     assertNoWriteErrors(writeStatues.collect());
 
     // Now tagLocation for these records, hbaseIndex should tag them
-    index.tagLocation(writeRecords, jsc, hoodieTable);
+    index.tagLocation(writeRecords, jsc(), hoodieTable);
 
     // 3 batches should be executed given batchSize = 100 and parallelism = 1
     verify(table, times(3)).get((List<Get>) any());
@@ -287,7 +281,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     // start a commit and generate test data
     String newCommitTime = writeClient.startCommit();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
     metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
@@ -309,7 +303,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     // Get all the files generated
     int numberOfDataFileIds = (int) writeStatues.map(status -> status.getFileId()).distinct().count();
 
-    index.updateLocation(writeStatues, jsc, hoodieTable);
+    index.updateLocation(writeStatues, jsc(), hoodieTable);
     // 3 batches should be executed given batchSize = 100 and <=numberOfDataFileIds getting updated,
     // so each fileId ideally gets updates
     verify(table, atMost(numberOfDataFileIds)).put((List<Put>) any());
@@ -319,7 +313,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
   public void testsHBasePutAccessParallelism() {
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
-    final JavaRDD<WriteStatus> writeStatusRDD = jsc.parallelize(
+    final JavaRDD<WriteStatus> writeStatusRDD = jsc().parallelize(
         Arrays.asList(getSampleWriteStatus(1, 2), getSampleWriteStatus(0, 3), getSampleWriteStatus(10, 0)), 10);
     final Tuple2<Long, Integer> tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
     final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString());
@@ -334,7 +328,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     HoodieWriteConfig config = getConfig();
     HBaseIndex index = new HBaseIndex(config);
     final JavaRDD<WriteStatus> writeStatusRDD =
-        jsc.parallelize(Arrays.asList(getSampleWriteStatus(0, 2), getSampleWriteStatus(0, 1)), 10);
+        jsc().parallelize(Arrays.asList(getSampleWriteStatus(0, 2), getSampleWriteStatus(0, 1)), 10);
     final Tuple2<Long, Integer> tuple = index.getHBasePutAccessParallelism(writeStatusRDD);
     final int hbasePutAccessParallelism = Integer.parseInt(tuple._2.toString());
     final int hbaseNumPuts = Integer.parseInt(tuple._1.toString());
@@ -348,7 +342,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     final String newCommitTime = "001";
     final int numRecords = 10;
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
 
     // Load to memory
     HoodieWriteConfig config = getConfig(2);
@@ -358,7 +352,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
       HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
       // Test tagLocation without any entries in index
-      JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+      JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc(), hoodieTable);
       assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
       // Insert 200 records
       writeClient.startCommitWithTime(newCommitTime);
@@ -367,7 +361,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
 
       // Now tagLocation for these records, hbaseIndex should not tag them since it was a failed
       // commit
-      JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable);
+      JavaRDD<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable);
       assertEquals(0, records2.filter(record -> record.isCurrentLocationKnown()).count());
 
       // Now commit this & update location of records inserted and validate no errors
@@ -375,7 +369,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
       // Now tagLocation for these records, hbaseIndex should tag them correctly
       metaClient = HoodieTableMetaClient.reload(metaClient);
       hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
-      List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+      List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
       assertEquals(numRecords, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
       assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
       assertEquals(numRecords, records3.stream().filter(record -> (record.getCurrentLocation() != null
@@ -388,7 +382,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
     final String newCommitTime = "001";
     final int numRecords = 10;
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, numRecords);
-    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
 
     // Load to memory
     HoodieWriteConfig config = getConfig();
@@ -398,7 +392,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
       HoodieTable hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
 
       // Test tagLocation without any entries in index
-      JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc, hoodieTable);
+      JavaRDD<HoodieRecord> records1 = index.tagLocation(writeRecords, jsc(), hoodieTable);
       assertEquals(0, records1.filter(record -> record.isCurrentLocationKnown()).count());
 
       // Insert records
@@ -410,7 +404,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
       // Now tagLocation for these records, hbaseIndex should tag them correctly
       metaClient = HoodieTableMetaClient.reload(metaClient);
       hoodieTable = HoodieTable.create(metaClient, config, hadoopConf);
-      List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+      List<HoodieRecord> records2 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
       assertEquals(numRecords, records2.stream().filter(record -> record.isCurrentLocationKnown()).count());
       assertEquals(numRecords, records2.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
       assertEquals(numRecords, records2.stream().filter(record -> (record.getCurrentLocation() != null
@@ -425,12 +419,12 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
         newWriteStatus.setStat(new HoodieWriteStat());
         return newWriteStatus;
       });
-      JavaRDD<WriteStatus> deleteStatus = index.updateLocation(deleteWriteStatues, jsc, hoodieTable);
+      JavaRDD<WriteStatus> deleteStatus = index.updateLocation(deleteWriteStatues, jsc(), hoodieTable);
       assertEquals(deleteStatus.count(), deleteWriteStatues.count());
       assertNoWriteErrors(deleteStatus.collect());
 
       // Ensure no records can be tagged
-      List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc, hoodieTable).collect();
+      List<HoodieRecord> records3 = index.tagLocation(writeRecords, jsc(), hoodieTable).collect();
       assertEquals(0, records3.stream().filter(record -> record.isCurrentLocationKnown()).count());
       assertEquals(numRecords, records3.stream().map(record -> record.getKey().getRecordKey()).distinct().count());
       assertEquals(0, records3.stream().filter(record -> (record.getCurrentLocation() != null
@@ -456,7 +450,7 @@ public class TestHBaseIndex extends HoodieClientTestHarness {
   }
 
   private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize) {
-    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+    return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(1, 1)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
             .withInlineCompaction(false).build())
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
index 562563e..27db072 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
@@ -19,14 +19,26 @@
 
 package org.apache.hudi.testutils;
 
+import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
-
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.testutils.providers.DFSProvider;
+import org.apache.hudi.testutils.providers.HoodieMetaClientProvider;
+import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
+import org.apache.hudi.testutils.providers.SparkProvider;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.SparkSession;
@@ -35,8 +47,13 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
+import java.util.Properties;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.RAW_TRIPS_TEST_NAME;
 
-public class FunctionalTestHarness implements SparkProvider, DFSProvider {
+public class FunctionalTestHarness implements SparkProvider, DFSProvider, HoodieMetaClientProvider, HoodieWriteClientProvider {
 
   private static transient SparkSession spark;
   private static transient SQLContext sqlContext;
@@ -53,6 +70,10 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider {
   @TempDir
   protected java.nio.file.Path tempDir;
 
+  public String basePath() {
+    return tempDir.toAbsolutePath().toString();
+  }
+
   @Override
   public SparkSession spark() {
     return spark;
@@ -83,15 +104,32 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider {
     return dfs.getWorkingDirectory();
   }
 
+  public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath) throws IOException {
+    return getHoodieMetaClient(hadoopConf, basePath, new Properties());
+  }
+
+  @Override
+  public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException {
+    props.putIfAbsent(HoodieTableConfig.HOODIE_BASE_FILE_FORMAT_PROP_NAME, PARQUET.toString());
+    props.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, RAW_TRIPS_TEST_NAME);
+    props.putIfAbsent(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, COPY_ON_WRITE.name());
+    props.putIfAbsent(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME, HoodieAvroPayload.class.getName());
+    return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, props);
+  }
+
+  @Override
+  public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException {
+    return new HoodieWriteClient(jsc, cfg, false, HoodieIndex.createIndex(cfg));
+  }
+
   @BeforeEach
   public synchronized void runBeforeEach() throws Exception {
     initialized = spark != null && hdfsTestService != null;
     if (!initialized) {
-      FileSystem.closeAll();
-
-      spark = SparkSession.builder()
-          .config(HoodieWriteClient.registerClasses(conf()))
-          .getOrCreate();
+      SparkConf sparkConf = conf();
+      HoodieWriteClient.registerClasses(sparkConf);
+      HoodieReadClient.addHoodieSupport(sparkConf);
+      spark = SparkSession.builder().config(sparkConf).getOrCreate();
       sqlContext = spark.sqlContext();
       jsc = new JavaSparkContext(spark.sparkContext());
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/DFSProvider.java
similarity index 95%
copy from hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
copy to hudi-client/src/test/java/org/apache/hudi/testutils/providers/DFSProvider.java
index 16cc471..62b48cb 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/DFSProvider.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hudi.testutils;
+package org.apache.hudi.testutils.providers;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
similarity index 50%
copy from hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
copy to hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
index 16cc471..0cd7ed5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
@@ -17,18 +17,24 @@
  * under the License.
  */
 
-package org.apache.hudi.testutils;
+package org.apache.hudi.testutils.providers;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 
-public interface DFSProvider {
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 
-  MiniDFSCluster dfsCluster();
+import java.io.IOException;
+import java.util.Properties;
 
-  DistributedFileSystem dfs();
+public interface HoodieMetaClientProvider {
 
-  Path dfsBasePath();
+  HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath, Properties props) throws IOException;
 
+  default HoodieTableFileSystemView getHoodieTableFileSystemView(
+      HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, FileStatus[] fileStatuses) {
+    return new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses);
+  }
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieWriteClientProvider.java
similarity index 73%
rename from hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
rename to hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieWriteClientProvider.java
index 16cc471..4840af0 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/DFSProvider.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/HoodieWriteClientProvider.java
@@ -17,18 +17,14 @@
  * under the License.
  */
 
-package org.apache.hudi.testutils;
+package org.apache.hudi.testutils.providers;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.config.HoodieWriteConfig;
 
-public interface DFSProvider {
+import java.io.IOException;
 
-  MiniDFSCluster dfsCluster();
-
-  DistributedFileSystem dfs();
-
-  Path dfsBasePath();
+public interface HoodieWriteClientProvider {
 
+  HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException;
 }
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/SparkProvider.java b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java
similarity index 97%
rename from hudi-client/src/test/java/org/apache/hudi/testutils/SparkProvider.java
rename to hudi-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java
index 948f736..cdf5ac4 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/SparkProvider.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/providers/SparkProvider.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.hudi.testutils;
+package org.apache.hudi.testutils.providers;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;