You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/08/01 03:10:39 UTC
[hudi] branch master updated: [HUDI-1054] Several performance fixes during finalizing writes (#1768)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e79fbc0 [HUDI-1054] Several performance fixes during finalizing writes (#1768)
e79fbc0 is described below
commit e79fbc07fe803bc51cdf4f11948b133bbaa70595
Author: Udit Mehrotra <um...@illinois.edu>
AuthorDate: Fri Jul 31 20:10:28 2020 -0700
[HUDI-1054] Several performance fixes during finalizing writes (#1768)
Co-authored-by: Udit Mehrotra <ud...@amazon.com>
---
.../cli/commands/TestArchivedCommitsCommand.java | 2 +-
.../hudi/cli/commands/TestCommitsCommand.java | 2 +-
.../org/apache/hudi/client/HoodieWriteClient.java | 4 +-
.../org/apache/hudi/config/HoodieWriteConfig.java | 13 +++
.../java/org/apache/hudi/table/HoodieTable.java | 17 ++--
.../hudi/table/HoodieTimelineArchiveLog.java | 13 +--
.../java/org/apache/hudi/table/MarkerFiles.java | 100 ++++++++++++++++-----
.../rollback/BaseRollbackActionExecutor.java | 2 +-
.../hudi/io/TestHoodieTimelineArchiveLog.java | 14 +--
.../org/apache/hudi/table/TestMarkerFiles.java | 16 +++-
10 files changed, 130 insertions(+), 53 deletions(-)
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
index 313c1bc..4c7ce88 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java
@@ -92,7 +92,7 @@ public class TestArchivedCommitsCommand extends AbstractShellIntegrationTest {
// archive
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
- archiveLog.archiveIfRequired();
+ archiveLog.archiveIfRequired(jsc);
}
@AfterEach
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
index 45c340d..44e2b80 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
@@ -176,7 +176,7 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest {
// archive
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, jsc.hadoopConfiguration());
- archiveLog.archiveIfRequired();
+ archiveLog.archiveIfRequired(jsc);
CommandResult cr = getShell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104"));
assertTrue(cr.isSuccess());
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index b2ad315..9782b46 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -337,7 +337,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
try {
// Delete the marker directory for the instant.
- new MarkerFiles(table, instantTime).quietDeleteMarkerDir();
+ new MarkerFiles(table, instantTime).quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism());
// Do an inline compaction if enabled
if (config.isInlineCompaction()) {
@@ -349,7 +349,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
// We cannot have unbounded commit files. Archive commits if we have to archive
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, hadoopConf);
- archiveLog.archiveIfRequired();
+ archiveLog.archiveIfRequired(jsc);
autoCleanOnCommit(instantTime);
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 9aecdf7..69758f2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -88,6 +88,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
public static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
public static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
+ public static final String MARKERS_DELETE_PARALLELISM = "hoodie.markers.delete.parallelism";
+ public static final String DEFAULT_MARKERS_DELETE_PARALLELISM = "100";
public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode";
public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT
.toString();
@@ -235,6 +237,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
}
+ public int getMarkersDeleteParallelism() {
+ return Integer.parseInt(props.getProperty(MARKERS_DELETE_PARALLELISM));
+ }
+
public boolean isEmbeddedTimelineServerEnabled() {
return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
}
@@ -830,6 +836,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder withMarkersDeleteParallelism(int parallelism) {
+ props.setProperty(MARKERS_DELETE_PARALLELISM, String.valueOf(parallelism));
+ return this;
+ }
+
public Builder withEmbeddedTimelineServerEnabled(boolean enabled) {
props.setProperty(EMBEDDED_TIMELINE_SERVER_ENABLED, String.valueOf(enabled));
return this;
@@ -874,6 +885,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
DEFAULT_HOODIE_WRITE_STATUS_CLASS);
setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM), FINALIZE_WRITE_PARALLELISM,
DEFAULT_FINALIZE_WRITE_PARALLELISM);
+ setDefaultOnCondition(props, !props.containsKey(MARKERS_DELETE_PARALLELISM), MARKERS_DELETE_PARALLELISM,
+ DEFAULT_MARKERS_DELETE_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED),
EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED);
setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 748091e..d8b0c6e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -70,6 +70,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -428,21 +429,21 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
}
// we are not including log appends here, since they are already fail-safe.
- List<String> invalidDataPaths = markers.createdAndMergedDataPaths();
- List<String> validDataPaths = stats.stream()
+ Set<String> invalidDataPaths = markers.createdAndMergedDataPaths(jsc, config.getFinalizeWriteParallelism());
+ Set<String> validDataPaths = stats.stream()
.map(HoodieWriteStat::getPath)
.filter(p -> p.endsWith(this.getBaseFileExtension()))
- .collect(Collectors.toList());
+ .collect(Collectors.toSet());
+
// Contains list of partially created files. These needs to be cleaned up.
invalidDataPaths.removeAll(validDataPaths);
+
if (!invalidDataPaths.isEmpty()) {
LOG.info("Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths);
- }
- Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
- .map(dp -> Pair.of(new Path(dp).getParent().toString(), new Path(basePath, dp).toString()))
- .collect(Collectors.groupingBy(Pair::getKey));
+ Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
+ .map(dp -> Pair.of(new Path(dp).getParent().toString(), new Path(basePath, dp).toString()))
+ .collect(Collectors.groupingBy(Pair::getKey));
- if (!invalidPathsByPartition.isEmpty()) {
// Ensure all files in delete list is actually present. This is mandatory for an eventually consistent FS.
// Otherwise, we may miss deleting such files. If files are not found even after retries, fail the commit
if (consistencyCheckEnabled) {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index 98d3e05..4be00a3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -54,6 +54,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -121,7 +122,7 @@ public class HoodieTimelineArchiveLog {
/**
* Check if commits need to be archived. If yes, archive commits.
*/
- public boolean archiveIfRequired() throws IOException {
+ public boolean archiveIfRequired(JavaSparkContext jsc) throws IOException {
try {
List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
@@ -129,7 +130,7 @@ public class HoodieTimelineArchiveLog {
if (!instantsToArchive.isEmpty()) {
this.writer = openWriter();
LOG.info("Archiving instants " + instantsToArchive);
- archive(instantsToArchive);
+ archive(jsc, instantsToArchive);
LOG.info("Deleting archived instants " + instantsToArchive);
success = deleteArchivedInstants(instantsToArchive);
} else {
@@ -267,7 +268,7 @@ public class HoodieTimelineArchiveLog {
return success;
}
- public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
+ public void archive(JavaSparkContext jsc, List<HoodieInstant> instants) throws HoodieCommitException {
try {
HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
@@ -275,7 +276,7 @@ public class HoodieTimelineArchiveLog {
List<IndexedRecord> records = new ArrayList<>();
for (HoodieInstant hoodieInstant : instants) {
try {
- deleteAnyLeftOverMarkerFiles(hoodieInstant);
+ deleteAnyLeftOverMarkerFiles(jsc, hoodieInstant);
records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records);
@@ -293,9 +294,9 @@ public class HoodieTimelineArchiveLog {
}
}
- private void deleteAnyLeftOverMarkerFiles(HoodieInstant instant) {
+ private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant instant) {
MarkerFiles markerFiles = new MarkerFiles(table, instant.getTimestamp());
- if (markerFiles.deleteMarkerDir()) {
+ if (markerFiles.deleteMarkerDir(jsc, config.getMarkersDeleteParallelism())) {
LOG.info("Cleaned up left over marker directory for instant :" + instant);
}
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
index 00eb7df..8a310fd 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java
@@ -18,8 +18,12 @@
package org.apache.hudi.table;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
@@ -28,26 +32,27 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.IOType;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
import java.util.ArrayList;
-import java.util.LinkedList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* Operates on marker files for a given write action (commit, delta commit, compaction).
*/
-public class MarkerFiles {
+public class MarkerFiles implements Serializable {
private static final Logger LOG = LogManager.getLogger(MarkerFiles.class);
- public static String stripMarkerSuffix(String path) {
- return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN));
- }
-
private final String instantTime;
- private final FileSystem fs;
- private final Path markerDirPath;
+ private final transient FileSystem fs;
+ private final transient Path markerDirPath;
private final String basePath;
public MarkerFiles(FileSystem fs, String basePath, String markerFolderPath, String instantTime) {
@@ -64,9 +69,9 @@ public class MarkerFiles {
instantTime);
}
- public void quietDeleteMarkerDir() {
+ public void quietDeleteMarkerDir(JavaSparkContext jsc, int parallelism) {
try {
- deleteMarkerDir();
+ deleteMarkerDir(jsc, parallelism);
} catch (HoodieIOException ioe) {
LOG.warn("Error deleting marker directory for instant " + instantTime, ioe);
}
@@ -74,34 +79,77 @@ public class MarkerFiles {
/**
* Delete Marker directory corresponding to an instant.
+ *
+ * @param jsc Java Spark Context.
+ * @param parallelism Spark parallelism for deletion.
*/
- public boolean deleteMarkerDir() {
+ public boolean deleteMarkerDir(JavaSparkContext jsc, int parallelism) {
try {
- boolean result = fs.delete(markerDirPath, true);
- if (result) {
+ if (fs.exists(markerDirPath)) {
+ FileStatus[] fileStatuses = fs.listStatus(markerDirPath);
+ List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
+ .map(fileStatus -> fileStatus.getPath().toString())
+ .collect(Collectors.toList());
+
+ if (markerDirSubPaths.size() > 0) {
+ SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
+ parallelism = Math.min(markerDirSubPaths.size(), parallelism);
+ jsc.parallelize(markerDirSubPaths, parallelism).foreach(subPathStr -> {
+ Path subPath = new Path(subPathStr);
+ FileSystem fileSystem = subPath.getFileSystem(conf.get());
+ fileSystem.delete(subPath, true);
+ });
+ }
+
+ boolean result = fs.delete(markerDirPath, true);
LOG.info("Removing marker directory at " + markerDirPath);
- } else {
- LOG.info("No marker directory to delete at " + markerDirPath);
+ return result;
}
- return result;
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
+ return false;
}
public boolean doesMarkerDirExist() throws IOException {
return fs.exists(markerDirPath);
}
- public List<String> createdAndMergedDataPaths() throws IOException {
- List<String> dataFiles = new LinkedList<>();
- FSUtils.processFiles(fs, markerDirPath.toString(), (status) -> {
- String pathStr = status.getPath().toString();
- if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
- dataFiles.add(translateMarkerToDataPath(pathStr));
+ public Set<String> createdAndMergedDataPaths(JavaSparkContext jsc, int parallelism) throws IOException {
+ Set<String> dataFiles = new HashSet<>();
+
+ FileStatus[] topLevelStatuses = fs.listStatus(markerDirPath);
+ List<String> subDirectories = new ArrayList<>();
+ for (FileStatus topLevelStatus: topLevelStatuses) {
+ if (topLevelStatus.isFile()) {
+ String pathStr = topLevelStatus.getPath().toString();
+ if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
+ dataFiles.add(translateMarkerToDataPath(pathStr));
+ }
+ } else {
+ subDirectories.add(topLevelStatus.getPath().toString());
}
- return true;
- }, false);
+ }
+
+ if (subDirectories.size() > 0) {
+ parallelism = Math.min(subDirectories.size(), parallelism);
+ SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
+ dataFiles.addAll(jsc.parallelize(subDirectories, parallelism).flatMap(directory -> {
+ Path path = new Path(directory);
+ FileSystem fileSystem = path.getFileSystem(serializedConf.get());
+ RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path, true);
+ List<String> result = new ArrayList<>();
+ while (itr.hasNext()) {
+ FileStatus status = itr.next();
+ String pathStr = status.getPath().toString();
+ if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
+ result.add(translateMarkerToDataPath(pathStr));
+ }
+ }
+ return result.iterator();
+ }).collect());
+ }
+
return dataFiles;
}
@@ -110,6 +158,10 @@ public class MarkerFiles {
return MarkerFiles.stripMarkerSuffix(rPath);
}
+ public static String stripMarkerSuffix(String path) {
+ return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN));
+ }
+
public List<String> allMarkerFilePaths() throws IOException {
List<String> markerFiles = new ArrayList<>();
FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> {
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 846e8a8..90b9bb3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -113,7 +113,7 @@ public abstract class BaseRollbackActionExecutor extends BaseActionExecutor<Hood
}
// Finally, remove the marker files post rollback.
- new MarkerFiles(table, instantToRollback.getTimestamp()).quietDeleteMarkerDir();
+ new MarkerFiles(table, instantToRollback.getTimestamp()).quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism());
return rollbackMetadata;
}
diff --git a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
index 484caf7..5785fc8 100644
--- a/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
+++ b/hudi-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
@@ -79,7 +79,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
.withParallelism(2, 2).forTable("test-trip-table").build();
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
- boolean result = archiveLog.archiveIfRequired();
+ boolean result = archiveLog.archiveIfRequired(jsc);
assertTrue(result);
}
@@ -157,7 +157,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
- assertTrue(archiveLog.archiveIfRequired());
+ assertTrue(archiveLog.archiveIfRequired(jsc));
// reload the timeline and remove the remaining commits
timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
@@ -246,7 +246,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals(4, timeline.countInstants(), "Loaded 4 commits and the count should match");
- boolean result = archiveLog.archiveIfRequired();
+ boolean result = archiveLog.archiveIfRequired(jsc);
assertTrue(result);
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
assertEquals(4, timeline.countInstants(), "Should not archive commits when maxCommitsToKeep is 5");
@@ -289,7 +289,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
- boolean result = archiveLog.archiveIfRequired();
+ boolean result = archiveLog.archiveIfRequired(jsc);
assertTrue(result);
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
assertTrue(timeline.containsOrBeforeTimelineStarts("100"), "Archived commits should always be safe");
@@ -315,7 +315,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
- assertTrue(archiveLog.archiveIfRequired());
+ assertTrue(archiveLog.archiveIfRequired(jsc));
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
assertEquals(5, timeline.countInstants(),
"Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)");
@@ -349,7 +349,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline();
assertEquals(8, timeline.countInstants(), "Loaded 6 commits and the count should match");
- boolean result = archiveLog.archiveIfRequired();
+ boolean result = archiveLog.archiveIfRequired(jsc);
assertTrue(result);
timeline = metaClient.getActiveTimeline().reload().getCommitsAndCompactionTimeline();
assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "100")),
@@ -397,7 +397,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, dfs.getConf());
- boolean result = archiveLog.archiveIfRequired();
+ boolean result = archiveLog.archiveIfRequired(jsc);
assertTrue(result);
HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3);
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java
index 723d9e1..af679ce 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java
@@ -28,6 +28,9 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.IOType;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -45,16 +48,23 @@ public class TestMarkerFiles extends HoodieCommonTestHarness {
private MarkerFiles markerFiles;
private FileSystem fs;
private Path markerFolderPath;
+ private JavaSparkContext jsc;
@BeforeEach
public void setup() throws IOException {
initPath();
initMetaClient();
+ this.jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(TestMarkerFiles.class.getName()));
this.fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf());
this.markerFolderPath = new Path(metaClient.getMarkerFolderPath("000"));
this.markerFiles = new MarkerFiles(fs, metaClient.getBasePath(), markerFolderPath.toString(), "000");
}
+ @AfterEach
+ public void cleanup() {
+ jsc.stop();
+ }
+
private void createSomeMarkerFiles() {
markerFiles.create("2020/06/01", "file1", IOType.MERGE);
markerFiles.create("2020/06/02", "file2", IOType.APPEND);
@@ -97,7 +107,7 @@ public class TestMarkerFiles extends HoodieCommonTestHarness {
// then
assertTrue(markerFiles.doesMarkerDirExist());
- assertTrue(markerFiles.deleteMarkerDir());
+ assertTrue(markerFiles.deleteMarkerDir(jsc, 2));
assertFalse(markerFiles.doesMarkerDirExist());
}
@@ -105,7 +115,7 @@ public class TestMarkerFiles extends HoodieCommonTestHarness {
public void testDeletionWhenMarkerDirNotExists() throws IOException {
// then
assertFalse(markerFiles.doesMarkerDirExist());
- assertFalse(markerFiles.deleteMarkerDir());
+ assertFalse(markerFiles.deleteMarkerDir(jsc, 2));
}
@Test
@@ -120,7 +130,7 @@ public class TestMarkerFiles extends HoodieCommonTestHarness {
// then
assertIterableEquals(CollectionUtils.createImmutableList(
"2020/06/01/file1", "2020/06/03/file3"),
- markerFiles.createdAndMergedDataPaths().stream().sorted().collect(Collectors.toList())
+ markerFiles.createdAndMergedDataPaths(jsc, 2).stream().sorted().collect(Collectors.toList())
);
}