Posted to commits@hudi.apache.org by si...@apache.org on 2020/06/08 12:52:56 UTC
[hudi] 03/04: Making a few fixes after cherry-picking
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d3afcbac3a6e5362d57570a2a5807abbf65c69d8
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Sun Jun 7 16:23:40 2020 -0400
Making a few fixes after cherry-picking
---
.../apache/hudi/client/TestHoodieClientBase.java | 917 +++++++++++----------
.../hudi/common/HoodieClientTestHarness.java | 426 +++++-----
.../apache/hudi/table/TestMergeOnReadTable.java | 2 +
.../hudi/table/compact/TestHoodieCompactor.java | 6 +-
.../table/string/TestHoodieActiveTimeline.java | 2 +-
5 files changed, 678 insertions(+), 675 deletions(-)
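Aside from re-indentation, the functional change in TestHoodieClientBase is that the default
test write config now disables backup for the remote file-system view, so a failure to reach
the embedded timeline server fails the test instead of being masked. A minimal sketch of the
relevant builder calls, lifted from the hunk below (illustrative only, assuming the imports
and basePath field already present in this test class; the full defaults live in
getConfigBuilder):

    HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withEmbeddedTimelineServerEnabled(true)
        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
            // Fail the test if there is a problem connecting to the timeline server
            .withEnableBackupForRemoteFileSystemView(false)
            .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)
            .build());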
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
index 6e6458b..6856489 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientBase.java
@@ -72,477 +72,478 @@ import static org.junit.Assert.assertTrue;
*/
public class TestHoodieClientBase extends HoodieClientTestHarness {
- private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class);
-
- @Before
- public void setUp() throws Exception {
- initResources();
+ private static final Logger LOG = LogManager.getLogger(TestHoodieClientBase.class);
+
+ @Before
+ public void setUp() throws Exception {
+ initResources();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ cleanupResources();
+ }
+
+ protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
+ return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName()));
+ }
+
+ /**
+ * Get Default HoodieWriteConfig for tests.
+ *
+ * @return Default Hoodie Write Config for tests
+ */
+ protected HoodieWriteConfig getConfig() {
+ return getConfigBuilder().build();
+ }
+
+ protected HoodieWriteConfig getConfig(IndexType indexType) {
+ return getConfigBuilder(indexType).build();
+ }
+
+ /**
+ * Get Config builder with default configs set.
+ *
+ * @return Config Builder
+ */
+ protected HoodieWriteConfig.Builder getConfigBuilder() {
+ return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+ }
+
+ /**
+ * Get Config builder with default configs set.
+ *
+ * @return Config Builder
+ */
+ HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
+ return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType);
+ }
+
+ HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
+ return getConfigBuilder(schemaStr, IndexType.BLOOM);
+ }
+
+ /**
+ * Get Config builder with default configs set.
+ *
+ * @return Config Builder
+ */
+ HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
+ return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
+ .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
+ .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
+ .withWriteStatusClass(MetadataMergeWriteStatus.class)
+ .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
+ .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
+ .forTable("test-trip-table")
+ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
+ .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server
+ .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
+ }
+
+ protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
+ ((SyncableFileSystemView) (table.getSliceView())).reset();
+ return table;
+ }
+
+ /**
+ * Assert no failures in writing hoodie files.
+ *
+ * @param statuses List of Write Status
+ */
+ public static void assertNoWriteErrors(List<WriteStatus> statuses) {
+ // Verify there are no errors
+ for (WriteStatus status : statuses) {
+ assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
}
-
- @After
- public void tearDown() throws Exception {
- cleanupResources();
+ }
+
+ void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
+ Set<String> partitionPathSet = inputRecords.stream()
+ .map(HoodieRecord::getPartitionPath)
+ .collect(Collectors.toSet());
+ assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
+ }
+
+ void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
+ Set<String> partitionPathSet = inputKeys.stream()
+ .map(HoodieKey::getPartitionPath)
+ .collect(Collectors.toSet());
+ assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
+ }
+
+ /**
+ * Ensure presence of partition meta-data at known depth.
+ *
+ * @param partitionPaths Partition paths to check
+ * @param fs File System
+ * @throws IOException in case of error
+ */
+ void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
+ for (String partitionPath : partitionPaths) {
+ assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
+ HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
+ pmeta.readFromFS();
+ Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth());
}
-
- protected HoodieCleanClient getHoodieCleanClient(HoodieWriteConfig cfg) {
- return new HoodieCleanClient(jsc, cfg, new HoodieMetrics(cfg, cfg.getTableName()));
+ }
+
+ /**
+ * Ensure records have location field set.
+ *
+ * @param taggedRecords Tagged Records
+ * @param commitTime Commit Timestamp
+ */
+ protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) {
+ for (HoodieRecord rec : taggedRecords) {
+ assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown());
+ assertEquals("All records should have commit time " + commitTime + ", since updates were made",
+ rec.getCurrentLocation().getInstantTime(), commitTime);
}
-
- /**
- * Get Default HoodieWriteConfig for tests.
- *
- * @return Default Hoodie Write Config for tests
- */
- protected HoodieWriteConfig getConfig() {
- return getConfigBuilder().build();
+ }
+
+ /**
+ * Assert that there is no duplicate key at the partition level.
+ *
+ * @param records List of Hoodie records
+ */
+ void assertNodupesWithinPartition(List<HoodieRecord> records) {
+ Map<String, Set<String>> partitionToKeys = new HashMap<>();
+ for (HoodieRecord r : records) {
+ String key = r.getRecordKey();
+ String partitionPath = r.getPartitionPath();
+ if (!partitionToKeys.containsKey(partitionPath)) {
+ partitionToKeys.put(partitionPath, new HashSet<>());
+ }
+ assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key));
+ partitionToKeys.get(partitionPath).add(key);
}
-
- protected HoodieWriteConfig getConfig(IndexType indexType) {
- return getConfigBuilder(indexType).build();
+ }
+
+ /**
+ * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records to be already de-duped and have location set. This wrapper takes care of
+ * record-location setting. Uniqueness is guaranteed by record-generation function itself.
+ *
+ * @param writeConfig Hoodie Write Config
+ * @param recordGenFunction Records Generation function
+ * @return Wrapped function
+ */
+ private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
+ final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
+ return (commit, numRecords) -> {
+ final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
+ List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
+ final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+ JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
+ return taggedRecords.collect();
+ };
+ }
+
+ /**
+ * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys to be already de-duped and have location set. This wrapper takes care of
+ * record-location setting. Uniqueness is guaranteed by key-generation function itself.
+ *
+ * @param writeConfig Hoodie Write Config
+ * @param keyGenFunction Keys Generation function
+ * @return Wrapped function
+ */
+ private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
+ final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
+ return (numRecords) -> {
+ final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
+ List<HoodieKey> records = keyGenFunction.apply(numRecords);
+ final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+ HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+ JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
+ .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
+ JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
+ return taggedRecords.map(record -> record.getKey()).collect();
+ };
+ }
+
+ /**
+ * Generate wrapper for record generation function for testing Prepped APIs.
+ *
+ * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+ * @param writeConfig Hoodie Write Config
+ * @param wrapped Actual Records Generation function
+ * @return Wrapped Function
+ */
+ protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI,
+ HoodieWriteConfig writeConfig,
+ Function2<List<HoodieRecord>, String, Integer> wrapped) {
+ if (isPreppedAPI) {
+ return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
+ } else {
+ return wrapped;
}
-
- /**
- * Get Config builder with default configs set.
- *
- * @return Config Builder
- */
- protected HoodieWriteConfig.Builder getConfigBuilder() {
- return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+ }
+
+ /**
+ * Generate wrapper for delete key generation function for testing Prepped APIs.
+ *
+ * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+ * @param writeConfig Hoodie Write Config
+ * @param wrapped Actual Records Generation function
+ * @return Wrapped Function
+ */
+ Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI,
+ HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
+ if (isPreppedAPI) {
+ return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
+ } else {
+ return wrapped;
}
-
- /**
- * Get Config builder with default configs set.
- *
- * @return Config Builder
- */
- HoodieWriteConfig.Builder getConfigBuilder(IndexType indexType) {
- return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA, indexType);
+ }
+
+ /**
+ * Helper to insert first batch of records and do regular assertions on the state after successful completion.
+ *
+ * @param writeConfig Hoodie Write Config
+ * @param client Hoodie Write Client
+ * @param newCommitTime New Commit Timestamp to be used
+ * @param initCommitTime Begin Timestamp (usually "000")
+ * @param numRecordsInThisCommit Number of records to be added in the new commit
+ * @param writeFn Write Function to be used for insertion
+ * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+ * @param assertForCommit Enable Assertion of Writes
+ * @param expRecordsInThisCommit Expected number of records in this commit
+ * @return RDD of write-status
+ * @throws Exception in case of error
+ */
+ JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+ String initCommitTime, int numRecordsInThisCommit,
+ Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+ boolean assertForCommit, int expRecordsInThisCommit) throws Exception {
+ final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+ generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
+
+ return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
+ recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1);
+ }
+
+ /**
+ * Helper to upsert batch of records and do regular assertions on the state after successful completion.
+ *
+ * @param writeConfig Hoodie Write Config
+ * @param client Hoodie Write Client
+ * @param newCommitTime New Commit Timestamp to be used
+ * @param prevCommitTime Commit Timestamp used in previous commit
+ * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
+ * @param initCommitTime Begin Timestamp (usually "000")
+ * @param numRecordsInThisCommit Number of records to be added in the new commit
+ * @param writeFn Write Function to be used for upsert
+ * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+ * @param assertForCommit Enable Assertion of Writes
+ * @param expRecordsInThisCommit Expected number of records in this commit
+ * @param expTotalRecords Expected number of records when scanned
+ * @param expTotalCommits Expected number of commits (including this commit)
+ * @return RDD of write-status
+ * @throws Exception in case of error
+ */
+ JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+ String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
+ int numRecordsInThisCommit,
+ Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
+ boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
+ final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
+ generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates);
+
+ return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
+ numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
+ expTotalCommits);
+ }
+
+ /**
+ * Helper to delete batch of keys and do regular assertions on the state after successful completion.
+ *
+ * @param writeConfig Hoodie Write Config
+ * @param client Hoodie Write Client
+ * @param newCommitTime New Commit Timestamp to be used
+ * @param prevCommitTime Commit Timestamp used in previous commit
+ * @param initCommitTime Begin Timestamp (usually "000")
+ * @param numRecordsInThisCommit Number of records to be added in the new commit
+ * @param deleteFn Delete Function to be used for deletes
+ * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
+ * @param assertForCommit Enable Assertion of Writes
+ * @param expRecordsInThisCommit Expected number of records in this commit
+ * @param expTotalRecords Expected number of records when scanned
+ * @return RDD of write-status
+ * @throws Exception in case of error
+ */
+ JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+ String prevCommitTime, String initCommitTime,
+ int numRecordsInThisCommit,
+ Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
+ boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+ final Function<Integer, List<HoodieKey>> keyGenFunction =
+ generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes);
+
+ return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit,
+ keyGenFunction,
+ deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
+ }
+
+ /**
+ * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
+ *
+ * @param client Hoodie Write Client
+ * @param newCommitTime New Commit Timestamp to be used
+ * @param prevCommitTime Commit Timestamp used in previous commit
+ * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
+ * @param initCommitTime Begin Timestamp (usually "000")
+ * @param numRecordsInThisCommit Number of records to be added in the new commit
+ * @param recordGenFunction Records Generation Function
+ * @param writeFn Write Function to be used for upsert
+ * @param assertForCommit Enable Assertion of Writes
+ * @param expRecordsInThisCommit Expected number of records in this commit
+ * @param expTotalRecords Expected number of records when scanned
+ * @param expTotalCommits Expected number of commits (including this commit)
+ * @throws Exception in case of error
+ */
+ JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+ Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
+ Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
+ Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+ boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
+
+ // Write 1 (only inserts)
+ client.startCommitWithTime(newCommitTime);
+
+ List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+
+ JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
+ List<WriteStatus> statuses = result.collect();
+ assertNoWriteErrors(statuses);
+
+ // check the partition metadata is written out
+ assertPartitionMetadataForRecords(records, fs);
+
+ // verify that there is a commit
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+ if (assertForCommit) {
+ assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits,
+ timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+ Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+ timeline.lastInstant().get().getTimestamp());
+ assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+ HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+ // Check the entire dataset has all records still
+ String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+ for (int i = 0; i < fullPartitionPaths.length; i++) {
+ fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+ }
+ assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+ HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+ // Check that the incremental consumption from prevCommitTime
+ assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit",
+ HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+ HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
+ if (commitTimesBetweenPrevAndNew.isPresent()) {
+ commitTimesBetweenPrevAndNew.get().forEach(ct -> {
+ assertEquals("Incremental consumption from " + ct + " should give all records in latest commit",
+ HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+ HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count());
+ });
+ }
}
-
- HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
- return getConfigBuilder(schemaStr, IndexType.BLOOM);
+ return result;
+ }
+
+ /**
+ * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion.
+ *
+ * @param client Hoodie Write Client
+ * @param newCommitTime New Commit Timestamp to be used
+ * @param prevCommitTime Commit Timestamp used in previous commit
+ * @param initCommitTime Begin Timestamp (usually "000")
+ * @param keyGenFunction Key Generation function
+ * @param deleteFn Write Function to be used for delete
+ * @param assertForCommit Enable Assertion of Writes
+ * @param expRecordsInThisCommit Expected number of records in this commit
+ * @param expTotalRecords Expected number of records when scanned
+ * @throws Exception in case of error
+ */
+ JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+ String initCommitTime, int numRecordsInThisCommit,
+ Function<Integer, List<HoodieKey>> keyGenFunction,
+ Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
+ boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+
+ // Delete 1 (only deletes)
+ client.startCommitWithTime(newCommitTime);
+
+ List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit);
+ JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
+
+ JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime);
+ List<WriteStatus> statuses = result.collect();
+ assertNoWriteErrors(statuses);
+
+ // check the partition metadata is written out
+ assertPartitionMetadataForKeys(keysToDelete, fs);
+
+ // verify that there is a commit
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+ HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+ if (assertForCommit) {
+ assertEquals("Expecting 3 commits.", 3,
+ timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+ Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+ timeline.lastInstant().get().getTimestamp());
+ assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+ HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+ // Check the entire dataset has all records still
+ String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+ for (int i = 0; i < fullPartitionPaths.length; i++) {
+ fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+ }
+ assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+ HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+ // Check that the incremental consumption from prevCommitTime
+ assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
+ + " since it is a delete operation",
+ HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+ HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
}
+ return result;
+ }
- /**
- * Get Config builder with default configs set.
- *
- * @return Config Builder
- */
- HoodieWriteConfig.Builder getConfigBuilder(String schemaStr, IndexType indexType) {
- return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
- .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
- .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
- .withWriteStatusClass(MetadataMergeWriteStatus.class)
- .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
- .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 * 1024).build())
- .forTable("test-trip-table")
- .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(indexType).build())
- .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
- .withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
- }
+ /**
+ * Get Cleaner state corresponding to a partition path.
+ *
+ * @param hoodieCleanStatsTwo List of Clean Stats
+ * @param partitionPath Partition path for filtering
+ * @return Cleaner state corresponding to partition path
+ */
+ protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
+ return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
+ }
- protected HoodieTable getHoodieTable(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
- HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
- ((SyncableFileSystemView) (table.getSliceView())).reset();
- return table;
- }
+ // Functional Interfaces for passing lambda and Hoodie Write API contexts
- /**
- * Assert no failures in writing hoodie files.
- *
- * @param statuses List of Write Status
- */
- public static void assertNoWriteErrors(List<WriteStatus> statuses) {
- // Verify there are no errors
- for (WriteStatus status : statuses) {
- assertFalse("Errors found in write of " + status.getFileId(), status.hasErrors());
- }
- }
+ @FunctionalInterface
+ public interface Function2<R, T1, T2> {
- void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
- Set<String> partitionPathSet = inputRecords.stream()
- .map(HoodieRecord::getPartitionPath)
- .collect(Collectors.toSet());
- assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
- }
+ R apply(T1 v1, T2 v2) throws IOException;
+ }
- void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
- Set<String> partitionPathSet = inputKeys.stream()
- .map(HoodieKey::getPartitionPath)
- .collect(Collectors.toSet());
- assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
- }
-
- /**
- * Ensure presence of partition meta-data at known depth.
- *
- * @param partitionPaths Partition paths to check
- * @param fs File System
- * @throws IOException in case of error
- */
- void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
- for (String partitionPath : partitionPaths) {
- assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
- HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
- pmeta.readFromFS();
- Assert.assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_DEPTH, pmeta.getPartitionDepth());
- }
- }
-
- /**
- * Ensure records have location field set.
- *
- * @param taggedRecords Tagged Records
- * @param commitTime Commit Timestamp
- */
- protected void checkTaggedRecords(List<HoodieRecord> taggedRecords, String commitTime) {
- for (HoodieRecord rec : taggedRecords) {
- assertTrue("Record " + rec + " found with no location.", rec.isCurrentLocationKnown());
- assertEquals("All records should have commit time " + commitTime + ", since updates were made",
- rec.getCurrentLocation().getInstantTime(), commitTime);
- }
- }
-
- /**
- * Assert that there is no duplicate key at the partition level.
- *
- * @param records List of Hoodie records
- */
- void assertNodupesWithinPartition(List<HoodieRecord> records) {
- Map<String, Set<String>> partitionToKeys = new HashMap<>();
- for (HoodieRecord r : records) {
- String key = r.getRecordKey();
- String partitionPath = r.getPartitionPath();
- if (!partitionToKeys.containsKey(partitionPath)) {
- partitionToKeys.put(partitionPath, new HashSet<>());
- }
- assertFalse("key " + key + " is duplicate within partition " + partitionPath, partitionToKeys.get(partitionPath).contains(key));
- partitionToKeys.get(partitionPath).add(key);
- }
- }
-
- /**
- * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records to be already de-duped and have location set. This wrapper takes care of
- * record-location setting. Uniqueness is guaranteed by record-generation function itself.
- *
- * @param writeConfig Hoodie Write Config
- * @param recordGenFunction Records Generation function
- * @return Wrapped function
- */
- private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
- final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
- return (commit, numRecords) -> {
- final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
- List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
- final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
- HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
- JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(jsc.parallelize(records, 1), jsc, table);
- return taggedRecords.collect();
- };
- }
-
- /**
- * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys to be already de-duped and have location set. This wrapper takes care of
- * record-location setting. Uniqueness is guaranteed by key-generation function itself.
- *
- * @param writeConfig Hoodie Write Config
- * @param keyGenFunction Keys Generation function
- * @return Wrapped function
- */
- private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
- final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
- return (numRecords) -> {
- final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
- List<HoodieKey> records = keyGenFunction.apply(numRecords);
- final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
- HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
- JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
- .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
- JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
- return taggedRecords.map(record -> record.getKey()).collect();
- };
- }
+ @FunctionalInterface
+ public interface Function3<R, T1, T2, T3> {
- /**
- * Generate wrapper for record generation function for testing Prepped APIs.
- *
- * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
- * @param writeConfig Hoodie Write Config
- * @param wrapped Actual Records Generation function
- * @return Wrapped Function
- */
- protected Function2<List<HoodieRecord>, String, Integer> generateWrapRecordsFn(boolean isPreppedAPI,
- HoodieWriteConfig writeConfig,
- Function2<List<HoodieRecord>, String, Integer> wrapped) {
- if (isPreppedAPI) {
- return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
- } else {
- return wrapped;
- }
- }
-
- /**
- * Generate wrapper for delete key generation function for testing Prepped APIs.
- *
- * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
- * @param writeConfig Hoodie Write Config
- * @param wrapped Actual Records Generation function
- * @return Wrapped Function
- */
- Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI,
- HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
- if (isPreppedAPI) {
- return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
- } else {
- return wrapped;
- }
- }
-
- /**
- * Helper to insert first batch of records and do regular assertions on the state after successful completion.
- *
- * @param writeConfig Hoodie Write Config
- * @param client Hoodie Write Client
- * @param newCommitTime New Commit Timestamp to be used
- * @param initCommitTime Begin Timestamp (usually "000")
- * @param numRecordsInThisCommit Number of records to be added in the new commit
- * @param writeFn Write Function to be used for insertion
- * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
- * @param assertForCommit Enable Assertion of Writes
- * @param expRecordsInThisCommit Expected number of records in this commit
- * @return RDD of write-status
- * @throws Exception in case of error
- */
- JavaRDD<WriteStatus> insertFirstBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
- String initCommitTime, int numRecordsInThisCommit,
- Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
- boolean assertForCommit, int expRecordsInThisCommit) throws Exception {
- final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
- generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateInserts);
-
- return writeBatch(client, newCommitTime, initCommitTime, Option.empty(), initCommitTime, numRecordsInThisCommit,
- recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expRecordsInThisCommit, 1);
- }
-
- /**
- * Helper to upsert batch of records and do regular assertions on the state after successful completion.
- *
- * @param writeConfig Hoodie Write Config
- * @param client Hoodie Write Client
- * @param newCommitTime New Commit Timestamp to be used
- * @param prevCommitTime Commit Timestamp used in previous commit
- * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
- * @param initCommitTime Begin Timestamp (usually "000")
- * @param numRecordsInThisCommit Number of records to be added in the new commit
- * @param writeFn Write Function to be used for upsert
- * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
- * @param assertForCommit Enable Assertion of Writes
- * @param expRecordsInThisCommit Expected number of records in this commit
- * @param expTotalRecords Expected number of records when scanned
- * @param expTotalCommits Expected number of commits (including this commit)
- * @return RDD of write-status
- * @throws Exception in case of error
- */
- JavaRDD<WriteStatus> updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
- String prevCommitTime, Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
- int numRecordsInThisCommit,
- Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn, boolean isPreppedAPI,
- boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
- final Function2<List<HoodieRecord>, String, Integer> recordGenFunction =
- generateWrapRecordsFn(isPreppedAPI, writeConfig, dataGen::generateUniqueUpdates);
-
- return writeBatch(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
- numRecordsInThisCommit, recordGenFunction, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
- expTotalCommits);
- }
-
- /**
- * Helper to delete batch of keys and do regular assertions on the state after successful completion.
- *
- * @param writeConfig Hoodie Write Config
- * @param client Hoodie Write Client
- * @param newCommitTime New Commit Timestamp to be used
- * @param prevCommitTime Commit Timestamp used in previous commit
- * @param initCommitTime Begin Timestamp (usually "000")
- * @param numRecordsInThisCommit Number of records to be added in the new commit
- * @param deleteFn Delete Function to be used for deletes
- * @param isPreppedAPI Boolean flag to indicate writeFn expects prepped records
- * @param assertForCommit Enable Assertion of Writes
- * @param expRecordsInThisCommit Expected number of records in this commit
- * @param expTotalRecords Expected number of records when scanned
- * @return RDD of write-status
- * @throws Exception in case of error
- */
- JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
- String prevCommitTime, String initCommitTime,
- int numRecordsInThisCommit,
- Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
- boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
- final Function<Integer, List<HoodieKey>> keyGenFunction =
- generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes);
-
- return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit,
- keyGenFunction,
- deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
- }
-
- /**
- * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion.
- *
- * @param client Hoodie Write Client
- * @param newCommitTime New Commit Timestamp to be used
- * @param prevCommitTime Commit Timestamp used in previous commit
- * @param commitTimesBetweenPrevAndNew Sample of Timestamps between prevCommitTime and newCommitTime
- * @param initCommitTime Begin Timestamp (usually "000")
- * @param numRecordsInThisCommit Number of records to be added in the new commit
- * @param recordGenFunction Records Generation Function
- * @param writeFn Write Function to be used for upsert
- * @param assertForCommit Enable Assertion of Writes
- * @param expRecordsInThisCommit Expected number of records in this commit
- * @param expTotalRecords Expected number of records when scanned
- * @param expTotalCommits Expected number of commits (including this commit)
- * @throws Exception in case of error
- */
- JavaRDD<WriteStatus> writeBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
- Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
- Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
- Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
- boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits) throws Exception {
-
- // Write 1 (only inserts)
- client.startCommitWithTime(newCommitTime);
-
- List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
-
- JavaRDD<WriteStatus> result = writeFn.apply(client, writeRecords, newCommitTime);
- List<WriteStatus> statuses = result.collect();
- assertNoWriteErrors(statuses);
-
- // check the partition metadata is written out
- assertPartitionMetadataForRecords(records, fs);
-
- // verify that there is a commit
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
- if (assertForCommit) {
- assertEquals("Expecting " + expTotalCommits + " commits.", expTotalCommits,
- timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
- Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
- timeline.lastInstant().get().getTimestamp());
- assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
- HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
-
- // Check the entire dataset has all records still
- String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
- for (int i = 0; i < fullPartitionPaths.length; i++) {
- fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
- }
- assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
- HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
-
- // Check that the incremental consumption from prevCommitTime
- assertEquals("Incremental consumption from " + prevCommitTime + " should give all records in latest commit",
- HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
- HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
- if (commitTimesBetweenPrevAndNew.isPresent()) {
- commitTimesBetweenPrevAndNew.get().forEach(ct -> {
- assertEquals("Incremental consumption from " + ct + " should give all records in latest commit",
- HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
- HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, ct).count());
- });
- }
- }
- return result;
- }
-
- /**
- * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion.
- *
- * @param client Hoodie Write Client
- * @param newCommitTime New Commit Timestamp to be used
- * @param prevCommitTime Commit Timestamp used in previous commit
- * @param initCommitTime Begin Timestamp (usually "000")
- * @param keyGenFunction Key Generation function
- * @param deleteFn Write Function to be used for delete
- * @param assertForCommit Enable Assertion of Writes
- * @param expRecordsInThisCommit Expected number of records in this commit
- * @param expTotalRecords Expected number of records when scanned
- * @throws Exception in case of error
- */
- JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
- String initCommitTime, int numRecordsInThisCommit,
- Function<Integer, List<HoodieKey>> keyGenFunction,
- Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
- boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
-
- // Delete 1 (only deletes)
- client.startCommitWithTime(newCommitTime);
-
- List<HoodieKey> keysToDelete = keyGenFunction.apply(numRecordsInThisCommit);
- JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
-
- JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime);
- List<WriteStatus> statuses = result.collect();
- assertNoWriteErrors(statuses);
-
- // check the partition metadata is written out
- assertPartitionMetadataForKeys(keysToDelete, fs);
-
- // verify that there is a commit
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
- HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
-
- if (assertForCommit) {
- assertEquals("Expecting 3 commits.", 3,
- timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
- Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
- timeline.lastInstant().get().getTimestamp());
- assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
- HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
-
- // Check the entire dataset has all records still
- String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
- for (int i = 0; i < fullPartitionPaths.length; i++) {
- fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
- }
- assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
- HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
-
- // Check that the incremental consumption from prevCommitTime
- assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
- + " since it is a delete operation",
- HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
- HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
- }
- return result;
- }
-
- /**
- * Get Cleaner state corresponding to a partition path.
- *
- * @param hoodieCleanStatsTwo List of Clean Stats
- * @param partitionPath Partition path for filtering
- * @return Cleaner state corresponding to partition path
- */
- protected HoodieCleanStat getCleanStat(List<HoodieCleanStat> hoodieCleanStatsTwo, String partitionPath) {
- return hoodieCleanStatsTwo.stream().filter(e -> e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
- }
-
- // Functional Interfaces for passing lambda and Hoodie Write API contexts
-
- @FunctionalInterface
- public interface Function2<R, T1, T2> {
-
- R apply(T1 v1, T2 v2) throws IOException;
- }
-
- @FunctionalInterface
- public interface Function3<R, T1, T2, T3> {
-
- R apply(T1 v1, T2 v2, T3 v3) throws IOException;
- }
+ R apply(T1 v1, T2 v2, T3 v3) throws IOException;
+ }
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
index e4202f0..4c7b890 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestHarness.java
@@ -49,239 +49,239 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness implements Serializable {
- private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
-
- protected transient JavaSparkContext jsc = null;
- protected transient SQLContext sqlContext;
- protected transient FileSystem fs;
- protected transient HoodieTestDataGenerator dataGen = null;
- protected transient ExecutorService executorService;
- protected transient HoodieTableMetaClient metaClient;
- private static AtomicInteger instantGen = new AtomicInteger(1);
- protected transient HoodieWriteClient client;
-
- public String getNextInstant() {
- return String.format("%09d", instantGen.getAndIncrement());
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieClientTestHarness.class);
+
+ protected transient JavaSparkContext jsc = null;
+ protected transient SQLContext sqlContext;
+ protected transient FileSystem fs;
+ protected transient HoodieTestDataGenerator dataGen = null;
+ protected transient ExecutorService executorService;
+ protected transient HoodieTableMetaClient metaClient;
+ private static AtomicInteger instantGen = new AtomicInteger(1);
+ protected transient HoodieWriteClient client;
+
+ public String getNextInstant() {
+ return String.format("%09d", instantGen.getAndIncrement());
+ }
+
+ // dfs
+ protected String dfsBasePath;
+ protected transient HdfsTestService hdfsTestService;
+ protected transient MiniDFSCluster dfsCluster;
+ protected transient DistributedFileSystem dfs;
+
+ /**
+ * Initializes resource group for the subclasses of {@link TestHoodieClientBase}.
+ */
+ public void initResources() throws IOException {
+ initPath();
+ initSparkContexts();
+ initTestDataGenerator();
+ initFileSystem();
+ initMetaClient();
+ }
+
+ /**
+ * Cleanups resource group for the subclasses of {@link TestHoodieClientBase}.
+ */
+ public void cleanupResources() throws IOException {
+ cleanupClients();
+ cleanupSparkContexts();
+ cleanupTestDataGenerator();
+ cleanupFileSystem();
+ }
+
+ /**
+ * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name.
+ *
+ * @param appName The specified application name.
+ */
+ protected void initSparkContexts(String appName) {
+ // Initialize a local spark env
+ jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
+ jsc.setLogLevel("ERROR");
+
+ // SQLContext stuff
+ sqlContext = new SQLContext(jsc);
+ }
+
+ /**
+ * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
+ * <b>TestHoodieClient</b>.
+ */
+ protected void initSparkContexts() {
+ initSparkContexts("TestHoodieClient");
+ }
+
+ /**
+ * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
+ */
+ protected void cleanupSparkContexts() {
+ if (sqlContext != null) {
+ LOG.info("Clearing sql context cache of spark-session used in previous test-case");
+ sqlContext.clearCache();
+ sqlContext = null;
}
- // dfs
- protected String dfsBasePath;
- protected transient HdfsTestService hdfsTestService;
- protected transient MiniDFSCluster dfsCluster;
- protected transient DistributedFileSystem dfs;
-
- /**
- * Initializes resource group for the subclasses of {@link TestHoodieClientBase}.
- */
- public void initResources() throws IOException {
- initPath();
- initSparkContexts();
- initTestDataGenerator();
- initFileSystem();
- initMetaClient();
+ if (jsc != null) {
+ LOG.info("Closing spark context used in previous test-case");
+ jsc.close();
+ jsc.stop();
+ jsc = null;
}
-
- /**
- * Cleanups resource group for the subclasses of {@link TestHoodieClientBase}.
- */
- public void cleanupResources() throws IOException {
- cleanupClients();
- cleanupSparkContexts();
- cleanupTestDataGenerator();
- cleanupFileSystem();
- }
-
- /**
- * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with the given application name.
- *
- * @param appName The specified application name.
- */
- protected void initSparkContexts(String appName) {
- // Initialize a local spark env
- jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(appName));
- jsc.setLogLevel("ERROR");
-
- // SQLContext stuff
- sqlContext = new SQLContext(jsc);
- }
-
- /**
- * Initializes the Spark contexts ({@link JavaSparkContext} and {@link SQLContext}) with a default name
- * <b>TestHoodieClient</b>.
- */
- protected void initSparkContexts() {
- initSparkContexts("TestHoodieClient");
- }
-
- /**
- * Cleanups Spark contexts ({@link JavaSparkContext} and {@link SQLContext}).
- */
- protected void cleanupSparkContexts() {
- if (sqlContext != null) {
- LOG.info("Clearing sql context cache of spark-session used in previous test-case");
- sqlContext.clearCache();
- sqlContext = null;
- }
-
- if (jsc != null) {
- LOG.info("Closing spark context used in previous test-case");
- jsc.close();
- jsc.stop();
- jsc = null;
- }
- }
-
- /**
- * Initializes a file system with the hadoop configuration of Spark context.
- */
- protected void initFileSystem() {
- if (jsc == null) {
- throw new IllegalStateException("The Spark context has not been initialized.");
- }
-
- initFileSystemWithConfiguration(jsc.hadoopConfiguration());
- }
-
- /**
- * Initializes file system with a default empty configuration.
- */
- protected void initFileSystemWithDefaultConfiguration() {
- initFileSystemWithConfiguration(new Configuration());
+ }
+
+ /**
+ * Initializes a file system with the hadoop configuration of Spark context.
+ */
+ protected void initFileSystem() {
+ if (jsc == null) {
+ throw new IllegalStateException("The Spark context has not been initialized.");
}
- /**
- * Cleanups file system.
- */
- protected void cleanupFileSystem() throws IOException {
- if (fs != null) {
- LOG.warn("Closing file-system instance used in previous test-run");
- fs.close();
- }
+ initFileSystemWithConfiguration(jsc.hadoopConfiguration());
+ }
+
+ /**
+ * Initializes file system with a default empty configuration.
+ */
+ protected void initFileSystemWithDefaultConfiguration() {
+ initFileSystemWithConfiguration(new Configuration());
+ }
+
+ /**
+ * Cleanups file system.
+ */
+ protected void cleanupFileSystem() throws IOException {
+ if (fs != null) {
+ LOG.warn("Closing file-system instance used in previous test-run");
+ fs.close();
}
-
- /**
- * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by {@code getTableType()}.
- */
- protected void initMetaClient() throws IOException {
- if (basePath == null) {
- throw new IllegalStateException("The base path has not been initialized.");
- }
-
- if (jsc == null) {
- throw new IllegalStateException("The Spark context has not been initialized.");
- }
-
- metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
- }
-
- /**
- * Cleanups table type.
- */
- protected void cleanupClients() {
- metaClient = null;
- if (null != client) {
- client.close();
- client = null;
- }
- }
-
- /**
- * Initializes a test data generator which used to generate test datas.
- */
- protected void initTestDataGenerator() {
- dataGen = new HoodieTestDataGenerator();
+ }
+
+ /**
+ * Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by {@code getTableType()}.
+ */
+ protected void initMetaClient() throws IOException {
+ if (basePath == null) {
+ throw new IllegalStateException("The base path has not been initialized.");
}
- /**
- * Cleanups test data generator.
- */
- protected void cleanupTestDataGenerator() {
- dataGen = null;
+ if (jsc == null) {
+ throw new IllegalStateException("The Spark context has not been initialized.");
}
- /**
- * Initializes a distributed file system and base directory.
- */
- protected void initDFS() throws IOException {
- FileSystem.closeAll();
- hdfsTestService = new HdfsTestService();
- dfsCluster = hdfsTestService.start(true);
-
- // Create a temp folder as the base path
- dfs = dfsCluster.getFileSystem();
- dfsBasePath = dfs.getWorkingDirectory().toString();
- dfs.mkdirs(new Path(dfsBasePath));
+ metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath, getTableType());
+ }
+
+ /**
+ * Cleanups table type.
+ */
+ protected void cleanupClients() {
+ metaClient = null;
+ if (null != client) {
+ client.close();
+ client = null;
}
-
- /**
- * Cleanups the distributed file system.
- */
- protected void cleanupDFS() throws IOException {
- if (hdfsTestService != null) {
- hdfsTestService.stop();
- dfsCluster.shutdown();
- hdfsTestService = null;
- dfsCluster = null;
- dfs = null;
- }
- // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
- // same JVM
- FileSystem.closeAll();
+ }
+
+ /**
+ * Initializes a test data generator which used to generate test datas.
+ */
+ protected void initTestDataGenerator() {
+ dataGen = new HoodieTestDataGenerator();
+ }
+
+ /**
+ * Cleanups test data generator.
+ */
+ protected void cleanupTestDataGenerator() {
+ dataGen = null;
+ }
+
+ /**
+ * Initializes a distributed file system and base directory.
+ */
+ protected void initDFS() throws IOException {
+ FileSystem.closeAll();
+ hdfsTestService = new HdfsTestService();
+ dfsCluster = hdfsTestService.start(true);
+
+ // Create a temp folder as the base path
+ dfs = dfsCluster.getFileSystem();
+ dfsBasePath = dfs.getWorkingDirectory().toString();
+ dfs.mkdirs(new Path(dfsBasePath));
+ }
+
+ /**
+ * Cleanups the distributed file system.
+ */
+ protected void cleanupDFS() throws IOException {
+ if (hdfsTestService != null) {
+ hdfsTestService.stop();
+ dfsCluster.shutdown();
+ hdfsTestService = null;
+ dfsCluster = null;
+ dfs = null;
}
-
- /**
- * Initializes executor service with a fixed thread pool.
- *
- * @param threadNum specify the capacity of the fixed thread pool
- */
- protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
- executorService = Executors.newFixedThreadPool(threadNum);
+ // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
+ // same JVM
+ FileSystem.closeAll();
+ }
+
+ /**
+ * Initializes executor service with a fixed thread pool.
+ *
+ * @param threadNum specify the capacity of the fixed thread pool
+ */
+ protected void initExecutorServiceWithFixedThreadPool(int threadNum) {
+ executorService = Executors.newFixedThreadPool(threadNum);
+ }
+
+ /**
+ * Cleanups the executor service.
+ */
+ protected void cleanupExecutorService() {
+ if (this.executorService != null) {
+ this.executorService.shutdownNow();
+ this.executorService = null;
}
+ }
- /**
- * Cleanups the executor service.
- */
- protected void cleanupExecutorService() {
- if (this.executorService != null) {
- this.executorService.shutdownNow();
- this.executorService = null;
- }
+ private void initFileSystemWithConfiguration(Configuration configuration) {
+ if (basePath == null) {
+ throw new IllegalStateException("The base path has not been initialized.");
}
- private void initFileSystemWithConfiguration(Configuration configuration) {
- if (basePath == null) {
- throw new IllegalStateException("The base path has not been initialized.");
- }
-
- fs = FSUtils.getFs(basePath, configuration);
- if (fs instanceof LocalFileSystem) {
- LocalFileSystem lfs = (LocalFileSystem) fs;
- // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
- // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
- // So, for the tests, we enforce checksum verification to circumvent the problem
- lfs.setVerifyChecksum(true);
- }
+ fs = FSUtils.getFs(basePath, configuration);
+ if (fs instanceof LocalFileSystem) {
+ LocalFileSystem lfs = (LocalFileSystem) fs;
+ // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
+ // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
+ // So, for the tests, we enforce checksum verification to circumvent the problem
+ lfs.setVerifyChecksum(true);
}
+ }
- public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
- return getHoodieWriteClient(cfg, false);
- }
+ public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) {
+ return getHoodieWriteClient(cfg, false);
+ }
- public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
- return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, null));
- }
+ public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit) {
+ return getHoodieWriteClient(cfg, rollbackInflightCommit, HoodieIndex.createIndex(cfg, null));
+ }
- public HoodieReadClient getHoodieReadClient(String basePath) {
- return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
- }
+ public HoodieReadClient getHoodieReadClient(String basePath) {
+ return new HoodieReadClient(jsc, basePath, SQLContext.getOrCreate(jsc.sc()));
+ }
- public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
- HoodieIndex index) {
- if (null != client) {
- client.close();
- client = null;
- }
- client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
- return client;
+ public HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig cfg, boolean rollbackInflightCommit,
+ HoodieIndex index) {
+ if (null != client) {
+ client.close();
+ client = null;
}
+ client = new HoodieWriteClient(jsc, cfg, rollbackInflightCommit, index);
+ return client;
+ }
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
index 9f3eaea..fdc8b27 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java
@@ -655,6 +655,8 @@ public class TestMergeOnReadTable extends HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withEmbeddedTimelineServerEnabled(true)
+ .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+ .withEnableBackupForRemoteFileSystemView(false).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024).build()).forTable("test-trip-table")
.build();
}
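The same toggle is applied in the TestMergeOnReadTable config above, here through the
FileSystemViewStorageConfig.Builder constructor rather than newBuilder(), since that test
assembles its own write config instead of going through getConfigBuilder(). In isolation,
the added fragment reads:

    .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
        .withEnableBackupForRemoteFileSystemView(false)
        .build())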
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
index 09d62a7..86a2e1f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/compact/TestHoodieCompactor.java
@@ -18,9 +18,9 @@
package org.apache.hudi.table.compact;
-import org.apache.hudi.common.HoodieClientTestHarness;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.HoodieClientTestHarness;
import org.apache.hudi.common.HoodieTestDataGenerator;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
@@ -29,9 +29,9 @@ import org.apache.hudi.common.model.HoodieTestUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -129,7 +129,7 @@ public class TestHoodieCompactor extends HoodieClientTestHarness {
// insert 100 records
HoodieWriteConfig config = getConfig();
try (HoodieWriteClient writeClient = getHoodieWriteClient(config);) {
- String newCommitTime = "100";
+ String newCommitTime = "100";
writeClient.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
index d77392f..b2a7f83 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/string/TestHoodieActiveTimeline.java
@@ -385,7 +385,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
// filterCompletedAndCompactionInstants
// This cannot be done using checkFilter as it involves both states and actions
final HoodieTimeline t1 = timeline.filterCompletedAndCompactionInstants();
- final Set<State> states = Sets.newHashSet(State.REQUESTED, State.COMPLETED);
+ final Set<State> states = Sets.newHashSet(State.COMPLETED);
final Set<String> actions = Collections.singleton(HoodieTimeline.COMPACTION_ACTION);
sup.get().filter(i -> states.contains(i.getState()) || actions.contains(i.getAction()))
.forEach(i -> assertTrue(t1.containsInstant(i)));
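For reference, the TestHoodieActiveTimeline assertion as it reads once this hunk is applied:
filterCompletedAndCompactionInstants() is now only required to retain COMPLETED instants,
plus any instant whose action is a compaction; REQUESTED non-compaction instants are no
longer expected in the filtered timeline.

    final HoodieTimeline t1 = timeline.filterCompletedAndCompactionInstants();
    final Set<State> states = Sets.newHashSet(State.COMPLETED);
    final Set<String> actions = Collections.singleton(HoodieTimeline.COMPACTION_ACTION);
    // Every completed instant, and every compaction-action instant, must survive the filter
    sup.get().filter(i -> states.contains(i.getState()) || actions.contains(i.getAction()))
        .forEach(i -> assertTrue(t1.containsInstant(i)));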