Posted to commits@hudi.apache.org by si...@apache.org on 2022/10/22 23:47:01 UTC
[hudi] branch master updated: [HUDI-5070] Move flaky cleaner tests to separate class (#7034)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b78c3441c4 [HUDI-5070] Move flaky cleaner tests to separate class (#7034)
b78c3441c4 is described below
commit b78c3441c4e28200abec340eaff852375764cbdb
Author: Shiyan Xu <27...@users.noreply.github.com>
AuthorDate: Sun Oct 23 07:46:53 2022 +0800
[HUDI-5070] Move flaky cleaner tests to separate class (#7034)
---
.../TestHoodieClientOnCopyOnWriteStorage.java | 2 +-
.../java/org/apache/hudi/table/TestCleaner.java | 186 +++-----------------
.../clean/TestCleanerInsertAndCleanByCommits.java | 194 +++++++++++++++++++++
.../hudi/testutils/HoodieClientTestBase.java | 85 ++++++---
.../hudi/testutils/HoodieClientTestHarness.java | 27 ++-
5 files changed, 300 insertions(+), 194 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 9787387a1e..5f4cf358d5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1327,7 +1327,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
List<WriteStatus> statuses = client.insert(insertRecordsRDD1, commitTime1).collect();
assertNoWriteErrors(statuses);
- assertPartitionMetadata(new String[] {testPartitionPath}, fs);
+ assertPartitionMetadata(basePath, new String[] {testPartitionPath}, fs);
assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
String file1 = statuses.get(0).getFileId();
assertEquals(100,
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 4f69518ceb..c1dae9afa4 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -31,7 +31,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieSliceInfo;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.bootstrap.TestBootstrapIndex;
import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -73,7 +73,6 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
@@ -86,8 +85,6 @@ import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -101,10 +98,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -132,21 +127,24 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestCleaner extends HoodieClientTestBase {
private static final int BIG_BATCH_INSERT_SIZE = 500;
- private static final Logger LOG = LogManager.getLogger(TestCleaner.class);
+ private static final int PARALLELISM = 10;
/**
* Helper method to do first batch of insert for clean by versions/commits tests.
*
- * @param cfg Hoodie Write Config
+ * @param context Spark engine context
+ * @param metaClient Hoodie table meta client
* @param client Hoodie Client
* @param recordGenFunction Function to generate records for insertion
* @param insertFn Insertion API for testing
* @throws Exception in case of error
*/
- private Pair<String, JavaRDD<WriteStatus>> insertFirstBigBatchForClientCleanerTest(HoodieWriteConfig cfg, SparkRDDWriteClient client,
+ public static Pair<String, JavaRDD<WriteStatus>> insertFirstBigBatchForClientCleanerTest(
+ HoodieSparkEngineContext context,
+ HoodieTableMetaClient metaClient,
+ SparkRDDWriteClient client,
Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
- Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
- HoodieCleaningPolicy cleaningPolicy) throws Exception {
+ Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn) throws Exception {
/*
* do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages
@@ -155,7 +153,7 @@ public class TestCleaner extends HoodieClientTestBase {
String newCommitTime = client.startCommit();
List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, BIG_BATCH_INSERT_SIZE);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 5);
+ JavaRDD<HoodieRecord> writeRecords = context.getJavaSparkContext().parallelize(records, PARALLELISM);
JavaRDD<WriteStatus> statuses = insertFn.apply(client, writeRecords, newCommitTime);
// Verify there are no errors
@@ -174,8 +172,8 @@ public class TestCleaner extends HoodieClientTestBase {
assertTrue(table.getCompletedCleanTimeline().empty());
if (client.getConfig().shouldAutoCommit()) {
- HoodieIndex index = SparkHoodieIndexFactory.createIndex(cfg);
- List<HoodieRecord> taggedRecords = tagLocation(index, jsc.parallelize(records, 1), table).collect();
+ HoodieIndex index = SparkHoodieIndexFactory.createIndex(client.getConfig());
+ List<HoodieRecord> taggedRecords = tagLocation(index, context, context.getJavaSparkContext().parallelize(records, PARALLELISM), table).collect();
checkTaggedRecords(taggedRecords, newCommitTime);
}
return Pair.of(newCommitTime, statuses);
@@ -184,16 +182,17 @@ public class TestCleaner extends HoodieClientTestBase {
/**
* Helper method to do first batch of insert for clean by versions/commits tests.
*
- * @param cfg Hoodie Write Config
+ * @param context Spark engine context
* @param client Hoodie Client
* @param recordGenFunction Function to generate records for insertion
* @param insertFn Insertion API for testing
* @throws Exception in case of error
*/
- private Pair<String, JavaRDD<WriteStatus>> insertFirstFailedBigBatchForClientCleanerTest(HoodieWriteConfig cfg, SparkRDDWriteClient client,
- Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
- Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
- HoodieCleaningPolicy cleaningPolicy) throws Exception {
+ public static Pair<String, JavaRDD<WriteStatus>> insertFirstFailedBigBatchForClientCleanerTest(
+ HoodieSparkEngineContext context,
+ SparkRDDWriteClient client,
+ Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
+ Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn) throws Exception {
/*
* do a big insert (this is basically same as insert part of upsert, just adding it here so we can catch breakages
@@ -202,7 +201,7 @@ public class TestCleaner extends HoodieClientTestBase {
String newCommitTime = client.startCommit();
List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, BIG_BATCH_INSERT_SIZE);
- JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 5);
+ JavaRDD<HoodieRecord> writeRecords = context.getJavaSparkContext().parallelize(records, 5);
JavaRDD<WriteStatus> statuses = insertFn.apply(client, writeRecords, newCommitTime);
// Verify there are no errors
@@ -359,8 +358,7 @@ public class TestCleaner extends HoodieClientTestBase {
final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);
- insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
- HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
+ insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn);
Map<HoodieFileGroupId, FileSlice> compactionFileIdToLatestFileSlice = new HashMap<>();
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -458,15 +456,6 @@ public class TestCleaner extends HoodieClientTestBase {
}
}
- /**
- * Test Clean-By-Commits using insert/upsert API.
- */
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testInsertAndCleanByCommits(boolean isAsync) throws Exception {
- testInsertAndCleanByCommits(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false, isAsync);
- }
-
/**
* Test Clean-By-Commits using insert/upsert API.
*/
@@ -475,117 +464,6 @@ public class TestCleaner extends HoodieClientTestBase {
testFailedInsertAndCleanByCommits(SparkRDDWriteClient::insert, false);
}
- /**
- * Test Clean-By-Commits using prepped version of insert/upsert API.
- */
- @Test
- public void testInsertPreppedAndCleanByCommits() throws Exception {
- testInsertAndCleanByCommits(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords,
- true, false);
- }
-
- /**
- * Test Clean-By-Commits using prepped versions of bulk-insert/upsert API.
- */
- @Test
- public void testBulkInsertPreppedAndCleanByCommits() throws Exception {
- testInsertAndCleanByCommits(
- (client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()),
- SparkRDDWriteClient::upsertPreppedRecords, true, false);
- }
-
- /**
- * Test Clean-By-Commits using bulk-insert/upsert API.
- */
- @Test
- public void testBulkInsertAndCleanByCommits() throws Exception {
- testInsertAndCleanByCommits(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false, false);
- }
-
- /**
- * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective.
- *
- * @param insertFn Insert API to be tested
- * @param upsertFn Upsert API to be tested
- * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
- * record generation to also tag the regards (de-dupe is implicit as we use unique record-gen APIs)
- * @throws Exception in case of errors
- */
- private void testInsertAndCleanByCommits(
- Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
- Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI, boolean isAsync)
- throws Exception {
- int maxCommits = 3; // keep upto 3 commits from the past
- HoodieWriteConfig cfg = getConfigBuilder()
- .withCleanConfig(HoodieCleanConfig.newBuilder()
- .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(isAsync).retainCommits(maxCommits).build())
- .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
- .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
- .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
- .build();
- SparkRDDWriteClient client = getHoodieWriteClient(cfg);
-
- final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
- generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
-
- final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction =
- generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateUniqueUpdates);
-
- insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
- HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
-
- // Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
- for (int i = 0; i < 8; i++) {
- String newCommitTime = makeNewCommitTime();
- try {
- client.startCommitWithTime(newCommitTime);
- List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newCommitTime, 100);
-
- List<WriteStatus> statuses = upsertFn.apply(client, jsc.parallelize(records, 1), newCommitTime).collect();
- // Verify there are no errors
- assertNoWriteErrors(statuses);
-
- metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieTable table1 = HoodieSparkTable.create(cfg, context, metaClient);
- HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
- HoodieInstant lastInstant = activeTimeline.lastInstant().get();
- if (cfg.isAsyncClean()) {
- activeTimeline = activeTimeline.findInstantsBefore(lastInstant.getTimestamp());
- }
- // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest
- // commit
- Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits);
- Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
- if (earliestRetainedCommit.isPresent()) {
- acceptableCommits
- .removeAll(activeTimeline.findInstantsInRange("000", earliestRetainedCommit.get().getTimestamp())
- .getInstants().collect(Collectors.toSet()));
- acceptableCommits.add(earliestRetainedCommit.get());
- }
-
- TableFileSystemView fsView = table1.getFileSystemView();
- // Need to ensure the following
- for (String partitionPath : dataGen.getPartitionPaths()) {
- List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
- for (HoodieFileGroup fileGroup : fileGroups) {
- Set<String> commitTimes = new HashSet<>();
- fileGroup.getAllBaseFiles().forEach(value -> {
- LOG.debug("Data File - " + value);
- commitTimes.add(value.getCommitTime());
- });
- if (cfg.isAsyncClean()) {
- commitTimes.remove(lastInstant.getTimestamp());
- }
- assertEquals(acceptableCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), commitTimes,
- "Only contain acceptable versions of file should be present");
- }
- }
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
- }
- }
-
/**
* Test Helper for Cleaning failed commits by commits logic from HoodieWriteClient API perspective.
*
@@ -612,22 +490,18 @@ public class TestCleaner extends HoodieClientTestBase {
final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
- Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
- HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
+ Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn);
client.commit(result.getLeft(), result.getRight());
HoodieTable table = HoodieSparkTable.create(client.getConfig(), context, metaClient);
assertTrue(table.getCompletedCleanTimeline().empty());
- insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
- HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
+ insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
- insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
- HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
+ insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
Pair<String, JavaRDD<WriteStatus>> ret =
- insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
- HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
+ insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
// Await till enough time passes such that the last failed commits heartbeats are expired
await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient()
.isHeartbeatExpired(ret.getLeft()));
@@ -1352,8 +1226,7 @@ public class TestCleaner extends HoodieClientTestBase {
final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction =
generateWrapRecordsFn(isPreppedAPI, cfg, dataGen::generateInserts);
- Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
- HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
+ Pair<String, JavaRDD<WriteStatus>> result = insertFirstBigBatchForClientCleanerTest(context, metaClient, client, recordInsertGenWrappedFunction, insertFn);
client.commit(result.getLeft(), result.getRight());
@@ -1361,15 +1234,12 @@ public class TestCleaner extends HoodieClientTestBase {
assertTrue(table.getCompletedCleanTimeline().empty());
- insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
- HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
+ insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
- insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
- HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
+ insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
Pair<String, JavaRDD<WriteStatus>> ret =
- insertFirstFailedBigBatchForClientCleanerTest(cfg, client, recordInsertGenWrappedFunction, insertFn,
- HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS);
+ insertFirstFailedBigBatchForClientCleanerTest(context, client, recordInsertGenWrappedFunction, insertFn);
// Await till enough time passes such that the last failed commits heartbeats are expired
await().atMost(10, TimeUnit.SECONDS).until(() -> client.getHeartbeatClient()
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java
new file mode 100644
index 0000000000..7f5cd5cd99
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByCommits.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.clean;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.common.fs.ConsistencyGuardConfig;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieLockConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime;
+import static org.apache.hudi.table.TestCleaner.insertFirstBigBatchForClientCleanerTest;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static org.apache.hudi.testutils.HoodieClientTestBase.Function2;
+import static org.apache.hudi.testutils.HoodieClientTestBase.Function3;
+import static org.apache.hudi.testutils.HoodieClientTestBase.wrapRecordsGenFunctionForPreppedCalls;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestCleanerInsertAndCleanByCommits extends SparkClientFunctionalTestHarness {
+
+ private static final Logger LOG = LogManager.getLogger(TestCleanerInsertAndCleanByCommits.class);
+ private static final int BATCH_SIZE = 100;
+ private static final int PARALLELISM = 2;
+
+ /**
+ * Test Clean-By-Commits using insert/upsert API.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testInsertAndCleanByCommits(boolean isAsync) throws Exception {
+ testInsertAndCleanByCommits(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false, isAsync);
+ }
+
+ /**
+ * Test Clean-By-Commits using prepped version of insert/upsert API.
+ */
+ @Test
+ public void testInsertPreppedAndCleanByCommits() throws Exception {
+ testInsertAndCleanByCommits(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords,
+ true, false);
+ }
+
+ /**
+ * Test Clean-By-Commits using prepped versions of bulk-insert/upsert API.
+ */
+ @Test
+ public void testBulkInsertPreppedAndCleanByCommits() throws Exception {
+ testInsertAndCleanByCommits(
+ (client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()),
+ SparkRDDWriteClient::upsertPreppedRecords, true, false);
+ }
+
+ /**
+ * Test Clean-By-Commits using bulk-insert/upsert API.
+ */
+ @Test
+ public void testBulkInsertAndCleanByCommits() throws Exception {
+ testInsertAndCleanByCommits(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false, false);
+ }
+
+ /**
+ * Test Helper for Cleaning by versions logic from HoodieWriteClient API perspective.
+ *
+ * @param insertFn Insert API to be tested
+ * @param upsertFn Upsert API to be tested
+ * @param isPreppedAPI Flag to indicate if a prepped-version is used. If true, a wrapper function will be used during
+ * record generation to also tag the records (de-dupe is implicit as we use unique record-gen APIs)
+ * @throws Exception in case of errors
+ */
+ private void testInsertAndCleanByCommits(
+ Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> insertFn,
+ Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI, boolean isAsync)
+ throws Exception {
+ int maxCommits = 3; // keep up to 3 commits from the past
+ HoodieWriteConfig cfg = getConfigBuilder(true)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+ .withAsyncClean(isAsync).retainCommits(maxCommits).build())
+ .withParallelism(PARALLELISM, PARALLELISM)
+ .withBulkInsertParallelism(PARALLELISM)
+ .withFinalizeWriteParallelism(PARALLELISM)
+ .withDeleteParallelism(PARALLELISM)
+ .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+ .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build())
+ .build();
+ final SparkRDDWriteClient client = getHoodieWriteClient(cfg);
+
+ final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(System.nanoTime());
+ final Function2<List<HoodieRecord>, String, Integer> recordInsertGenWrappedFunction = isPreppedAPI
+ ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateInserts)
+ : dataGen::generateInserts;
+ final Function2<List<HoodieRecord>, String, Integer> recordUpsertGenWrappedFunction = isPreppedAPI
+ ? wrapRecordsGenFunctionForPreppedCalls(basePath(), hadoopConf(), context(), cfg, dataGen::generateUniqueUpdates)
+ : dataGen::generateUniqueUpdates;
+
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.COPY_ON_WRITE);
+ insertFirstBigBatchForClientCleanerTest(context(), metaClient, client, recordInsertGenWrappedFunction, insertFn);
+
+ // Keep doing some writes and clean inline. Make sure we have expected number of files remaining.
+ for (int i = 0; i < 8; i++) {
+ String newCommitTime = makeNewCommitTime();
+ try {
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records = recordUpsertGenWrappedFunction.apply(newCommitTime, BATCH_SIZE);
+
+ List<WriteStatus> statuses = upsertFn.apply(client, jsc().parallelize(records, PARALLELISM), newCommitTime).collect();
+ // Verify there are no errors
+ assertNoWriteErrors(statuses);
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTable table1 = HoodieSparkTable.create(cfg, context(), metaClient);
+ HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
+ HoodieInstant lastInstant = activeTimeline.lastInstant().get();
+ if (cfg.isAsyncClean()) {
+ activeTimeline = activeTimeline.findInstantsBefore(lastInstant.getTimestamp());
+ }
+ // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest
+ // commit
+ Option<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits);
+ Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
+ if (earliestRetainedCommit.isPresent()) {
+ acceptableCommits
+ .removeAll(activeTimeline.findInstantsInRange("000", earliestRetainedCommit.get().getTimestamp())
+ .getInstants().collect(Collectors.toSet()));
+ acceptableCommits.add(earliestRetainedCommit.get());
+ }
+
+ TableFileSystemView fsView = table1.getFileSystemView();
+ // Need to ensure the following
+ for (String partitionPath : dataGen.getPartitionPaths()) {
+ List<HoodieFileGroup> fileGroups = fsView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+ for (HoodieFileGroup fileGroup : fileGroups) {
+ Set<String> commitTimes = new HashSet<>();
+ fileGroup.getAllBaseFiles().forEach(value -> {
+ LOG.debug("Data File - " + value);
+ commitTimes.add(value.getCommitTime());
+ });
+ if (cfg.isAsyncClean()) {
+ commitTimes.remove(lastInstant.getTimestamp());
+ }
+ assertEquals(acceptableCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), commitTimes,
+ "Only contain acceptable versions of file should be present");
+ }
+ }
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+ }
+ }
+}
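
For context on the retention logic the moved test encodes: with KEEP_LATEST_COMMITS and retainCommits(3), the cleaner keeps base files from the latest three completed commits plus the single commit immediately before the earliest retained one (see the NOTE referencing CleanPlanner#getFilesToCleanKeepingLatestCommits above). A minimal standalone sketch of that arithmetic, with no Hudi dependencies and illustrative commit labels c0..c8 (c0 standing in for the initial big-batch insert):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class RetentionSketch {
      public static void main(String[] args) {
        // Completed commits on the timeline, oldest to newest.
        List<String> commits = Arrays.asList("c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8");
        int maxCommits = 3; // mirrors retainCommits(maxCommits)

        // nthFromLastInstant(maxCommits) => the commit maxCommits positions
        // before the latest one; this is the earliest retained commit.
        String earliestRetained = commits.get(commits.size() - 1 - maxCommits);

        // Everything from earliestRetained onwards survives cleaning:
        // the latest maxCommits commits plus one extra commit before them.
        Set<String> acceptable = new HashSet<>(commits.subList(commits.size() - 1 - maxCommits, commits.size()));

        System.out.println(earliestRetained); // c5
        System.out.println(acceptable);       // [c5, c6, c7, c8] (order may vary)
      }
    }
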
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 424bb6c53e..d2286decfe 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -20,6 +20,7 @@ package org.apache.hudi.testutils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
@@ -39,15 +40,16 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCleanConfig;
-import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
-import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
@@ -165,18 +167,18 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
return table;
}
- public void assertPartitionMetadataForRecords(List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
+ public void assertPartitionMetadataForRecords(String basePath, List<HoodieRecord> inputRecords, FileSystem fs) throws IOException {
Set<String> partitionPathSet = inputRecords.stream()
.map(HoodieRecord::getPartitionPath)
.collect(Collectors.toSet());
- assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
+ assertPartitionMetadata(basePath, partitionPathSet.stream().toArray(String[]::new), fs);
}
- public void assertPartitionMetadataForKeys(List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
+ public void assertPartitionMetadataForKeys(String basePath, List<HoodieKey> inputKeys, FileSystem fs) throws IOException {
Set<String> partitionPathSet = inputKeys.stream()
.map(HoodieKey::getPartitionPath)
.collect(Collectors.toSet());
- assertPartitionMetadata(partitionPathSet.stream().toArray(String[]::new), fs);
+ assertPartitionMetadata(basePath, partitionPathSet.stream().toArray(String[]::new), fs);
}
/**
@@ -186,7 +188,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
* @param fs File System
* @throws IOException in case of error
*/
- public void assertPartitionMetadata(String[] partitionPaths, FileSystem fs) throws IOException {
+ public static void assertPartitionMetadata(String basePath, String[] partitionPaths, FileSystem fs) throws IOException {
for (String partitionPath : partitionPaths) {
assertTrue(HoodiePartitionMetadata.hasPartitionMetadata(fs, new Path(basePath, partitionPath)));
HoodiePartitionMetadata pmeta = new HoodiePartitionMetadata(fs, new Path(basePath, partitionPath));
@@ -201,7 +203,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
* @param taggedRecords Tagged Records
* @param instantTime Commit Timestamp
*/
- public void checkTaggedRecords(List<HoodieRecord> taggedRecords, String instantTime) {
+ public static void checkTaggedRecords(List<HoodieRecord> taggedRecords, String instantTime) {
for (HoodieRecord rec : taggedRecords) {
assertTrue(rec.isCurrentLocationKnown(), "Record " + rec + " found with no location.");
assertEquals(rec.getCurrentLocation().getInstantTime(), instantTime,
@@ -214,7 +216,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
*
* @param records List of Hoodie records
*/
- public void assertNodupesWithinPartition(List<HoodieRecord<RawTripTestPayload>> records) {
+ public static void assertNodupesWithinPartition(List<HoodieRecord<RawTripTestPayload>> records) {
Map<String, Set<String>> partitionToKeys = new HashMap<>();
for (HoodieRecord r : records) {
String key = r.getRecordKey();
@@ -233,29 +235,46 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
* guaranteed by record-generation function itself.
*
* @param writeConfig Hoodie Write Config
- * @param recordGenFunction Records Generation function
+ * @param recordsGenFunction Records Generation function
* @return Wrapped function
*/
- private Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
- final HoodieWriteConfig writeConfig, final Function2<List<HoodieRecord>, String, Integer> recordGenFunction) {
+ public static Function2<List<HoodieRecord>, String, Integer> wrapRecordsGenFunctionForPreppedCalls(
+ final String basePath,
+ final Configuration hadoopConf,
+ final HoodieSparkEngineContext context,
+ final HoodieWriteConfig writeConfig,
+ final Function2<List<HoodieRecord>, String, Integer> recordsGenFunction) {
return (commit, numRecords) -> {
final HoodieIndex index = SparkHoodieIndexFactory.createIndex(writeConfig);
- List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords);
+ List<HoodieRecord> records = recordsGenFunction.apply(commit, numRecords);
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
- JavaRDD<HoodieRecord> taggedRecords = tagLocation(index, jsc.parallelize(records, 1), table);
+ JavaRDD<HoodieRecord> taggedRecords = tagLocation(index, context, context.getJavaSparkContext().parallelize(records, 1), table);
return taggedRecords.collect();
};
}
- private Function3<List<HoodieRecord>, String, Integer, String> wrapRecordsGenFunctionForPreppedCalls(
- final HoodieWriteConfig writeConfig, final Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction) {
+ /**
+ * Helper to generate records generation function for testing Prepped version of API. Prepped APIs expect the records
+ * to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is
+ * guaranteed by record-generation function itself.
+ *
+ * @param writeConfig Hoodie Write Config
+ * @param recordsGenFunction Records Generation function (for partition)
+ * @return Wrapped function
+ */
+ public static Function3<List<HoodieRecord>, String, Integer, String> wrapPartitionRecordsGenFunctionForPreppedCalls(
+ final String basePath,
+ final Configuration hadoopConf,
+ final HoodieSparkEngineContext context,
+ final HoodieWriteConfig writeConfig,
+ final Function3<List<HoodieRecord>, String, Integer, String> recordsGenFunction) {
return (commit, numRecords, partition) -> {
final HoodieIndex index = SparkHoodieIndexFactory.createIndex(writeConfig);
- List<HoodieRecord> records = recordGenFunction.apply(commit, numRecords, partition);
+ List<HoodieRecord> records = recordsGenFunction.apply(commit, numRecords, partition);
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
- JavaRDD<HoodieRecord> taggedRecords = tagLocation(index, jsc.parallelize(records, 1), table);
+ JavaRDD<HoodieRecord> taggedRecords = tagLocation(index, context, context.getJavaSparkContext().parallelize(records, 1), table);
return taggedRecords.collect();
};
}
@@ -269,16 +288,20 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
* @param keyGenFunction Keys Generation function
* @return Wrapped function
*/
- private Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
- final HoodieWriteConfig writeConfig, final Function<Integer, List<HoodieKey>> keyGenFunction) {
+ public static Function<Integer, List<HoodieKey>> wrapDeleteKeysGenFunctionForPreppedCalls(
+ final String basePath,
+ final Configuration hadoopConf,
+ final HoodieSparkEngineContext context,
+ final HoodieWriteConfig writeConfig,
+ final Function<Integer, List<HoodieKey>> keyGenFunction) {
return (numRecords) -> {
final HoodieIndex index = SparkHoodieIndexFactory.createIndex(writeConfig);
List<HoodieKey> records = keyGenFunction.apply(numRecords);
final HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build();
HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
- JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
+ JavaRDD<HoodieRecord> recordsToDelete = context.getJavaSparkContext().parallelize(records, 1)
.map(key -> new HoodieAvroRecord(key, new EmptyHoodieRecordPayload()));
- JavaRDD<HoodieRecord> taggedRecords = tagLocation(index, recordsToDelete, table);
+ JavaRDD<HoodieRecord> taggedRecords = tagLocation(index, context, recordsToDelete, table);
return taggedRecords.map(record -> record.getKey()).collect();
};
}
@@ -295,16 +318,24 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
HoodieWriteConfig writeConfig,
Function2<List<HoodieRecord>, String, Integer> wrapped) {
if (isPreppedAPI) {
- return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
+ return wrapRecordsGenFunctionForPreppedCalls(basePath, hadoopConf, context, writeConfig, wrapped);
} else {
return wrapped;
}
}
+ /**
+ * Generate wrapper for record generation function for testing Prepped APIs.
+ *
+ * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+ * @param writeConfig Hoodie Write Config
+ * @param wrapped Actual Records Generation function (for partition)
+ * @return Wrapped Function
+ */
public Function3<List<HoodieRecord>, String, Integer, String> generateWrapRecordsForPartitionFn(boolean isPreppedAPI,
HoodieWriteConfig writeConfig, Function3<List<HoodieRecord>, String, Integer, String> wrapped) {
if (isPreppedAPI) {
- return wrapRecordsGenFunctionForPreppedCalls(writeConfig, wrapped);
+ return wrapPartitionRecordsGenFunctionForPreppedCalls(basePath, hadoopConf, context, writeConfig, wrapped);
} else {
return wrapped;
}
@@ -321,7 +352,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
public Function<Integer, List<HoodieKey>> generateWrapDeleteKeysFn(boolean isPreppedAPI,
HoodieWriteConfig writeConfig, Function<Integer, List<HoodieKey>> wrapped) {
if (isPreppedAPI) {
- return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
+ return wrapDeleteKeysGenFunctionForPreppedCalls(basePath, hadoopConf, context, writeConfig, wrapped);
} else {
return wrapped;
}
@@ -562,7 +593,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
client.commit(newCommitTime, result);
}
// check the partition metadata is written out
- assertPartitionMetadataForRecords(records, fs);
+ assertPartitionMetadataForRecords(basePath, records, fs);
// verify that there is a commit
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
@@ -634,7 +665,7 @@ public class HoodieClientTestBase extends HoodieClientTestHarness {
assertNoWriteErrors(statuses);
// check the partition metadata is written out
- assertPartitionMetadataForKeys(keysToDelete, fs);
+ assertPartitionMetadataForKeys(basePath, keysToDelete, fs);
// verify that there is a commit
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index e6a4d63e8c..a0c093be16 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -17,13 +17,6 @@
package org.apache.hudi.testutils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hudi.HoodieConversionUtils;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -75,6 +68,14 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.timeline.service.TimelineService;
import org.apache.hudi.util.JFunction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -86,7 +87,6 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
-import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
@@ -105,6 +105,8 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
+import scala.Tuple2;
+
import static org.apache.hudi.common.util.CleanerUtils.convertCleanMetadata;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -510,12 +512,21 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
return tableView;
}
+ /**
+ * @deprecated Use {@link #tagLocation(HoodieIndex, HoodieEngineContext, JavaRDD, HoodieTable)} instead.
+ */
+ @Deprecated
public JavaRDD<HoodieRecord> tagLocation(
HoodieIndex index, JavaRDD<HoodieRecord> records, HoodieTable table) {
return HoodieJavaRDD.getJavaRDD(
index.tagLocation(HoodieJavaRDD.of(records), context, table));
}
+ public static JavaRDD<HoodieRecord> tagLocation(
+ HoodieIndex index, HoodieEngineContext context, JavaRDD<HoodieRecord> records, HoodieTable table) {
+ return HoodieJavaRDD.getJavaRDD(index.tagLocation(HoodieJavaRDD.of(records), context, table));
+ }
+
public static Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(JavaRDD<HoodieRecord> inputRecordsRDD) {
HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
WorkloadStat globalStat = new WorkloadStat();
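
The new static tagLocation overload lets tests that are not subclasses of HoodieClientTestBase (such as the new SparkClientFunctionalTestHarness-based class) tag records with an explicit engine context. A hedged usage sketch; writeConfig, metaClient, and records are assumed to be prepared by the test, and context()/jsc() are the accessors SparkClientFunctionalTestHarness provides:

    HoodieIndex index = SparkHoodieIndexFactory.createIndex(writeConfig);
    HoodieSparkTable table = HoodieSparkTable.create(writeConfig, context(), metaClient);
    // Static call with an explicit HoodieEngineContext, replacing the
    // deprecated instance method that relied on the harness's own context.
    List<HoodieRecord> tagged = HoodieClientTestHarness
        .tagLocation(index, context(), jsc().parallelize(records, 1), table)
        .collect();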