Posted to commits@hudi.apache.org by vi...@apache.org on 2019/09/30 12:38:57 UTC
[incubator-hudi] branch master updated: [HUDI-247] Unify the re-initialization of HoodieTableMetaClient in test for hoodie-client module (#930)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 01e803b [HUDI-247] Unify the re-initialization of HoodieTableMetaClient in test for hoodie-client module (#930)
01e803b is described below
commit 01e803b00e5acf9bb1dbe38340e38e8feb8d037d
Author: vinoyang <ya...@gmail.com>
AuthorDate: Mon Sep 30 20:38:52 2019 +0800
[HUDI-247] Unify the re-initialization of HoodieTableMetaClient in test for hoodie-client module (#930)
---
.../org/apache/hudi/HoodieClientTestHarness.java | 13 ++--
.../src/test/java/org/apache/hudi/TestCleaner.java | 76 ++++++++++------------
.../java/org/apache/hudi/TestClientRollback.java | 6 +-
.../hudi/index/TestHBaseQPSResourceAllocator.java | 4 +-
.../java/org/apache/hudi/index/TestHbaseIndex.java | 19 +++---
.../org/apache/hudi/index/TestHoodieIndex.java | 4 +-
.../hudi/index/bloom/TestHoodieBloomIndex.java | 34 +++++-----
.../index/bloom/TestHoodieGlobalBloomIndex.java | 14 ++--
.../apache/hudi/io/TestHoodieCommitArchiveLog.java | 21 +++---
.../org/apache/hudi/io/TestHoodieCompactor.java | 15 ++---
.../org/apache/hudi/io/TestHoodieMergeHandle.java | 12 ++--
.../apache/hudi/table/TestCopyOnWriteTable.java | 23 ++++---
.../apache/hudi/table/TestMergeOnReadTable.java | 32 ++++-----
.../hudi/common/table/HoodieTableMetaClient.java | 10 +++
14 files changed, 142 insertions(+), 141 deletions(-)
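In short, this commit adds a shared metaClient field to HoodieClientTestHarness and a static HoodieTableMetaClient.reload(...) factory, so tests refresh table state through one path instead of hand-constructing a fresh meta client at every step. A minimal before/after sketch of the pattern (names taken from the diff below; basePath, config and jsc come from the test harness):

    // Before: each assertion site re-built the meta client from scratch.
    HoodieTableMetaClient metaClient =
        new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);

    // After: reuse the inherited field and reload it to pick up new commits.
    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);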
diff --git a/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java
index 84a9e21..10fb0bc 100644
--- a/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/HoodieClientTestHarness.java
@@ -55,6 +55,7 @@ public abstract class HoodieClientTestHarness implements Serializable {
protected TemporaryFolder folder = null;
protected transient HoodieTestDataGenerator dataGen = null;
protected transient ExecutorService executorService;
+ protected transient HoodieTableMetaClient metaClient;
//dfs
protected String dfsBasePath;
@@ -72,7 +73,7 @@ public abstract class HoodieClientTestHarness implements Serializable {
initSparkContexts();
initTestDataGenerator();
initFileSystem();
- initTableType();
+ initMetaClient();
}
/**
@@ -80,7 +81,7 @@ public abstract class HoodieClientTestHarness implements Serializable {
* @throws IOException
*/
public void cleanupResources() throws IOException {
- cleanupTableType();
+ cleanupMetaClient();
cleanupSparkContexts();
cleanupTestDataGenerator();
cleanupFileSystem();
@@ -191,7 +192,7 @@ public abstract class HoodieClientTestHarness implements Serializable {
*
* @throws IOException
*/
- protected void initTableType() throws IOException {
+ protected void initMetaClient() throws IOException {
if (basePath == null) {
throw new IllegalStateException("The base path has not been initialized.");
}
@@ -200,14 +201,14 @@ public abstract class HoodieClientTestHarness implements Serializable {
throw new IllegalStateException("The Spark context has not been initialized.");
}
- HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
+ metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
}
/**
* Cleanups table type.
*/
- protected void cleanupTableType() {
-
+ protected void cleanupMetaClient() {
+ metaClient = null;
}
/**
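The renamed lifecycle helpers above change how harness subclasses wire their fixtures. A sketch of the resulting setUp/tearDown shape, assuming a JUnit 4 test that extends HoodieClientTestHarness (the context-name string is illustrative):

    @Before
    public void setUp() throws Exception {
      initSparkContexts("TestSomething");  // illustrative Spark app name
      initTempFolderAndPath();
      initMetaClient();      // was initTableType(); also assigns the shared metaClient field
    }

    @After
    public void tearDown() throws Exception {
      cleanupSparkContexts();
      cleanupTempFolderAndPath();
      cleanupMetaClient();   // was cleanupTableType(); nulls the field out
    }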
diff --git a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
index 1e8df4a..fe25491 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestCleaner.java
@@ -114,7 +114,7 @@ public class TestCleaner extends TestHoodieClientBase {
assertNoWriteErrors(statuses);
// verify that there is a commit
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
assertEquals("Expecting a single commit.", 1, timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
// Should have 100 records in table (check using Index), all in locations marked at commit
@@ -200,8 +200,8 @@ public class TestCleaner extends TestHoodieClientBase {
insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn);
Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- HoodieTable table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
for (String partitionPath : dataGen.getPartitionPaths()) {
TableFileSystemView fsView = table.getFileSystemView();
Option<Boolean> added = Option.fromJavaOptional(fsView.getAllFileGroups(partitionPath).findFirst()
@@ -239,8 +239,8 @@ public class TestCleaner extends TestHoodieClientBase {
// Verify there are no errors
assertNoWriteErrors(statuses);
- metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- table = HoodieTable.getHoodieTable(metadata, getConfig(), jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
HoodieTimeline timeline = table.getMetaClient().getCommitsTimeline();
TableFileSystemView fsView = table.getFileSystemView();
@@ -375,8 +375,8 @@ public class TestCleaner extends TestHoodieClientBase {
// Verify there are no errors
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- HoodieTable table1 = HoodieTable.getHoodieTable(metadata, cfg, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table1 = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1);
Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
@@ -424,9 +424,8 @@ public class TestCleaner extends TestHoodieClientBase {
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "000");
String file1P1C0 = HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
- HoodieTable table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
- jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
assertEquals("Must not clean any files", 0,
@@ -442,8 +441,8 @@ public class TestCleaner extends TestHoodieClientBase {
// make next commit, with 1 insert & 1 update per partition
HoodieTestUtils.createCommitFiles(basePath, "001");
- table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true), config,
- jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
String file2P0C1 = HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
@@ -472,8 +471,8 @@ public class TestCleaner extends TestHoodieClientBase {
// make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createCommitFiles(basePath, "002");
- table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
- config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
@@ -578,9 +577,8 @@ public class TestCleaner extends TestHoodieClientBase {
String file1P1C0 = HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "000");
- HoodieTable table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
- jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
assertEquals("Must not clean any files", 0,
@@ -596,8 +594,8 @@ public class TestCleaner extends TestHoodieClientBase {
// make next commit, with 1 insert & 1 update per partition
HoodieTestUtils.createCommitFiles(basePath, "001");
- table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
- config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
String file2P0C1 = HoodieTestUtils
.createNewDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "001"); // insert
@@ -626,8 +624,8 @@ public class TestCleaner extends TestHoodieClientBase {
// make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createCommitFiles(basePath, "002");
- table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
- config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "002", file1P0C0); // update
@@ -646,8 +644,8 @@ public class TestCleaner extends TestHoodieClientBase {
// make next commit, with 2 updates to existing files, and 1 insert
HoodieTestUtils.createCommitFiles(basePath, "003");
- table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true),
- config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieTestUtils
.createDataFile(basePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "003", file1P0C0); // update
@@ -700,9 +698,8 @@ public class TestCleaner extends TestHoodieClientBase {
assertEquals("Some marker files are created.", markerFiles.size(), getTotalTempFiles());
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
- HoodieTable table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
- jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
table.rollback(jsc, "000", true);
assertEquals("All temp files are deleted.", 0, getTotalTempFiles());
@@ -722,9 +719,8 @@ public class TestCleaner extends TestHoodieClientBase {
// with just some commit metadata, but no data/partitionPaths.
HoodieTestUtils.createCommitFiles(basePath, "000");
- HoodieTable table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
- jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<HoodieCleanStat> hoodieCleanStatsOne = table.clean(jsc);
assertTrue("HoodieCleanStats should be empty for a table with empty partitionPaths", hoodieCleanStatsOne.isEmpty());
@@ -783,9 +779,8 @@ public class TestCleaner extends TestHoodieClientBase {
updateAllFilesInPartition(filesP1C0, HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH, "003");
updateAllFilesInPartition(filesP2C0, HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH, "003");
- HoodieTable table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
- jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);
assertEquals(100,
@@ -890,9 +885,8 @@ public class TestCleaner extends TestHoodieClientBase {
for (int j = 1; j <= i; j++) {
if (j == i && j <= maxNumFileIdsForCompaction) {
expFileIdToPendingCompaction.put(fileId, compactionInstants[j]);
- HoodieTable table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
- jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
FileSlice slice = table.getRTFileSystemView().getLatestFileSlices(
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
.filter(fs -> fs.getFileId().equals(fileId)).findFirst().get();
@@ -934,15 +928,13 @@ public class TestCleaner extends TestHoodieClientBase {
}
// Clean now
- HoodieTable table = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
- jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);
// Test for safety
- final HoodieTable hoodieTable = HoodieTable.getHoodieTable(
- new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
- jsc);
+ final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient);
+ final HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
expFileIdToPendingCompaction.entrySet().stream().forEach(entry -> {
String fileId = entry.getKey();
@@ -961,7 +953,7 @@ public class TestCleaner extends TestHoodieClientBase {
// Test for progress (Did we clean some files ?)
long numFilesUnderCompactionDeleted =
hoodieCleanStats.stream().flatMap(cleanStat -> {
- return convertPathToFileIdWithCommitTime(metaClient, cleanStat.getDeletePathPatterns()).map(
+ return convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns()).map(
fileIdWithCommitTime -> {
if (expFileIdToPendingCompaction.containsKey(fileIdWithCommitTime.getKey())) {
Assert.assertTrue("Deleted instant time must be less than pending compaction",
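One subtlety in the TestCleaner hunk above: after clean(jsc) runs, the safety check takes a second snapshot (newMetaClient) so the deleted-path assertions resolve file ids against a freshly reloaded timeline, while the HoodieTable itself is still built from the earlier metaClient. A condensed, hedged sketch of that flow (variable names from the diff; assertion bodies elided):

    metaClient = HoodieTableMetaClient.reload(metaClient);
    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
    List<HoodieCleanStat> hoodieCleanStats = table.clean(jsc);

    // Snapshot taken after cleaning, used to map deleted paths to (fileId, commitTime).
    final HoodieTableMetaClient newMetaClient = HoodieTableMetaClient.reload(metaClient);
    hoodieCleanStats.stream()
        .flatMap(cleanStat ->
            convertPathToFileIdWithCommitTime(newMetaClient, cleanStat.getDeletePathPatterns()))
        .forEach(fileIdWithCommitTime -> { /* pending-compaction assertions, see diff */ });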
diff --git a/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java b/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java
index 20cc86c..11504a4 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestClientRollback.java
@@ -97,7 +97,7 @@ public class TestClientRollback extends TestHoodieClientBase {
assertNoWriteErrors(statuses);
List<String> partitionPaths = FSUtils.getAllPartitionPaths(fs, cfg.getBasePath(),
getConfig().shouldAssumeDatePartitioning());
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
final ReadOptimizedView view1 = table.getROFileSystemView();
@@ -122,7 +122,7 @@ public class TestClientRollback extends TestHoodieClientBase {
// Verify there are no errors
assertNoWriteErrors(statuses);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
final ReadOptimizedView view2 = table.getROFileSystemView();
@@ -143,7 +143,7 @@ public class TestClientRollback extends TestHoodieClientBase {
HoodieInstant savepoint = table.getCompletedSavepointTimeline().getInstants().findFirst().get();
client.rollbackToSavepoint(savepoint.getTimestamp());
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
final ReadOptimizedView view3 = table.getROFileSystemView();
dataFiles = partitionPaths.stream().flatMap(s -> {
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
index 622395d..9efe708 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHBaseQPSResourceAllocator.java
@@ -53,14 +53,14 @@ public class TestHBaseQPSResourceAllocator extends HoodieClientTestHarness {
initTempFolderAndPath();
basePath = folder.getRoot().getAbsolutePath() + QPS_TEST_SUFFIX_PATH;
// Initialize table
- initTableType();
+ initMetaClient();
}
@After
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupTempFolderAndPath();
- cleanupTableType();
+ cleanupMetaClient();
if (utility != null) {
utility.shutdownMiniCluster();
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
index 9d89186..6c2fc0f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHbaseIndex.java
@@ -40,7 +40,6 @@ import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -104,9 +103,8 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
// Create a temp folder as the base path
initTempFolderAndPath();
- // Initialize table
- HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
initTestDataGenerator();
+ initMetaClient();
}
@After
@@ -114,6 +112,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
cleanupSparkContexts();
cleanupTempFolderAndPath();
cleanupTestDataGenerator();
+ cleanupMetaClient();
}
private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
@@ -132,7 +131,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
HBaseIndex index = new HBaseIndex(config);
try (HoodieWriteClient writeClient = getWriteClient(config);) {
writeClient.startCommit();
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
// Test tagLocation without any entries in index
@@ -151,7 +150,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
// Now commit this & update location of records inserted and validate no errors
writeClient.commit(newCommitTime, writeStatues);
// Now tagLocation for these records, hbaseIndex should tag them correctly
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
assertTrue(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 200);
@@ -173,7 +172,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
HBaseIndex index = new HBaseIndex(config);
HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config);
writeClient.startCommit();
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(writeRecords, newCommitTime);
@@ -185,7 +184,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
// Now commit this & update location of records inserted and validate no errors
writeClient.commit(newCommitTime, writeStatues);
// Now tagLocation for these records, hbaseIndex should tag them correctly
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
JavaRDD<HoodieRecord> javaRDD = index.tagLocation(writeRecords, jsc, hoodieTable);
assertTrue(javaRDD.filter(record -> record.isCurrentLocationKnown()).collect().size() == 10);
@@ -205,7 +204,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
// Insert 200 records
@@ -257,7 +256,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
// Insert 250 records
@@ -282,7 +281,7 @@ public class TestHbaseIndex extends HoodieClientTestHarness {
String newCommitTime = writeClient.startCommit();
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 250);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, config, jsc);
// Insert 200 records
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index d073757..4354216 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -36,14 +36,14 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
public void setUp() throws Exception {
initSparkContexts("TestHoodieIndex");
initTempFolderAndPath();
- initTableType();
+ initMetaClient();
}
@After
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupTempFolderAndPath();
- cleanupTableType();
+ cleanupMetaClient();
}
@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 28d19eb..69d0cfa 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -44,7 +44,6 @@ import org.apache.hudi.common.HoodieClientTestUtils;
import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.FileIOUtils;
@@ -92,10 +91,10 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
initSparkContexts("TestHoodieBloomIndex");
initTempFolderAndPath();
initFileSystem();
- HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
// We have some records to be tagged (two different partitions)
schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
+ initMetaClient();
}
@After
@@ -103,6 +102,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
cleanupSparkContexts();
cleanupFileSystem();
cleanupTempFolderAndPath();
+ cleanupMetaClient();
}
private HoodieWriteConfig makeConfig() {
@@ -163,8 +163,8 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
false);
List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01", "2015/03/12");
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, table);
// Still 0, as no valid commit
assertEquals(filesList.size(), 0);
@@ -174,7 +174,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
new File(basePath + "/.hoodie/20160401010101.commit").createNewFile();
new File(basePath + "/.hoodie/20150312101010.commit").createNewFile();
- table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
filesList = index.loadInvolvedFiles(partitions, jsc, table);
assertEquals(filesList.size(), 4);
@@ -286,9 +286,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
// We have some records to be tagged (two different partitions)
JavaRDD<HoodieRecord> recordRDD = jsc.emptyRDD();
// Also create the metadata and config
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieWriteConfig config = makeConfig();
- HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -331,9 +331,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2, record3, record4));
// Also create the metadata and config
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieWriteConfig config = makeConfig();
- HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -353,8 +353,8 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true);
// We do the tag again
- metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table);
@@ -401,9 +401,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
JavaRDD<HoodieKey> keysRDD = jsc.parallelize(Arrays.asList(key1, key2, key3, key4));
// Also create the metadata and config
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieWriteConfig config = makeConfig();
- HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// Let's tag
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
@@ -424,8 +424,8 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
HoodieClientTestUtils.writeParquetFile(basePath, "2015/01/31", Arrays.asList(record4), schema, null, true);
// We do the tag again
- metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table);
// Check results
@@ -473,9 +473,9 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
// We do the tag
JavaRDD<HoodieRecord> recordRDD = jsc.parallelize(Arrays.asList(record1, record2));
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieWriteConfig config = makeConfig();
- HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
JavaRDD<HoodieRecord> taggedRecordRDD = bloomIndex.tagLocation(recordRDD, jsc, table);
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 9993cb4..11669a3 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -40,7 +40,6 @@ import org.apache.hudi.common.TestRawTripPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.FileIOUtils;
@@ -66,16 +65,17 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
public void setUp() throws Exception {
initSparkContexts("TestHoodieGlobalBloomIndex");
initTempFolderAndPath();
- HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
// We have some records to be tagged (two different partitions)
schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
+ initMetaClient();
}
@After
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupTempFolderAndPath();
+ cleanupMetaClient();
}
@Test
@@ -127,8 +127,8 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
// intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
List<String> partitions = Arrays.asList("2016/01/21", "2016/04/01");
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// partitions will NOT be respected by this loadInvolvedFiles(...) call
List<Tuple2<String, BloomIndexFileInfo>> filesList = index.loadInvolvedFiles(partitions, jsc, table);
// Still 0, as no valid commit
@@ -139,7 +139,7 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
new File(basePath + "/.hoodie/20160401010101.commit").createNewFile();
new File(basePath + "/.hoodie/20150312101010.commit").createNewFile();
- table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ table = HoodieTable.getHoodieTable(metaClient, config, jsc);
filesList = index.loadInvolvedFiles(partitions, jsc, table);
assertEquals(filesList.size(), 4);
@@ -264,8 +264,8 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
HoodieClientTestUtils.writeParquetFile(basePath, "2015/03/12", Arrays.asList(record4), schema, null, false);
// intentionally missed the partition "2015/03/12" to see if the GlobalBloomIndex can pick it up
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- HoodieTable table = HoodieTable.getHoodieTable(metadata, config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// Add some commits
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
index 5dee6f6..9a9ddc0 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
@@ -54,6 +54,7 @@ import org.junit.Test;
public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
private Configuration hadoopConf;
+ private HoodieTableMetaClient metaClient;
@Before
public void init() throws Exception {
@@ -63,7 +64,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
hadoopConf = dfs.getConf();
jsc.hadoopConfiguration().addResource(dfs.getConf());
dfs.mkdirs(new Path(basePath));
- HoodieTestUtils.init(hadoopConf, basePath);
+ metaClient = HoodieTestUtils.init(hadoopConf, basePath);
}
@After
@@ -78,8 +79,8 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").build();
- HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
- new HoodieTableMetaClient(dfs.getConf(), cfg.getBasePath(), true));
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
boolean result = archiveLog.archiveIfRequired(jsc);
assertTrue(result);
}
@@ -135,7 +136,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "105"), dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "105", dfs.getConf());
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals("Loaded 6 commits and the count should match", 6, timeline.countInstants());
@@ -158,8 +159,8 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
// verify in-flight instants before archive
verifyInflightInstants(metaClient, 3);
- HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg,
- new HoodieTableMetaClient(dfs.getConf(), basePath, true));
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
assertTrue(archiveLog.archiveIfRequired(jsc));
@@ -235,7 +236,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build();
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
// Requested Compaction
HoodieTestDataGenerator.createCompactionAuxiliaryMetadata(basePath,
@@ -302,7 +303,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build();
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
@@ -328,7 +329,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build();
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCommitFile(basePath, "101", dfs.getConf());
@@ -360,7 +361,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.forTable("test-trip-table").withCompactionConfig(
HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 5).build()).build();
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(dfs.getConf(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(cfg, metaClient);
HoodieTestDataGenerator.createCommitFile(basePath, "100", dfs.getConf());
HoodieTestDataGenerator.createCompactionRequestedFile(basePath, "101", dfs.getConf());
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java
index 8d9a676..0a02b70 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCompactor.java
@@ -52,6 +52,7 @@ import org.junit.Test;
public class TestHoodieCompactor extends HoodieClientTestHarness {
private Configuration hadoopConf;
+ private HoodieTableMetaClient metaClient;
@Before
public void setUp() throws Exception {
@@ -62,7 +63,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
initTempFolderAndPath();
hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
fs = FSUtils.getFs(basePath, hadoopConf);
- HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
+ metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ);
initTestDataGenerator();
}
@@ -96,9 +97,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
@Test(expected = HoodieNotSupportedException.class)
public void testCompactionOnCopyOnWriteFail() throws Exception {
- HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-
+ metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.COPY_ON_WRITE);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();
table.compact(jsc, compactionInstantTime, table.scheduleCompaction(jsc, compactionInstantTime));
@@ -106,8 +105,8 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
@Test
public void testCompactionEmpty() throws Exception {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieWriteConfig config = getConfig();
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
try (HoodieWriteClient writeClient = getWriteClient(config);) {
@@ -136,7 +135,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
List<WriteStatus> statuses = writeClient.insert(recordsRDD, newCommitTime).collect();
// Update all the 100 records
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
newCommitTime = "101";
@@ -153,7 +152,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
updatedRecords);
// Verify that all data file has one log file
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
for (String partitionPath : dataGen.getPartitionPaths()) {
List<FileSlice> groupedLogFiles = table.getRTFileSystemView().getLatestFileSlices(partitionPath)
@@ -164,7 +163,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
}
// Do a compaction
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
String compactionInstantTime = HoodieActiveTimeline.createNewCommitTime();
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index 288e04c..68db6ca 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -58,17 +58,17 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
initSparkContexts("TestHoodieMergeHandle");
initTempFolderAndPath();
initFileSystem();
- initTableType();
initTestDataGenerator();
+ initMetaClient();
}
@After
public void tearDown() throws Exception {
- cleanupTableType();
cleanupFileSystem();
cleanupTestDataGenerator();
cleanupTempFolderAndPath();
cleanupSparkContexts();
+ cleanupMetaClient();
}
private HoodieWriteClient getWriteClient(HoodieWriteConfig config) throws Exception {
@@ -109,7 +109,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
assertNoWriteErrors(statuses);
// verify that there is a commit
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
assertEquals("Expecting a single commit.", 1,
timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
@@ -137,7 +137,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
assertNoWriteErrors(statuses);
// verify that there are 2 commits
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
assertEquals("Expecting two commits.", 2, timeline.findInstantsAfter("000", Integer.MAX_VALUE)
.countInstants());
@@ -161,7 +161,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
assertNoWriteErrors(statuses);
// verify that there are now 3 commits
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
assertEquals("Expecting three commits.", 3, timeline.findInstantsAfter("000", Integer.MAX_VALUE)
.countInstants());
@@ -259,7 +259,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
.map(status -> status.getStat().getNumInserts()).reduce((a, b) -> a + b).get(), 100);
// Update all the 100 records
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
newCommitTime = "101";
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index 6439d75..2c46fa9 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -73,7 +73,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
public void setUp() throws Exception {
initSparkContexts("TestCopyOnWriteTable");
initTempFolderAndPath();
- initTableType();
+ initMetaClient();
initTestDataGenerator();
initFileSystem();
}
@@ -82,7 +82,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
public void tearDown() throws Exception {
cleanupSparkContexts();
cleanupTempFolderAndPath();
- cleanupTableType();
+ cleanupMetaClient();
cleanupFileSystem();
cleanupTestDataGenerator();
}
@@ -94,7 +94,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
String commitTime = HoodieTestUtils.makeNewCommitTime();
HoodieWriteConfig config = makeHoodieClientConfig();
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
Pair<Path, String> newPathWithWriteToken = jsc.parallelize(Arrays.asList(1)).map(x -> {
@@ -127,7 +127,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
// Prepare the AvroParquetIO
HoodieWriteConfig config = makeHoodieClientConfig();
String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
String partitionPath = "/2016/01/31";
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
@@ -203,7 +203,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
Thread.sleep(1000);
String newCommitTime = HoodieTestUtils.makeNewCommitTime();
- metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieCopyOnWriteTable newTable = new HoodieCopyOnWriteTable(config, jsc);
List<WriteStatus> statuses =
jsc.parallelize(Arrays.asList(1)).map(x -> {
@@ -271,7 +271,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
HoodieWriteConfig config = makeHoodieClientConfigBuilder().withWriteStatusClass(MetadataMergeWriteStatus.class)
.build();
String firstCommitTime = HoodieTestUtils.makeNewCommitTime();
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
@@ -308,8 +308,8 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
public void testInsertRecords() throws Exception {
HoodieWriteConfig config = makeHoodieClientConfig();
String commitTime = HoodieTestUtils.makeNewCommitTime();
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
// Case 1:
// 10 records for partition 1, 1 record for partition 2.
@@ -362,7 +362,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
HoodieStorageConfig.newBuilder().limitFileSize(64 * 1024).parquetBlockSize(64 * 1024).parquetPageSize(64 * 1024)
.build()).build();
String commitTime = HoodieTestUtils.makeNewCommitTime();
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
List<HoodieRecord> records = new ArrayList<>();
@@ -401,8 +401,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
HoodieClientTestUtils.fakeCommitFile(basePath, "001");
HoodieClientTestUtils.fakeDataFile(basePath, testPartitionPath, "001", "file1", fileSize);
-
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{testPartitionPath});
@@ -476,7 +475,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
HoodieWriteConfig config = makeHoodieClientConfigBuilder().withStorageConfig(
HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
- HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
final HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
String commitTime = "000";
// Perform inserts of 100 records to test CreateHandle and BufferedExecutor
@@ -487,7 +486,7 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
WriteStatus writeStatus = ws.get(0).get(0);
String fileId = writeStatus.getFileId();
- metadata.getFs().create(new Path(basePath + "/.hoodie/000.commit")).close();
+ metaClient.getFs().create(new Path(basePath + "/.hoodie/000.commit")).close();
final HoodieCopyOnWriteTable table2 = new HoodieCopyOnWriteTable(config, jsc);
final List<HoodieRecord> updates =
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 833b39a..788c783 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -156,7 +156,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("Latest Delta commit should be 004", "004", deltaCommit.get().getTimestamp());
@@ -173,7 +173,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
assertTrue(dataFilesToRead.findAny().isPresent());
// verify that there is a commit
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath(), true);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline timeline = metaClient.getCommitTimeline().filterCompletedInstants();
assertEquals("Expecting a single commit.", 1,
timeline.findInstantsAfter("000", Integer.MAX_VALUE).countInstants());
@@ -270,7 +270,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// Verify there are no errors
assertNoWriteErrors(statuses);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("Latest Delta commit should be 004", "004", deltaCommit.get().getTimestamp());
@@ -335,7 +335,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
//rollback a COW commit when TableType is MOR
client.rollback(newCommitTime);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient,
@@ -454,7 +454,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
Assert.assertEquals(Arrays.asList(allFiles).stream().filter(file -> file.getPath().getName()
.contains(commitTime2)).collect(Collectors.toList()).size(), 0);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
@@ -477,14 +477,14 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// Verify there are no errors
assertNoWriteErrors(statuses);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
String compactionInstantTime = thirdClient.scheduleCompaction(Option.empty()).get().toString();
JavaRDD<WriteStatus> ws = thirdClient.compact(compactionInstantTime);
thirdClient.commitCompaction(compactionInstantTime, ws, Option.empty());
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
List<HoodieDataFile> dataFiles2 = roView.getLatestDataFiles().collect(Collectors.toList());
@@ -504,7 +504,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
thirdClient.rollback(compactedCommitTime);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
@@ -603,7 +603,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// Verify there are no errors
assertNoWriteErrors(statuses);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
String compactionInstantTime = "004";
allCommits.add(compactionInstantTime);
@@ -626,7 +626,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// Verify there are no errors
assertNoWriteErrors(statuses);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
compactionInstantTime = "006";
allCommits.add(compactionInstantTime);
@@ -635,7 +635,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
client.commitCompaction(compactionInstantTime, ws, Option.empty());
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
final String compactedCommitTime = metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant()
@@ -669,7 +669,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// Rollback latest commit first
client.restoreToInstant("000");
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
roView = new HoodieTableFileSystemView(metaClient,
metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
@@ -754,7 +754,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
// Verify there are no errors
assertNoWriteErrors(statuses);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
+ metaClient = HoodieTableMetaClient.reload(metaClient);
deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("Latest Delta commit should be 002", "002", deltaCommit.get().getTimestamp());
@@ -811,7 +811,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
HoodieTestDataGenerator.avroSchemaWithMetadataFields, updatedRecords);
// Verify that all data file has one log file
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
// In writeRecordsToLogFiles, no commit files are getting added, so resetting file-system view state
((SyncableFileSystemView) (table.getRTFileSystemView())).reset();
@@ -833,7 +833,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
JavaRDD<WriteStatus> result = writeClient.compact(compactionInstantTime);
// Verify that recently written compacted data file has no log file
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
table = HoodieTable.getHoodieTable(metaClient, config, jsc);
HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
@@ -949,7 +949,7 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
.copyToLocalFile(new Path(metaClient.getMetaPath(), fileName), new Path(file.getAbsolutePath()));
writeClient.rollback(newCommitTime);
- metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
RealtimeView tableRTFileSystemView = table.getRTFileSystemView();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index e0c30be..7cf095b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -71,6 +71,7 @@ public class HoodieTableMetaClient implements Serializable {
private String basePath;
private transient HoodieWrapperFileSystem fs;
private String metaPath;
+ private boolean loadActiveTimelineOnLoad;
private SerializableConfiguration hadoopConf;
private HoodieTableType tableType;
private HoodieTableConfig tableConfig;
@@ -104,6 +105,7 @@ public class HoodieTableMetaClient implements Serializable {
this.tableConfig = new HoodieTableConfig(fs, metaPath);
this.tableType = tableConfig.getTableType();
log.info("Finished Loading Table of type " + tableType + " from " + basePath);
+ this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
if (loadActiveTimelineOnLoad) {
log.info("Loading Active commit timeline for " + basePath);
getActiveTimeline();
@@ -118,6 +120,14 @@ public class HoodieTableMetaClient implements Serializable {
public HoodieTableMetaClient() {
}
+ public static HoodieTableMetaClient reload(HoodieTableMetaClient oldMetaClient) {
+ return new HoodieTableMetaClient(
+ oldMetaClient.hadoopConf.get(),
+ oldMetaClient.basePath,
+ oldMetaClient.loadActiveTimelineOnLoad,
+ oldMetaClient.consistencyGuardConfig);
+ }
+
/**
* This method is only used when this object is deserialized in a spark executor.
*
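As the final hunk shows, reload(...) simply re-runs the loading constructor with the old client's Hadoop configuration, base path, timeline-loading flag and consistency-guard config; the new loadActiveTimelineOnLoad field exists so that flag survives into the copy. A hedged usage sketch (constructor and timeline accessors as they appear elsewhere in this diff):

    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
    // ... a writer adds a commit under .hoodie/ ...
    metaClient = HoodieTableMetaClient.reload(metaClient);  // re-reads table state from storage
    HoodieTimeline commits = metaClient.getActiveTimeline().getCommitsTimeline();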