You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2019/09/30 12:38:57 UTC

[incubator-hudi] branch master updated: [HUDI-247] Unify the re-initialization of HoodieTableMetaClient in test for hoodie-client module (#930)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 01e803b  [HUDI-247] Unify the re-initialization of HoodieTableMetaClient in test for hoodie-client module (#930)
01e803b is described below

commit 01e803b00e5acf9bb1dbe38340e38e8feb8d037d
Author: vinoyang <ya...@gmail.com>
AuthorDate: Mon Sep 30 20:38:52 2019 +0800

    [HUDI-247] Unify the re-initialization of HoodieTableMetaClient in test for hoodie-client module (#930)
---
 .../org/apache/hudi/HoodieClientTestHarness.java   | 13 ++--
 .../src/test/java/org/apache/hudi/TestCleaner.java | 76 ++++++++++------------
 .../java/org/apache/hudi/TestClientRollback.java   |  6 +-
 .../hudi/index/TestHBaseQPSResourceAllocator.java  |  4 +-
 .../java/org/apache/hudi/index/TestHbaseIndex.java | 19 +++---
 .../org/apache/hudi/index/TestHoodieIndex.java     |  4 +-
 .../hudi/index/bloom/TestHoodieBloomIndex.java     | 34 +++++-----
 .../index/bloom/TestHoodieGlobalBloomIndex.java    | 14 ++--
 .../apache/hudi/io/TestHoodieCommitArchiveLog.java | 21 +++---
 .../org/apache/hudi/io/TestHoodieCompactor.java    | 15 ++---
 .../org/apache/hudi/io/TestHoodieMergeHandle.java  | 12 ++--
 .../apache/hudi/table/TestCopyOnWriteTable.java    | 23 ++++---
 .../apache/hudi/table/TestMergeOnReadTable.java    | 32 ++++-----
 .../hudi/common/table/HoodieTableMetaClient.java   | 10 +++
 14 files changed, 142 insertions(+), 141 deletions(-)

diff --git a/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java
index 84a9e21..10fb0bc 100644
--- a/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java
@@ -55,6 +55,7 @@ public abstract class HoodieClientTestHarness implements Serializable {
   protected TemporaryFolder folder = null;
   protected transient HoodieTestDataGenerator dataGen = null;
   protected transient ExecutorService executorService;
+  protected transient HoodieTableMetaClient metaClient;
 
   //dfs
   protected String dfsBasePath;
@@ -72,7 +73,7 @@ public abstract class HoodieClientTestHarness implements Serializable {
     initSparkContexts();
     initTestDataGenerator();
     initFileSystem();
-    initTableType();
+    initMetaClient();
   }
 
   /**
@@ -80,7 +81,7 @@ public abstract class HoodieClientTestHarness implements Serializable {
    * @throws IOException
    */
   public void cleanupResources() throws IOException {
-    cleanupTableType();
+    cleanupMetaClient();
     cleanupSparkContexts();
     cleanupTestDataGenerator();
     cleanupFileSystem();
@@ -191,7 +192,7 @@ public abstract class HoodieClientTestHarness implements Serializable {
    *
    * @throws IOException
    */
-  protected void initTableType() throws IOException {
+  protected void initMetaClient() throws IOException {
     if (basePath == null) {
       throw new IllegalStateException("The base path has not been initialized.");
     }
@@ -200,14 +201,14 @@ public abstract class HoodieClientTestHarness implements Serializable {
       throw new IllegalStateException("The Spark context has not been initialized.");
     }
 
-    HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
+    metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
   }
 
   /**
    * Cleanups table type.
    */
-  protected void cleanupTableType() {
-
+  protected void cleanupMetaClient() {
+    metaClient = null;
   }
 
   /**
diff --git a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
index 1e8df4a..fe25491 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
@@ -114,7 +114,7 @@ public class TestCleaner extends TestHoodieClientBase {
     assertNoWriteErrors(statuses);
 
     // verify that there is a commit
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
     assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
     // Should have 100 records in table (check using Index), all in locations marked at commit
@@ -200,8 +200,8 @@ public class TestCleaner extends TestHoodieClientBase {
       insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn);
 
       Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
-      HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-      HoodieTable table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
       for (String partitionPath : dataGen.getPartitionPaths()) {
         TableFileSystemView fsView = table.getFileSystemView();
         Option<Boolean> added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst()
@@ -239,8 +239,8 @@ public class TestCleaner extends TestHoodieClientBase {
           // Verify there are no errors
           assertNoWriteErrors(statuses);
 
-          metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-          table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc);
+          metaClient = HoodieTableMetaClient.reload(metaClient);
+          table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
           HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline();
 
           TableFileSystemView fsView = table.getFileSystemView();
@@ -375,8 +375,8 @@ public class TestCleaner extends TestHoodieClientBase {
         // Verify there are no errors
         assertNoWriteErrors(statuses);
 
-        HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-        HoodieTable table1 = HoodieTable.getHoodieTable(metadata, cfg, jsc);
+        metaClient = HoodieTableMetaClient.reload(metaClient);
+        HoodieTable table1 = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
         HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
         Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1);
         Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
@@ -424,9 +424,8 @@ public class TestCleaner extends TestHoodieClientBase {
         .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
     String file1P1C0 = HoodieTestUtils
         .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
-    HoodieTable table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
-        jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
     assertEquals("Must not clean any files", 0,
@@ -442,8 +441,8 @@ public class TestCleaner extends TestHoodieClientBase {
 
     // make next commit, with 1 insert & 1 update per partition
     HoodieTestUtils.createCommitFiles(basePath, "001");
-    table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true), config,
-        jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     String file2P0C1 = HoodieTestUtils
         .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
@@ -472,8 +471,8 @@ public class TestCleaner extends TestHoodieClientBase {
 
     // make next commit, with 2 updates to existing files, and 1 insert
     HoodieTestUtils.createCommitFiles(basePath, "002");
-    table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
-        config, jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     HoodieTestUtils
         .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
@@ -578,9 +577,8 @@ public class TestCleaner extends TestHoodieClientBase {
     String file1P1C0 = HoodieTestUtils
         .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
 
-    HoodieTable table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
-        jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
     assertEquals("Must not clean any files", 0,
@@ -596,8 +594,8 @@ public class TestCleaner extends TestHoodieClientBase {
 
     // make next commit, with 1 insert & 1 update per partition
     HoodieTestUtils.createCommitFiles(basePath, "001");
-    table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
-        config, jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     String file2P0C1 = HoodieTestUtils
         .createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
@@ -626,8 +624,8 @@ public class TestCleaner extends TestHoodieClientBase {
 
     // make next commit, with 2 updates to existing files, and 1 insert
     HoodieTestUtils.createCommitFiles(basePath, "002");
-    table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
-        config, jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     HoodieTestUtils
         .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
@@ -646,8 +644,8 @@ public class TestCleaner extends TestHoodieClientBase {
 
     // make next commit, with 2 updates to existing files, and 1 insert
     HoodieTestUtils.createCommitFiles(basePath, "003");
-    table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
-        config, jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     HoodieTestUtils
         .createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file1P0C0); // update
@@ -700,9 +698,8 @@ public class TestCleaner extends TestHoodieClientBase {
     assertEquals("Some marker files are created.", markerFiles.size(), getTotalTempFiles());
 
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
-    HoodieTable table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
-        jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     table.rollback(jsc, "000", true);
     assertEquals("All temp files are deleted.", 0, getTotalTempFiles());
@@ -722,9 +719,8 @@ public class TestCleaner extends TestHoodieClientBase {
     // with just some commit metadata, but no data/partitionPaths.
     HoodieTestUtils.createCommitFiles(basePath, "000");
 
-    HoodieTable table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
-        jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
     assertTrue("HoodieCleanStats should be empty for a table with empty partitionPaths", hoodieCleanStatsOne.isEmpty());
@@ -783,9 +779,8 @@ public class TestCleaner extends TestHoodieClientBase {
     updateAllFilesInPartition(filesP1C0, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "003");
     updateAllFilesInPartition(filesP2C0, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "003");
 
-    HoodieTable table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
-        jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);
 
     assertEquals(100,
@@ -890,9 +885,8 @@ public class TestCleaner extends TestHoodieClientBase {
       for (int j = 1; j <= i; j++) {
         if (j == i && j <= maxNumFileIdsForCompaction) {
           expFileIdToPendingCompaction.put(fileId, compactionInstants[j]);
-          HoodieTable table = HoodieTable.getHoodieTable(
-              new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
-              jsc);
+          metaClient = HoodieTableMetaClient.reload(metaClient);
+          HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
           FileSlice slice = table.getRTFileSystemView().getLatestFileSlices(
               HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
               .filter(fs -> fs.getFileId().equals(fileId)).findFirst().get();
@@ -934,15 +928,13 @@ public class TestCleaner extends TestHoodieClientBase {
     }
 
     // Clean now
-    HoodieTable table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
-        jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);
 
     // Test for safety
-    final HoodieTable hoodieTable = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
-        jsc);
+    final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient);
+    final HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     expFileIdToPendingCompaction.entrySet().stream().forEach(entry -> {
       String fileId = entry.getKey();
@@ -961,7 +953,7 @@ public class TestCleaner extends TestHoodieClientBase {
     // Test for progress (Did we clean some files ?)
     long numFilesUnderCompactionDeleted =
         hoodieCleanStats.stream().flatMap(cleanStat -> {
-          return convertPathToFileIdWithCommitTime(metaClient, cleanStat.getDeletePathPatterns()).map(
+          return convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns()).map(
               fileIdWithCommitTime -> {
                 if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) {
                   Assert.assertTrue("Deleted instant time must be less than pending compaction",
diff --git a/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java b/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java
index 20cc86c..11504a4 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java
@@ -97,7 +97,7 @@ public class TestClientRollback extends TestHoodieClientBase {
       assertNoWriteErrors(statuses);
       List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(),
           getConfig().shouldAssumeDatePartitioning());
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
       final ReadOptimizedView view1 = table.getROFileSystemView();
 
@@ -122,7 +122,7 @@ public class TestClientRollback extends TestHoodieClientBase {
       // Verify there are no errors
       assertNoWriteErrors(statuses);
 
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
       final ReadOptimizedView view2 = table.getROFileSystemView();
 
@@ -143,7 +143,7 @@ public class TestClientRollback extends TestHoodieClientBase {
       HoodieInstant savepoint = table.getCompletedSavepointTimeline().getInstants().findFirst().get();
       client.rollbackToSavepoint(savepoint.getTimestamp());
 
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
       final ReadOptimizedView view3 = table.getROFileSystemView();
       dataFiles = partitionPaths.stream().flatMap(s -> {
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
index 622395d..9efe708 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
@@ -53,14 +53,14 @@ public class TestHBaseQPSResourceAllocator extends HoodieClientTestHarness {
     initTempFolderAndPath();
     basePath = folder.getRoot().getAbsolutePath() + QPS_TEST_SUFFIX_PATH;
     // Initialize table
-    initTableType();
+    initMetaClient();
   }
 
   @After
   public void tearDown() throws Exception {
     cleanupSparkContexts();
     cleanupTempFolderAndPath();
-    cleanupTableType();
+    cleanupMetaClient();
     if (utility != null) {
       utility.shutdownMiniCluster();
     }
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
index 9d89186..6c2fc0f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
@@ -40,7 +40,6 @@ import org.apache.hudi.HoodieWriteClient;
 import org.apache.hudi.WriteStatus;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -104,9 +103,8 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
 
     // Create a temp folder as the base path
     initTempFolderAndPath();
-    // Initialize table
-    HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
     initTestDataGenerator();
+    initMetaClient();
   }
 
   @After
@@ -114,6 +112,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     cleanupSparkContexts();
     cleanupTempFolderAndPath();
     cleanupTestDataGenerator();
+    cleanupMetaClient();
   }
 
   private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
@@ -132,7 +131,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     HBaseIndex index = new HBaseIndex(config);
     try (HoodieWriteClient writeClient = getWriteClient(config);) {
       writeClient.startCommit();
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
       // Test tagLocation without any entries in index
@@ -151,7 +150,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
       // Now commit this & update location of records inserted and validate no errors
       writeClient.commit(newCommitTime, writeStatues);
       // Now tagLocation for these records, hbaseIndex should tag them correctly
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
       javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
       assertTrue(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 200);
@@ -173,7 +172,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     HBaseIndex index = new HBaseIndex(config);
     HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
     writeClient.startCommit();
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
@@ -185,7 +184,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     // Now commit this & update location of records inserted and validate no errors
     writeClient.commit(newCommitTime, writeStatues);
     // Now tagLocation for these records, hbaseIndex should tag them correctly
-    metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
     JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
     assertTrue(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 10);
@@ -205,7 +204,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     String newCommitTime = writeClient.startCommit();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     // Insert 200 records
@@ -257,7 +256,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     String newCommitTime = writeClient.startCommit();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     // Insert 250 records
@@ -282,7 +281,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
     String newCommitTime = writeClient.startCommit();
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
     JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     // Insert 200 records
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index d073757..4354216 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -36,14 +36,14 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
   public void setUp() throws Exception {
     initSparkContexts("TestHoodieIndex");
     initTempFolderAndPath();
-    initTableType();
+    initMetaClient();
   }
 
   @After
   public void tearDown() throws Exception {
     cleanupSparkContexts();
     cleanupTempFolderAndPath();
-    cleanupTableType();
+    cleanupMetaClient();
   }
 
   @Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 28d19eb..69d0cfa 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -44,7 +44,6 @@ import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.TestRawTripPayload;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.util.FileIOUtils;
@@ -92,10 +91,10 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     initSparkContexts("TestHoodieBloomIndex");
     initTempFolderAndPath();
     initFileSystem();
-    HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
     // We have some records to be tagged (two different partitions)
     schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
     schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
+    initMetaClient();
   }
 
   @After
@@ -103,6 +102,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     cleanupSparkContexts();
     cleanupFileSystem();
     cleanupTempFolderAndPath();
+    cleanupMetaClient();
   }
 
   private HoodieWriteConfig makeConfig() {
@@ -163,8 +163,8 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
             false);
 
     List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12");
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, table);
     // Still 0, as no valid commit
     assertEquals(filesList.size(), 0);
@@ -174,7 +174,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     new File(basePath + "/.hoodie/20160401010101.commit").createNewFile();
     new File(basePath + "/.hoodie/20150312101010.commit").createNewFile();
 
-    table = HoodieTable.getHoodieTable(metadata, config, jsc);
+    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     filesList = index.loadInvolvedFiles(partitions, jsc, table);
     assertEquals(filesList.size(), 4);
 
@@ -286,9 +286,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     // We have some records to be tagged (two different partitions)
     JavaRDD<HoodieRecord> recordRDD = jsc.emptyRDD();
     // Also create the metadata and config
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     HoodieWriteConfig config = makeConfig();
-    HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     // Let's tag
     HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -331,9 +331,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4));
 
     // Also create the metadata and config
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     HoodieWriteConfig config = makeConfig();
-    HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     // Let's tag
     HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -353,8 +353,8 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
         HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true);
 
     // We do the tag again
-    metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    table = HoodieTable.getHoodieTable(metadata, config, jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table);
 
@@ -401,9 +401,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
     JavaRDD<HoodieKey> keysRDD = jsc.parallelize(Arrays.asList(key1, key2, key3, key4));
 
     // Also create the metadata and config
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     HoodieWriteConfig config = makeConfig();
-    HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     // Let's tag
     HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -424,8 +424,8 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
         HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true);
 
     // We do the tag again
-    metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    table = HoodieTable.getHoodieTable(metadata, config, jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table);
 
     // Check results
@@ -473,9 +473,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
 
     // We do the tag
     JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2));
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     HoodieWriteConfig config = makeConfig();
-    HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
     JavaRDD<HoodieRecord> taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table);
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 9993cb4..11669a3 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -40,7 +40,6 @@ import org.apache.hudi.common.TestRawTripPayload;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.FSUtils;
 import org.apache.hudi.common.util.FileIOUtils;
@@ -66,16 +65,17 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
   public void setUp() throws Exception {
     initSparkContexts("TestHoodieGlobalBloomIndex");
     initTempFolderAndPath();
-    HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
     // We have some records to be tagged (two different partitions)
     schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
     schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
+    initMetaClient();
   }
 
   @After
   public void tearDown() throws Exception {
     cleanupSparkContexts();
     cleanupTempFolderAndPath();
+    cleanupMetaClient();
   }
 
   @Test
@@ -127,8 +127,8 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
 
     // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
     List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01");
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     // partitions will NOT be respected by this loadInvolvedFiles(...) call
     List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, table);
     // Still 0, as no valid commit
@@ -139,7 +139,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
     new File(basePath + "/.hoodie/20160401010101.commit").createNewFile();
     new File(basePath + "/.hoodie/20150312101010.commit").createNewFile();
 
-    table = HoodieTable.getHoodieTable(metadata, config, jsc);
+    table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     filesList = index.loadInvolvedFiles(partitions, jsc, table);
     assertEquals(filesList.size(), 4);
 
@@ -264,8 +264,8 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
         HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record4), schema, null, false);
 
     // intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
 
     // Add some commits
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
index 5dee6f6..9a9ddc0 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
@@ -54,6 +54,7 @@ import org.junit.Test;
 public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
 
   private Configuration hadoopConf;
+  private HoodieTableMetaClient metaClient;
 
   @Before
   public void init() throws Exception {
@@ -63,7 +64,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
     hadoopConf = dfs.getConf();
     jsc.hadoopConfiguration().addResource(dfs.getConf());
     dfs.mkdirs(new Path(basePath));
-    HoodieTestUtils.init(hadoopConf, basePath);
+    metaClient = HoodieTestUtils.init(hadoopConf, basePath);
   }
 
   @After
@@ -78,8 +79,8 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
     HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .forTable("test-trip-table").build();
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
-        new HoodieTableMetaClient(dfs.getConf(), cfg.getBasePath(), true));
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
     boolean result = archiveLog.archiveIfRequired(jsc);
     assertTrue(result);
   }
@@ -135,7 +136,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "105"), dfs.getConf());
     HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf());
 
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
 
     assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
@@ -158,8 +159,8 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
     // verify in-flight instants before archive
     verifyInflightInstants(metaClient, 3);
 
-    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
-        new HoodieTableMetaClient(dfs.getConf(), basePath, true));
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
 
     assertTrue(archiveLog.archiveIfRequired(jsc));
 
@@ -235,7 +236,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .forTable("test-trip-table").withCompactionConfig(
             HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build();
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
     // Requested Compaction
     HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
@@ -302,7 +303,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .forTable("test-trip-table").withCompactionConfig(
             HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build();
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
     HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
     HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
@@ -328,7 +329,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .forTable("test-trip-table").withCompactionConfig(
             HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build();
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
     HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
     HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
@@ -360,7 +361,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
         .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .forTable("test-trip-table").withCompactionConfig(
             HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build();
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
     HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
     HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "101", dfs.getConf());
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java
index 8d9a676..0a02b70 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java
@@ -52,6 +52,7 @@ import org.junit.Test;
 public class TestHoodieCompactor extends HoodieClientTestHarness {
 
   private Configuration hadoopConf;
+  private HoodieTableMetaClient metaClient;
 
   @Before
   public void setUp() throws Exception {
@@ -62,7 +63,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
     initTempFolderAndPath();
     hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
     fs = FSUtils.getFs(basePath, hadoopConf);
-    HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
+    metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
     initTestDataGenerator();
   }
 
@@ -96,9 +97,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
 
   @Test(expected = HoodieNotSupportedException.class)
   public void testCompactionOnCopyOnWriteFail() throws Exception {
-    HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-
+    metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
     String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();
     table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
@@ -106,8 +105,8 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
 
   @Test
   public void testCompactionEmpty() throws Exception {
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     HoodieWriteConfig config = getConfig();
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     try (HoodieWriteClient writeClient = getWriteClient(config);) {
 
@@ -136,7 +135,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
       List<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime).collect();
 
       // Update all the 100 records
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
       newCommitTime = "101";
@@ -153,7 +152,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
               updatedRecords);
 
       // Verify that all data file has one log file
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       table = HoodieTable.getHoodieTable(metaClient, config, jsc);
       for (String partitionPath : dataGen.getPartitionPaths()) {
         List<FileSlice> groupedLogFiles = table.getRTFileSystemView().getLatestFileSlices(partitionPath)
@@ -164,7 +163,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
       }
 
       // Do a compaction
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
       String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index 288e04c..68db6ca 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -58,17 +58,17 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
     initSparkContexts("TestHoodieMergeHandle");
     initTempFolderAndPath();
     initFileSystem();
-    initTableType();
     initTestDataGenerator();
+    initMetaClient();
   }
 
   @After
   public void tearDown() throws Exception {
-    cleanupTableType();
     cleanupFileSystem();
     cleanupTestDataGenerator();
     cleanupTempFolderAndPath();
     cleanupSparkContexts();
+    cleanupMetaClient();
   }
 
   private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
@@ -109,7 +109,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
       assertNoWriteErrors(statuses);
 
       // verify that there is a commit
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
       assertEquals("Expecting a single commit.", 1,
           timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
@@ -137,7 +137,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
       assertNoWriteErrors(statuses);
 
       // verify that there are 2 commits
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
       assertEquals("Expecting two commits.", 2, timeline.findInstantsAfter("000", Integer.MAX_VALUE)
           .countInstants());
@@ -161,7 +161,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
       assertNoWriteErrors(statuses);
 
       // verify that there are now 3 commits
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
       assertEquals("Expecting three commits.", 3, timeline.findInstantsAfter("000", Integer.MAX_VALUE)
           .countInstants());
@@ -259,7 +259,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
           .map(status -> status.getStat().getNumInserts()).reduce((a, b) -> a + b).get(), 100);
 
       // Update all the 100 records
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
       newCommitTime = "101";
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index 6439d75..2c46fa9 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -73,7 +73,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
   public void setUp() throws Exception {
     initSparkContexts("TestCopyOnWriteTable");
     initTempFolderAndPath();
-    initTableType();
+    initMetaClient();
     initTestDataGenerator();
     initFileSystem();
   }
@@ -82,7 +82,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
   public void tearDown() throws Exception {
     cleanupSparkContexts();
     cleanupTempFolderAndPath();
-    cleanupTableType();
+    cleanupMetaClient();
     cleanupFileSystem();
     cleanupTestDataGenerator();
   }
@@ -94,7 +94,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
 
     String commitTime = HoodieTestUtils.makeNewCommitTime();
     HoodieWriteConfig config = makeHoodieClientConfig();
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
     Pair<Path, String> newPathWithWriteToken = jsc.parallelize(Arrays.asList(1)).map(x -> {
@@ -127,7 +127,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     // Prepare the AvroParquetIO
     HoodieWriteConfig config = makeHoodieClientConfig();
     String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
 
     String partitionPath = "/2016/01/31";
     HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
@@ -203,7 +203,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
 
     Thread.sleep(1000);
     String newCommitTime = HoodieTestUtils.makeNewCommitTime();
-    metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     final HoodieCopyOnWriteTable newTable = new HoodieCopyOnWriteTable(config, jsc);
     List<WriteStatus> statuses =
         jsc.parallelize(Arrays.asList(1)).map(x -> {
@@ -271,7 +271,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     HoodieWriteConfig config = makeHoodieClientConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class)
         .build();
     String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
 
     HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
 
@@ -308,8 +308,8 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
   public void testInsertRecords() throws Exception {
     HoodieWriteConfig config = makeHoodieClientConfig();
     String commitTime = HoodieTestUtils.makeNewCommitTime();
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
 
     // Case 1:
     // 10 records for partition 1, 1 record for partition 2.
@@ -362,7 +362,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
         HoodieStorageConfig.newBuilder().limitFileSize(64 * 1024).parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024)
             .build()).build();
     String commitTime = HoodieTestUtils.makeNewCommitTime();
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
 
     List<HoodieRecord> records = new ArrayList<>();
@@ -401,8 +401,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
 
     HoodieClientTestUtils.fakeCommitFile(basePath, "001");
     HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
-
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
 
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{testPartitionPath});
@@ -476,7 +475,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
   public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
     HoodieWriteConfig config = makeHoodieClientConfigBuilder().withStorageConfig(
         HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
     final HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
     String commitTime = "000";
     // Perform inserts of 100 records to test CreateHandle and BufferedExecutor
@@ -487,7 +486,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
 
     WriteStatus writeStatus = ws.get(0).get(0);
     String fileId = writeStatus.getFileId();
-    metadata.getFs().create(new Path(basePath + "/.hoodie/000.commit")).close();
+    metaClient.getFs().create(new Path(basePath + "/.hoodie/000.commit")).close();
     final HoodieCopyOnWriteTable table2 = new HoodieCopyOnWriteTable(config, jsc);
 
     final List<HoodieRecord> updates =
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 833b39a..788c783 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -156,7 +156,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
       // Verify there are no errors
       assertNoWriteErrors(statuses);
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
       assertTrue(deltaCommit.isPresent());
       assertEquals("Latest Delta commit should be 004", "004", deltaCommit.get().getTimestamp());
@@ -173,7 +173,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       assertTrue(dataFilesToRead.findAny().isPresent());
 
       // verify that there is a commit
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath(), true);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants();
       assertEquals("Expecting a single commit.", 1,
           timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
@@ -270,7 +270,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       // Verify there are no errors
       assertNoWriteErrors(statuses);
 
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
       assertTrue(deltaCommit.isPresent());
       assertEquals("Latest Delta commit should be 004", "004", deltaCommit.get().getTimestamp());
@@ -335,7 +335,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       //rollback a COW commit when TableType is MOR
       client.rollback(newCommitTime);
 
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
       FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
       HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient,
@@ -454,7 +454,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
         Assert.assertEquals(Arrays.asList(allFiles).stream().filter(file -> file.getPath().getName()
             .contains(commitTime2)).collect(Collectors.toList()).size(), 0);
 
-        metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+        metaClient = HoodieTableMetaClient.reload(metaClient);
         hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
         roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
         dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
@@ -477,14 +477,14 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
         // Verify there are no errors
         assertNoWriteErrors(statuses);
 
-        metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+        metaClient = HoodieTableMetaClient.reload(metaClient);
 
         String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString();
         JavaRDD<WriteStatus> ws = thirdClient.compact(compactionInstantTime);
         thirdClient.commitCompaction(compactionInstantTime, ws, Option.empty());
 
         allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-        metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+        metaClient = HoodieTableMetaClient.reload(metaClient);
         hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
         roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
         List<HoodieDataFile> dataFiles2 = roView.getLatestDataFiles().collect(Collectors.toList());
@@ -504,7 +504,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
         thirdClient.rollback(compactedCommitTime);
 
         allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-        metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+        metaClient = HoodieTableMetaClient.reload(metaClient);
         hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
         roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
 
@@ -603,7 +603,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       // Verify there are no errors
       assertNoWriteErrors(statuses);
 
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.reload(metaClient);
 
       String compactionInstantTime = "004";
       allCommits.add(compactionInstantTime);
@@ -626,7 +626,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       // Verify there are no errors
       assertNoWriteErrors(statuses);
 
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.reload(metaClient);
 
       compactionInstantTime = "006";
       allCommits.add(compactionInstantTime);
@@ -635,7 +635,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       client.commitCompaction(compactionInstantTime, ws, Option.empty());
 
       allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
 
       final String compactedCommitTime = metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant()
@@ -669,7 +669,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       // Rollback latest commit first
       client.restoreToInstant("000");
 
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
       roView = new HoodieTableFileSystemView(metaClient,
           metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
@@ -754,7 +754,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
       // Verify there are no errors
       assertNoWriteErrors(statuses);
 
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
       assertTrue(deltaCommit.isPresent());
       assertEquals("Latest Delta commit should be 002", "002", deltaCommit.get().getTimestamp());
@@ -811,7 +811,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
                 HoodieTestDataGenerator.avroSchemaWithMetadataFields, updatedRecords);
 
         // Verify that all data file has one log file
-        metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+        metaClient = HoodieTableMetaClient.reload(metaClient);
         table = HoodieTable.getHoodieTable(metaClient, config, jsc);
         // In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
         ((SyncableFileSystemView) (table.getRTFileSystemView())).reset();
@@ -833,7 +833,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
         JavaRDD<WriteStatus> result = writeClient.compact(compactionInstantTime);
 
         // Verify that recently written compacted data file has no log file
-        metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+        metaClient = HoodieTableMetaClient.reload(metaClient);
         table = HoodieTable.getHoodieTable(metaClient, config, jsc);
         HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
 
@@ -949,7 +949,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
           .copyToLocalFile(new Path(metaClient.getMetaPath(), fileName), new Path(file.getAbsolutePath()));
       writeClient.rollback(newCommitTime);
 
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+      metaClient = HoodieTableMetaClient.reload(metaClient);
       HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
       RealtimeView tableRTFileSystemView = table.getRTFileSystemView();
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index e0c30be..7cf095b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -71,6 +71,7 @@ public class HoodieTableMetaClient implements Serializable {
   private String basePath;
   private transient HoodieWrapperFileSystem fs;
   private String metaPath;
+  private boolean loadActiveTimelineOnLoad;
   private SerializableConfiguration hadoopConf;
   private HoodieTableType tableType;
   private HoodieTableConfig tableConfig;
@@ -104,6 +105,7 @@ public class HoodieTableMetaClient implements Serializable {
     this.tableConfig = new HoodieTableConfig(fs, metaPath);
     this.tableType = tableConfig.getTableType();
     log.info("Finished Loading Table of type " + tableType + " from " + basePath);
+    this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
     if (loadActiveTimelineOnLoad) {
       log.info("Loading Active commit timeline for " + basePath);
       getActiveTimeline();
@@ -118,6 +120,14 @@ public class HoodieTableMetaClient implements Serializable {
   public HoodieTableMetaClient() {
   }
 
+  public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) {
+    return new HoodieTableMetaClient(
+        oldMetaClient.hadoopConf.get(),
+        oldMetaClient.basePath,
+        oldMetaClient.loadActiveTimelineOnLoad,
+        oldMetaClient.consistencyGuardConfig);
+  }
+
   /**
    * This method is only used when this object is deserialized in a spark executor.
    *