Posted to commits@hudi.apache.org by vb...@apache.org on 2020/06/08 06:14:56 UTC
[hudi] branch master updated: [HUDI-988] Fix More Unit Test Flakiness
This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e9cab67 [HUDI-988] Fix More Unit Test Flakiness
e9cab67 is described below
commit e9cab67b8095b30205af27498dc0b279d188a454
Author: garyli1019 <ya...@gmail.com>
AuthorDate: Fri Jun 5 17:25:59 2020 -0700
[HUDI-988] Fix More Unit Test Flakiness
---
.../hudi/client/TestCompactionAdminClient.java | 8 --
.../java/org/apache/hudi/client/TestMultiFS.java | 4 +-
.../hudi/client/TestTableSchemaEvolution.java | 12 --
.../hudi/client/TestUpdateSchemaEvolution.java | 3 +-
.../hudi/execution/TestBoundedInMemoryQueue.java | 3 +-
.../TestSparkBoundedInMemoryExecutor.java | 2 +-
.../org/apache/hudi/index/TestHoodieIndex.java | 13 +--
.../hudi/index/bloom/TestHoodieBloomIndex.java | 4 +-
.../index/bloom/TestHoodieGlobalBloomIndex.java | 5 +-
.../apache/hudi/io/TestHoodieCommitArchiveLog.java | 3 +-
.../hudi/io/TestHoodieKeyLocationFetchHandle.java | 4 +-
.../org/apache/hudi/io/TestHoodieMergeHandle.java | 6 +-
.../apache/hudi/table/TestConsistencyGuard.java | 2 +-
.../hudi/table/TestHoodieMergeOnReadTable.java | 123 ++++++++++-----------
.../commit/TestCopyOnWriteActionExecutor.java | 32 +-----
.../table/action/commit/TestUpsertPartitioner.java | 23 +---
.../table/action/compact/TestAsyncCompaction.java | 2 +-
.../table/action/compact/TestHoodieCompactor.java | 5 +-
.../hudi/testutils/HoodieClientTestHarness.java | 67 ++++++++---
.../table/view/HoodieTableFileSystemView.java | 6 +
.../timeline/service/FileSystemViewHandler.java | 2 +-
pom.xml | 2 +-
22 files changed, 138 insertions(+), 193 deletions(-)
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
index 2d69156..1200f67 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
@@ -37,7 +37,6 @@ import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -71,13 +70,6 @@ public class TestCompactionAdminClient extends HoodieClientTestBase {
client = new CompactionAdminClient(jsc, basePath);
}
- @AfterEach
- public void tearDown() {
- client.close();
- metaClient = null;
- cleanupSparkContexts();
- }
-
@Test
public void testUnscheduleCompactionPlan() throws Exception {
int numEntriesPerInstant = 10;
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index 02efe8e..6a78bc5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -63,9 +63,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
@AfterEach
public void tearDown() throws Exception {
- cleanupSparkContexts();
- cleanupDFS();
- cleanupTestDataGenerator();
+ cleanupResources();
}
protected HoodieWriteConfig getHoodieWriteConfig(String basePath) {
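The same consolidation repeats across the test classes below: each class's @AfterEach, which previously called an ad-hoc subset of the harness cleanup* methods in an ad-hoc order, collapses into a single cleanupResources() call. A minimal before/after sketch, using only method names that appear in this diff:

  // Before: each test picked its own subset and ordering of cleanups, so
  // Spark contexts, mini-DFS clusters, and executors could leak between runs.
  @AfterEach
  public void tearDown() throws Exception {
    cleanupSparkContexts();
    cleanupDFS();
    cleanupTestDataGenerator();
  }

  // After: the harness owns one complete, ordered teardown.
  @AfterEach
  public void tearDown() throws Exception {
    cleanupResources();
  }

The full sequence cleanupResources() runs appears in the HoodieClientTestHarness hunk near the end of this diff.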
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 0148bca..25e97c9 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -38,8 +38,6 @@ import org.apache.hudi.testutils.TestRawTripPayload;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -76,16 +74,6 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
public static final String TRIP_EXAMPLE_SCHEMA_DEVOLVED = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA
+ TRIP_SCHEMA_SUFFIX;
- @BeforeEach
- public void setUp() throws IOException {
- initResources();
- }
-
- @AfterEach
- public void tearDown() throws IOException {
- cleanupResources();
- }
-
@Test
public void testSchemaCompatibilityBasic() throws Exception {
assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA),
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index d20b9fe..2c985f3 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -61,8 +61,7 @@ public class TestUpdateSchemaEvolution extends HoodieClientTestHarness {
@AfterEach
public void tearDown() throws IOException {
- cleanupSparkContexts();
- cleanupFileSystem();
+ cleanupResources();
}
@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
index d80c86d..4b52926 100644
--- a/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
+++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
@@ -72,8 +72,7 @@ public class TestBoundedInMemoryQueue extends HoodieClientTestHarness {
@AfterEach
public void tearDown() throws Exception {
- cleanupTestDataGenerator();
- cleanupExecutorService();
+ cleanupResources();
}
// Test to ensure that we are reading all records from queue iterator in the same order
diff --git a/hudi-client/src/test/java/org/apache/hudi/execution/TestSparkBoundedInMemoryExecutor.java b/hudi-client/src/test/java/org/apache/hudi/execution/TestSparkBoundedInMemoryExecutor.java
index 2deea67..c55f275 100644
--- a/hudi-client/src/test/java/org/apache/hudi/execution/TestSparkBoundedInMemoryExecutor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/execution/TestSparkBoundedInMemoryExecutor.java
@@ -53,7 +53,7 @@ public class TestSparkBoundedInMemoryExecutor extends HoodieClientTestHarness {
@AfterEach
public void tearDown() throws Exception {
- cleanupTestDataGenerator();
+ cleanupResources();
}
@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
index ea5f851..67451f1 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/TestHoodieIndex.java
@@ -19,7 +19,6 @@
package org.apache.hudi.index;
import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
@@ -85,7 +84,6 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
private IndexType indexType;
private HoodieIndex index;
private HoodieWriteConfig config;
- private HoodieWriteClient writeClient;
private String schemaStr;
private Schema schema;
@@ -95,14 +93,10 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
private void setUp(IndexType indexType, boolean initializeIndex) throws Exception {
this.indexType = indexType;
- initSparkContexts("TestHoodieIndex");
- initPath();
- initTestDataGenerator();
- initFileSystem();
+ initResources();
// We have some records to be tagged (two different partitions)
schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(schemaStr));
- initMetaClient();
if (initializeIndex) {
instantiateIndex();
}
@@ -110,10 +104,7 @@ public class TestHoodieIndex extends HoodieClientTestHarness {
@AfterEach
public void tearDown() throws IOException {
- cleanupSparkContexts();
- cleanupFileSystem();
- cleanupClients();
- cleanupTestDataGenerator();
+ cleanupResources();
}
@ParameterizedTest
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index bfbfa97..97acf03 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -97,9 +97,7 @@ public class TestHoodieBloomIndex extends HoodieClientTestHarness {
@AfterEach
public void tearDown() throws Exception {
- cleanupSparkContexts();
- cleanupFileSystem();
- cleanupClients();
+ cleanupResources();
}
private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) {
diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 3847047..6aab654 100644
--- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -80,9 +80,8 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness {
}
@AfterEach
- public void tearDown() {
- cleanupSparkContexts();
- cleanupClients();
+ public void tearDown() throws IOException {
+ cleanupResources();
}
@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
index 3bee3e8..9cd3b3f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieCommitArchiveLog.java
@@ -69,8 +69,7 @@ public class TestHoodieCommitArchiveLog extends HoodieClientTestHarness {
@AfterEach
public void clean() throws IOException {
- cleanupDFS();
- cleanupSparkContexts();
+ cleanupResources();
}
@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
index af0e728..5c3c5ad 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
@@ -82,9 +82,7 @@ public class TestHoodieKeyLocationFetchHandle extends HoodieClientTestHarness {
@AfterEach
public void tearDown() throws IOException {
- cleanupSparkContexts();
- cleanupFileSystem();
- cleanupClients();
+ cleanupResources();
}
@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
index fa6f41a..75acf68 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java
@@ -66,11 +66,7 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
@AfterEach
public void tearDown() throws Exception {
- cleanupFileSystem();
- cleanupTestDataGenerator();
- cleanupSparkContexts();
- cleanupClients();
- cleanupFileSystem();
+ cleanupResources();
}
@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
index 5021f5e..2406d85 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestConsistencyGuard.java
@@ -44,7 +44,7 @@ public class TestConsistencyGuard extends HoodieClientTestHarness {
@AfterEach
public void tearDown() throws Exception {
- cleanupFileSystem();
+ cleanupResources();
}
@Test
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index a26b80b..57c0d8d 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -119,10 +119,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
@AfterEach
public void clean() throws IOException {
- cleanupDFS();
- cleanupSparkContexts();
- cleanupTestDataGenerator();
- cleanupClients();
+ cleanupResources();
}
@Test
@@ -151,9 +148,9 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
client.compact(compactionCommitTime);
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
- HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
- HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+ hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
+ tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent());
// verify that there is a commit
@@ -305,7 +302,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -316,13 +313,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- BaseFileOnlyView roView =
- new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
- Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+ tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+ Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
- roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestBaseFiles();
+ tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(),
"should list the parquet files we wrote in the delta commit");
@@ -358,11 +354,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent());
allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
- roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestBaseFiles();
+ tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent());
- List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+ List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
// Wrote 20 records and deleted 20 records, so remaining 20-20 = 0
assertEquals(0, recordsRead.size(), "Must contain 0 records");
@@ -391,7 +387,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// verify there are no errors
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertTrue(commit.isPresent());
assertEquals("001", commit.get().getTimestamp(), "commit should be 001");
@@ -417,11 +413,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- HoodieTableFileSystemView roView =
- new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
final String absentCommit = newCommitTime;
- assertFalse(roView.getLatestBaseFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime())));
+ assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> absentCommit.equals(file.getCommitTime())));
}
}
@@ -446,7 +441,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -457,13 +452,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- BaseFileOnlyView roView =
- new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
- Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+ tableView =
+ getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+ Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(!dataFilesToRead.findAny().isPresent());
- roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestBaseFiles();
+ tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(),
"should list the parquet files we wrote in the delta commit");
@@ -479,7 +474,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));
- List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+ List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(recordsRead.size(), 200);
@@ -493,7 +488,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// After rollback, there should be no parquet file with the failed commit time
assertEquals(0, Arrays.stream(allFiles)
.filter(file -> file.getPath().getName().contains(commitTime1)).count());
- dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+ dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(200, recordsRead.size());
}
@@ -509,7 +504,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200));
- List<String> dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+ List<String> dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(200, recordsRead.size());
@@ -529,8 +524,8 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
metaClient = HoodieTableMetaClient.reload(metaClient);
hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
- roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFiles = roView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
+ tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ dataFiles = tableView.getLatestBaseFiles().map(HoodieBaseFile::getPath).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
// check that the number of records read is still correct after rollback operation
assertEquals(200, recordsRead.size());
@@ -556,20 +551,20 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
- roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+ tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
final String compactedCommitTime =
metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();
- assertTrue(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+ assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
thirdClient.rollback(compactedCommitTime);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
- roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+ tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
- assertFalse(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+ assertFalse(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
}
}
}
@@ -593,7 +588,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -604,13 +599,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- BaseFileOnlyView roView =
- new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
- Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
+ tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+ Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
- roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
- dataFilesToRead = roView.getLatestBaseFiles();
+ tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ dataFilesToRead = tableView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(),
"Should list the parquet files we wrote in the delta commit");
@@ -626,7 +620,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
copyOfRecords = dataGen.generateUpdates(newCommitTime, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(newCommitTime, 200));
- List<String> dataFiles = roView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+ List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(200, recordsRead.size());
@@ -684,12 +678,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = HoodieTableMetaClient.reload(metaClient);
- roView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
+ tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline(), allFiles);
final String compactedCommitTime =
metaClient.getActiveTimeline().reload().getCommitsTimeline().lastInstant().get().getTimestamp();
- assertTrue(roView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
+ assertTrue(tableView.getLatestBaseFiles().anyMatch(file -> compactedCommitTime.equals(file.getCommitTime())));
/**
* Write 5 (updates)
@@ -711,12 +705,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
metaClient = HoodieTableMetaClient.reload(metaClient);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- roView =
- new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
- dataFilesToRead = roView.getLatestBaseFiles();
+ tableView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+ dataFilesToRead = tableView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
- SliceView rtView =
- new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+ SliceView rtView = getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
List<HoodieFileGroup> fileGroups =
((HoodieTableFileSystemView) rtView).getAllFileGroups().collect(Collectors.toList());
assertTrue(fileGroups.isEmpty());
@@ -756,7 +748,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -767,13 +759,13 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent());
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- BaseFileOnlyView roView = new HoodieTableFileSystemView(metaClient,
+ BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
Map<String, Long> parquetFileIdToSize =
dataFilesToRead.collect(Collectors.toMap(HoodieBaseFile::getFileId, HoodieBaseFile::getFileSize));
- roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles();
List<HoodieBaseFile> dataFilesList = dataFilesToRead.collect(Collectors.toList());
assertTrue(dataFilesList.size() > 0,
@@ -801,7 +793,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
assertFalse(commit.isPresent());
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
- roView = new HoodieTableFileSystemView(metaClient,
+ roView = getHoodieTableFileSystemView(metaClient,
hoodieTable.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles();
List<HoodieBaseFile> newDataFilesList = dataFilesToRead.collect(Collectors.toList());
@@ -830,7 +822,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
writeClient.insert(recordsRDD, newCommitTime).collect();
// Update all the 100 records
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+ metaClient = getHoodieMetaClient(hadoopConf, basePath);
newCommitTime = "101";
writeClient.startCommitWithTime(newCommitTime);
@@ -905,7 +897,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
writeClient.commit(newCommitTime, statuses);
HoodieTable table =
- HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath), config, hadoopConf);
+ HoodieTable.create(getHoodieMetaClient(hadoopConf, basePath), config, hadoopConf);
SliceView tableRTFileSystemView = table.getSliceView();
long numLogFiles = 0;
@@ -966,7 +958,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs
// and calling rollback twice
final String lastCommitTime = newCommitTime;
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+ metaClient = getHoodieMetaClient(hadoopConf, basePath);
HoodieInstant last = metaClient.getCommitsTimeline().getInstants()
.filter(instant -> instant.getTimestamp().equals(lastCommitTime)).findFirst().get();
String fileName = last.getFileName();
@@ -1015,7 +1007,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
statuses.collect();
HoodieTable table =
- HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath), config, hadoopConf);
+ HoodieTable.create(getHoodieMetaClient(hadoopConf, basePath), config, hadoopConf);
SliceView tableRTFileSystemView = table.getSliceView();
long numLogFiles = 0;
@@ -1036,7 +1028,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
writeClient.commitCompaction(newCommitTime, statuses, Option.empty());
// Trigger a rollback of compaction
writeClient.rollback(newCommitTime);
- table = HoodieTable.create(new HoodieTableMetaClient(hadoopConf, basePath), config, hadoopConf);
+ table = HoodieTable.create(getHoodieMetaClient(hadoopConf, basePath), config, hadoopConf);
tableRTFileSystemView = table.getSliceView();
((SyncableFileSystemView) tableRTFileSystemView).reset();
Option<HoodieInstant> lastInstant = ((SyncableFileSystemView) tableRTFileSystemView).getLastInstant();
@@ -1056,7 +1048,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
+ metaClient = getHoodieMetaClient(hadoopConf, basePath);
HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
// Create a commit without rolling stats in metadata to test backwards compatibility
@@ -1155,7 +1147,6 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
public void testRollingStatsWithSmallFileHandling() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
try (HoodieWriteClient client = getHoodieWriteClient(cfg);) {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
Map<String, Long> fileIdToInsertsMap = new HashMap<>();
Map<String, Long> fileIdToUpsertsMap = new HashMap<>();
@@ -1302,7 +1293,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
assertNoWriteErrors(statuses);
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
HoodieMergeOnReadTable hoodieTable = (HoodieMergeOnReadTable) HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
@@ -1314,11 +1305,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
BaseFileOnlyView roView =
- new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+ getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
assertFalse(dataFilesToRead.findAny().isPresent());
- roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(),
"should list the parquet files we wrote in the delta commit");
@@ -1398,7 +1389,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
List<WriteStatus> statuses = client.insert(writeRecords, commitTime).collect();
assertNoWriteErrors(statuses);
- metaClient = new HoodieTableMetaClient(hadoopConf, cfg.getBasePath());
+ metaClient = getHoodieMetaClient(hadoopConf, cfg.getBasePath());
HoodieTable hoodieTable = HoodieTable.create(metaClient, cfg, hadoopConf);
Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
@@ -1410,11 +1401,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
BaseFileOnlyView roView =
- new HoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
+ getHoodieTableFileSystemView(metaClient, metaClient.getCommitTimeline().filterCompletedInstants(), allFiles);
Stream<HoodieBaseFile> dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(!dataFilesToRead.findAny().isPresent());
- roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+ roView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
dataFilesToRead = roView.getLatestBaseFiles();
assertTrue(dataFilesToRead.findAny().isPresent(),
"should list the parquet files we wrote in the delta commit");
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 25b64ae..63a04ec 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -37,7 +37,7 @@ import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.io.HoodieCreateHandle;
import org.apache.hudi.table.HoodieCopyOnWriteTable;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.TestRawTripPayload;
import org.apache.hudi.testutils.TestRawTripPayload.MetadataMergeWriteStatus;
@@ -52,8 +52,6 @@ import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.spark.TaskContext;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.File;
@@ -69,27 +67,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
+public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase {
private static final Logger LOG = LogManager.getLogger(TestCopyOnWriteActionExecutor.class);
- @BeforeEach
- public void setUp() throws Exception {
- initSparkContexts("TestCopyOnWriteActionExecutor");
- initPath();
- initMetaClient();
- initTestDataGenerator();
- initFileSystem();
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- cleanupSparkContexts();
- cleanupClients();
- cleanupFileSystem();
- cleanupTestDataGenerator();
- }
-
@Test
public void testMakeNewPath() throws Exception {
String fileName = UUID.randomUUID().toString();
@@ -173,7 +154,7 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
GenericRecord newRecord;
int index = 0;
for (GenericRecord record : fileRecords) {
- System.out.println("Got :" + record.get("_row_key").toString() + ", Exp :" + records.get(index).getRecordKey());
+ //System.out.println("Got :" + record.get("_row_key").toString() + ", Exp :" + records.get(index).getRecordKey());
assertEquals(records.get(index).getRecordKey(), record.get("_row_key").toString());
index++;
}
@@ -427,11 +408,4 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestHarness {
}).map(x -> (List<WriteStatus>) HoodieClientTestUtils.collectStatuses(x)).collect();
assertEquals(updates.size() - numRecordsInPartition, updateStatus.get(0).get(0).getTotalErrorRecords());
}
-
- @AfterEach
- public void cleanup() {
- if (jsc != null) {
- jsc.stop();
- }
- }
}
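Switching the superclass from HoodieClientTestHarness to HoodieClientTestBase also removes duplicated lifecycle code; note the class previously declared two @AfterEach methods, the second of which stopped the shared JavaSparkContext directly, a likely source of the flakiness this commit targets. A hedged sketch of the lifecycle the base class is assumed to supply (HoodieClientTestBase itself is not shown in this diff, so the bodies below are illustrative only):

  public class HoodieClientTestBase extends HoodieClientTestHarness {

    @BeforeEach
    public void setUp() throws Exception {
      initResources(); // assumed: Spark contexts, path, meta client, data generator, file system
    }

    @AfterEach
    public void tearDown() throws Exception {
      cleanupResources();
    }
  }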
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index 7f15379..0926a37 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -29,14 +29,12 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieCopyOnWriteTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
-import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieTestDataGenerator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@@ -46,27 +44,10 @@ import scala.Tuple2;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class TestUpsertPartitioner extends HoodieClientTestHarness {
+public class TestUpsertPartitioner extends HoodieClientTestBase {
private static final Logger LOG = LogManager.getLogger(TestUpsertPartitioner.class);
- @BeforeEach
- public void setUp() throws Exception {
- initSparkContexts("TestUpsertPartitioner");
- initPath();
- initMetaClient();
- initTestDataGenerator();
- initFileSystem();
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- cleanupSparkContexts();
- cleanupClients();
- cleanupFileSystem();
- cleanupTestDataGenerator();
- }
-
private UpsertPartitioner getUpsertPartitioner(int smallFileSize, int numInserts, int numUpdates, int fileSize,
String testPartitionPath, boolean autoSplitInserts) throws Exception {
HoodieWriteConfig config = makeHoodieClientConfigBuilder()
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 0178dba..553be74 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -523,7 +523,7 @@ public class TestAsyncCompaction extends HoodieClientTestBase {
private List<HoodieBaseFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
HoodieTableFileSystemView view =
- new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
+ getHoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
return view.getLatestBaseFiles().collect(Collectors.toList());
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 67860d7..7fa64a5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -78,10 +78,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
@AfterEach
public void tearDown() throws Exception {
- cleanupFileSystem();
- cleanupTestDataGenerator();
- cleanupSparkContexts();
- cleanupClients();
+ cleanupResources();
}
private HoodieWriteConfig getConfig() {
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 70bd591..69e1776 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -22,11 +22,14 @@ import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@@ -34,6 +37,8 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.HoodieTable;
+
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.slf4j.Logger;
@@ -60,7 +65,10 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
protected transient ExecutorService executorService;
protected transient HoodieTableMetaClient metaClient;
private static AtomicInteger instantGen = new AtomicInteger(1);
- protected transient HoodieWriteClient client;
+ protected transient HoodieWriteClient writeClient;
+ protected transient HoodieReadClient readClient;
+ protected transient HoodieTableFileSystemView tableView;
+ protected transient HoodieTable hoodieTable;
protected final SparkTaskContextSupplier supplier = new SparkTaskContextSupplier();
@@ -93,6 +101,9 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
cleanupSparkContexts();
cleanupTestDataGenerator();
cleanupFileSystem();
+ cleanupDFS();
+ cleanupExecutorService();
+ System.gc();
}
/**
@@ -163,6 +174,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
if (fs != null) {
LOG.warn("Closing file-system instance used in previous test-run");
fs.close();
+ fs = null;
}
}
@@ -185,13 +197,22 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
}
/**
- * Cleanups table type.
+ * Cleanups hoodie clients.
*/
- protected void cleanupClients() {
- metaClient = null;
- if (null != client) {
- client.close();
- client = null;
+ protected void cleanupClients() throws IOException {
+ if (metaClient != null) {
+ metaClient = null;
+ }
+ if (readClient != null) {
+ readClient = null;
+ }
+ if (writeClient != null) {
+ writeClient.close();
+ writeClient = null;
+ }
+ if (tableView != null) {
+ tableView.close();
+ tableView = null;
}
}
@@ -208,7 +229,9 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
*
*/
protected void cleanupTestDataGenerator() {
- dataGen = null;
+ if (dataGen != null) {
+ dataGen = null;
+ }
}
/**
@@ -288,16 +311,32 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
}
public HoodieReadClient getHoodieReadClient(String basePath) {
- return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+ readClient = new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+ return readClient;
}
public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
HoodieIndex index) {
- if (null != client) {
- client.close();
- client = null;
+ if (null != writeClient) {
+ writeClient.close();
+ writeClient = null;
+ }
+ writeClient = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
+ return writeClient;
+ }
+
+ public HoodieTableMetaClient getHoodieMetaClient(Configuration conf, String basePath) {
+ metaClient = new HoodieTableMetaClient(conf, basePath);
+ return metaClient;
+ }
+
+ public HoodieTableFileSystemView getHoodieTableFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
+ FileStatus[] fileStatuses) {
+ if (tableView == null) {
+ tableView = new HoodieTableFileSystemView(metaClient, visibleActiveTimeline, fileStatuses);
+ } else {
+ tableView.init(metaClient, visibleActiveTimeline, fileStatuses);
}
- client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
- return client;
+ return tableView;
}
}
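Two patterns in the harness hunk above do most of the de-flaking work. First, the write client, read client, meta client, and file-system view become tracked fields that cleanupClients() closes and nulls out exactly once. Second, getHoodieTableFileSystemView() hands every caller the same HoodieTableFileSystemView, re-initializing it in place rather than constructing a fresh closeable view per assertion. A short usage sketch assembled from lines elsewhere in this diff:

  FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
  tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
  Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
  // tableView is closed once, in cleanupClients(), instead of being leaked per call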
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
index 4d877fa..56ae22d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
@@ -89,6 +89,12 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
super.init(metaClient, visibleActiveTimeline);
}
+ public void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
+ FileStatus[] fileStatuses) {
+ init(metaClient, visibleActiveTimeline);
+ addFilesToView(fileStatuses);
+ }
+
@Override
protected void resetViewState() {
this.fgIdToPendingCompaction = null;
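The new overload makes an existing view reusable: construction and re-initialization now take the same three arguments, so the harness accessor above can re-point one instance at a reloaded timeline instead of allocating a new view. A usage sketch using only signatures visible in this diff:

  HoodieTableFileSystemView view = new HoodieTableFileSystemView(metaClient, timeline, fileStatuses);
  // later, against a reloaded timeline and a fresh file listing:
  view.init(metaClient, metaClient.getActiveTimeline().reload().getCommitsTimeline(), newFileStatuses);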
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
index 22c4cf9..683eb06 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/FileSystemViewHandler.java
@@ -117,7 +117,7 @@ public class FileSystemViewHandler {
synchronized (view) {
if (isLocalViewBehind(ctx)) {
HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline();
- LOG.warn("Syncing view as client passed last known instant " + lastKnownInstantFromClient
+ LOG.info("Syncing view as client passed last known instant " + lastKnownInstantFromClient
+ " as last known instant but server has the folling timeline :"
+ localTimeline.getInstants().collect(Collectors.toList()));
view.sync();
diff --git a/pom.xml b/pom.xml
index e218cff..cc39f41 100644
--- a/pom.xml
+++ b/pom.xml
@@ -246,7 +246,7 @@
<version>${maven-surefire-plugin.version}</version>
<configuration>
<skip>${skipUTs}</skip>
- <argLine>-Xmx4g</argLine>
+ <argLine>-Xmx2g</argLine>
<forkedProcessExitTimeoutInSeconds>120</forkedProcessExitTimeoutInSeconds>
<systemPropertyVariables>
<log4j.configuration>