You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/10/14 17:52:25 UTC
[gobblin] branch master updated: [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta (#3575)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 585298fb5 [GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta (#3575)
585298fb5 is described below
commit 585298fb5ebc074f69c1b9db87de6186c4855b26
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Fri Oct 14 10:52:18 2022 -0700
[GOBBLIN-1707] Enhance `IcebergDataset` to detect when files already at dest then proceed with only delta (#3575)
* Enhance `IcebergDataset` to detect when files already at dest and proceed with only delta
* fixup: minor doc/comment mods
* Allow not finding a file mentioned by iceberg metadata to be a non-fatal error
* Have `IcebergDataset` check more thoroughly at dest for previously-copied files that could be skipped
* Add short-circuiting of `IcebergDataset` file scan when (root) metadata file already replicated to dest
* Abbreviate `IcebergDataset` logging when source file not found, refactor for code reuse, and round out testing
* Improve javadoc as suggested during review
* Streamline `IcebergDataset` logging when source path not found
* Skip `IcebergDataset` per-data-file check, falling back to per-manifest-file checking
* minor comment change
* Add `IcebergDataset` logging to indicate volume of filepaths accumulated
* Extend and refactor `IcebergDataset` logging to indicate volume of filepaths accumulated
* improve comments
* Add logging for running count of source paths not found
---
.../management/copy/iceberg/IcebergDataset.java | 160 +++++++++++++--
.../copy/iceberg/IcebergSnapshotInfo.java | 8 +-
.../data/management/copy/iceberg/IcebergTable.java | 37 +++-
.../copy/iceberg/IcebergDatasetTest.java | 227 ++++++++++++++++++---
.../service/monitoring/FsJobStatusRetriever.java | 2 +-
.../util/function/CheckedExceptionFunction.java | 44 +++-
.../util/measurement/GrowthMilestoneTracker.java | 64 ++++++
7 files changed, 475 insertions(+), 67 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index 1886689a1..006e9c1c6 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -17,9 +17,11 @@
package org.apache.gobblin.data.management.copy.iceberg;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@@ -27,9 +29,14 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.function.Function;
+import javax.annotation.concurrent.NotThreadSafe;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.util.function.CheckedExceptionFunction;
+import org.apache.gobblin.util.measurement.GrowthMilestoneTracker;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -61,6 +68,7 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
private final IcebergTable icebergTable;
protected final Properties properties;
protected final FileSystem sourceFs;
+ private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired
private final Optional<URI> sourceCatalogMetastoreURI;
private final Optional<URI> targetCatalogMetastoreURI;
@@ -127,15 +135,14 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
String fileSet = this.getFileSetId();
List<CopyEntity> copyEntities = Lists.newArrayList();
- Map<Path, FileStatus> pathToFileStatus = getFilePathsToFileStatus();
- log.info("{}.{} - found {} candidate source paths", dbName, inputTableName, pathToFileStatus.size());
+ Map<Path, FileStatus> pathToFileStatus = getFilePathsToFileStatus(targetFs, copyConfig);
+ log.info("~{}.{}~ found {} candidate source paths", dbName, inputTableName, pathToFileStatus.size());
Configuration defaultHadoopConfiguration = new Configuration();
for (Map.Entry<Path, FileStatus> entry : pathToFileStatus.entrySet()) {
Path srcPath = entry.getKey();
FileStatus srcFileStatus = entry.getValue();
- // TODO: determine whether unnecessarily expensive to repeatedly re-create what should be the same FS: could it
- // instead be created once and reused thereafter?
+ // TODO: should be the same FS each time; try creating once, reusing thereafter, to not recreate wastefully
FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
// TODO: Add preservation of ancestor ownership and permissions!
@@ -149,34 +156,151 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
fileEntity.setDestinationData(getDestinationDataset(targetFs));
copyEntities.add(fileEntity);
}
- log.info("{}.{} - generated {} copy entities", dbName, inputTableName, copyEntities.size());
+ log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, copyEntities.size());
return copyEntities;
}
/**
* Finds all files of the Iceberg's current snapshot
- * Returns a map of path, file status for each file that needs to be copied
+ * @return a map of path, file status for each file that needs to be copied
*/
- protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
- Map<Path, FileStatus> result = Maps.newHashMap();
+ protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
IcebergTable icebergTable = this.getIcebergTable();
+ /** @return whether `pathStr` is present on `targetFs`, caching results while tunneling checked exceptions outward */
+ Function<String, Boolean> isPresentOnTarget = CheckedExceptionFunction.wrapToTunneled(pathStr ->
+ // omit considering timestamp (or other markers of freshness), as files should be immutable
+ // ATTENTION: `CopyContext.getFileStatus()`, to partake in caching
+ copyConfig.getCopyContext().getFileStatus(targetFs, new Path(pathStr)).isPresent()
+ );
+
+ // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+ IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+ if (currentSnapshotOverview.getMetadataPath().map(isPresentOnTarget).orElse(false) &&
+ isPresentOnTarget.apply(currentSnapshotOverview.getManifestListPath())) {
+ log.info("~{}.{}~ skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+ dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+ currentSnapshotOverview.getManifestListPath(),
+ currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+ return Maps.newHashMap();
+ }
Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
Iterator<String> filePathsIterator = Iterators.concat(
Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
- // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
- log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
- snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
- return snapshotInfo.getAllPaths().iterator();
+ // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+ String manListPath = snapshotInfo.getManifestListPath();
+ log.info("~{}.{}~ loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+ snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+ // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+ // most critically, all are presumed immutable and uniquely named, although any may be replaced. we depend
+ // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+ // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+ // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+ // hence, its entire subtree may be short-circuited. nevertheless, absence of a file at dest cannot imply
+ // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+ // metadata files would have been replaced (e.g. during snapshot compaction). in such instances, at least
+ // some of the children pointed to within could have been copied prior, when they previously appeared as a
+ // child of the current file's predecessor (which this new meta file now replaces).
+ if (!isPresentOnTarget.apply(manListPath)) {
+ List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+ for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
+ if (!isPresentOnTarget.apply(mfi.getManifestFilePath())) {
+ missingPaths.add(mfi.getManifestFilePath());
+ // being incremental info, no listed paths would have appeared prior w/ other snapshots, so add all now.
+ // skip verification despite corner case of a snapshot having reorganized/rebalanced manifest contents
+ // during a period where replication fell so far behind that no snapshots listed among current metadata
+ // are yet at dest. since the consequence of unnecessary copy is merely wasted data transfer and
+ // compute--and overall, potential is small--prefer sidestepping expense of exhaustive checking, since
+ // file count may run into 100k+ (even beyond!)
+ missingPaths.addAll(mfi.getListedFilePaths());
+ }
+ }
+ log.info("~{}.{}~ snapshot '{}': collected {} additional source paths",
+ dbName, inputTableName, snapshotInfo.getSnapshotId(), missingPaths.size());
+ return missingPaths.iterator();
+ } else {
+ log.info("~{}.{}~ snapshot '{}' already present on target... skipping (including contents)",
+ dbName, inputTableName, snapshotInfo.getSnapshotId());
+ // IMPORTANT: separately consider metadata path, to handle case of 'metadata-only' snapshot reusing mf-list
+ Optional<String> metadataPath = snapshotInfo.getMetadataPath();
+ Optional<String> nonReplicatedMetadataPath = metadataPath.filter(p -> !isPresentOnTarget.apply(p));
+ metadataPath.ifPresent(ignore ->
+ log.info("~{}.{}~ metadata IS {} already present on target", dbName, inputTableName,
+ nonReplicatedMetadataPath.isPresent() ? "NOT" : "ALSO")
+ );
+ return nonReplicatedMetadataPath.map(p -> Lists.newArrayList(p).iterator()).orElse(Collections.emptyIterator());
+ }
})
);
+
+ Map<Path, FileStatus> results = Maps.newHashMap();
+ long numSourceFilesNotFound = 0L;
Iterable<String> filePathsIterable = () -> filePathsIterator;
- // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus` network calls would
- // likely benefit from parallelism
- for (String pathString : filePathsIterable) {
- Path path = new Path(pathString);
- result.put(path, this.sourceFs.getFileStatus(path));
+ try {
+ // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus()` network calls likely
+ // to benefit from parallelism
+ GrowthMilestoneTracker growthTracker = new GrowthMilestoneTracker();
+ PathErrorConsolidator errorConsolidator = new PathErrorConsolidator();
+ for (String pathString : filePathsIterable) {
+ Path path = new Path(pathString);
+ try {
+ results.put(path, this.sourceFs.getFileStatus(path));
+ if (growthTracker.isAnotherMilestone(results.size())) {
+ log.info("~{}.{}~ collected file status on '{}' source paths", dbName, inputTableName, results.size());
+ }
+ } catch (FileNotFoundException fnfe) {
+ if (!shouldTolerateMissingSourceFiles) {
+ throw fnfe;
+ } else {
+ // log, but otherwise swallow... to continue on
+ String total = ++numSourceFilesNotFound + " total";
+ String speculation = "either premature deletion broke time-travel or metadata read interleaved among delete";
+ errorConsolidator.prepLogMsg(path).ifPresent(msg ->
+ log.warn("~{}.{}~ source {} ({}... {})", dbName, inputTableName, msg, speculation, total)
+ );
+ }
+ }
+ }
+ } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
+ wrapper.rethrowWrapped();
+ }
+ return results;
+ }
+
+ /**
+ * Stateful object to consolidate error messages (e.g. for logging), per a {@link Path} consolidation strategy.
+ * OVERVIEW: to avoid run-away logging into the 1000s of lines, consolidate to parent (directory) level:
+ * 1. on the first path within the dir, log that specific path
+ * 2. on the second path within the dir, log the dir path as a summarization (with ellipsis)
+ * 3. thereafter, skip, logging nothing
+ * The directory, parent path is the default consolidation strategy, yet may be overridden.
+ */
+ @NotThreadSafe
+ protected static class PathErrorConsolidator {
+ private final Map<Path, Boolean> consolidatedPathToWhetherErrorLogged = Maps.newHashMap();
+
+ /** @return consolidated message to log, iff appropriate; else `Optional.empty()` when deserves inhibition */
+ public Optional<String> prepLogMsg(Path path) {
+ Path consolidatedPath = calcPathConsolidation(path);
+ Boolean hadAlreadyLoggedConsolidation = this.consolidatedPathToWhetherErrorLogged.get(consolidatedPath);
+ if (!Boolean.valueOf(true).equals(hadAlreadyLoggedConsolidation)) {
+ boolean shouldLogConsolidationNow = hadAlreadyLoggedConsolidation != null;
+ consolidatedPathToWhetherErrorLogged.put(consolidatedPath, shouldLogConsolidationNow);
+ String pathLogString = shouldLogConsolidationNow ? (consolidatedPath.toString() + "/...") : path.toString();
+ return Optional.of("path" + (shouldLogConsolidationNow ? "s" : " ") + " not found: '" + pathLogString + "'");
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ /** @return a {@link Path} to consolidate around; default is: {@link Path#getParent()} */
+ protected Path calcPathConsolidation(Path path) {
+ return path.getParent();
}
- return result;
+ }
+
+ @VisibleForTesting
+ static PathErrorConsolidator createPathErrorConsolidator() {
+ return new PathErrorConsolidator();
}
/** Add layer of indirection to permit test mocking by working around `FileSystem.get()` `static` method */
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
index b4214017f..42b871302 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
@@ -56,9 +56,15 @@ public class IcebergSnapshotInfo {
return manifestFiles.stream().map(ManifestFileInfo::getListedFilePaths).flatMap(List::stream).collect(Collectors.toList());
}
- public List<String> getAllPaths() {
+ /** @return the `manifestListPath` and `metadataPath`, if present */
+ public List<String> getSnapshotApexPaths() {
List<String> result = metadataPath.map(Lists::newArrayList).orElse(Lists.newArrayList());
result.add(manifestListPath);
+ return result;
+ }
+
+ public List<String> getAllPaths() {
+ List<String> result = getSnapshotApexPaths();
result.addAll(getManifestFilePaths());
result.addAll(getAllDataFilePaths());
return result;
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index a9fbe05cc..fbd924845 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -72,6 +72,12 @@ public class IcebergTable {
return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()));
}
+ /** @return metadata info for most recent snapshot, wherein manifests and their child data files ARE NOT listed */
+ public IcebergSnapshotInfo getCurrentSnapshotInfoOverviewOnly() throws IOException {
+ TableMetadata current = accessTableMetadata();
+ return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()), true);
+ }
+
/** @return metadata info for all known snapshots, ordered historically, with *most recent last* */
public Iterator<IcebergSnapshotInfo> getAllSnapshotInfosIterator() throws IOException {
TableMetadata current = accessTableMetadata();
@@ -90,23 +96,31 @@ public class IcebergTable {
}
/**
- * @return metadata info for all known snapshots, but incrementally, so content overlap between snapshots appears
- * only within the first as they're ordered historically, with *most recent last*
+ * @return metadata info for all known snapshots, but incrementally, so overlapping entries within snapshots appear
+ * only with the first as they're ordered historically, with *most recent last*.
+ *
+ * This means the {@link IcebergSnapshotInfo#getManifestFiles()} for the (n+1)-th element of the iterator will omit
+ * all manifest files and listed data files, already reflected in a {@link IcebergSnapshotInfo#getManifestFiles()}
+ * from the n-th or prior elements. Given the order of the {@code Iterator<IcebergSnapshotInfo>} returned, this
+ * mirrors the snapshot-to-file dependencies: each file is returned exactly once with the (oldest) snapshot from
+ * which it first becomes reachable.
+ *
+ * Only the final {@link IcebergSnapshotInfo#getMetadataPath()} is present (for the snapshot it itself deems current).
*/
public Iterator<IcebergSnapshotInfo> getIncrementalSnapshotInfosIterator() throws IOException {
// TODO: investigate using `.addedFiles()`, `.deletedFiles()` to calc this
- Set<String> knownManifestListFilePaths = Sets.newHashSet();
- Set<String> knownManifestFilePaths = Sets.newHashSet();
- Set<String> knownListedFilePaths = Sets.newHashSet();
+ Set<String> knownFilePaths = Sets.newHashSet(); // as absolute paths are clearly unique, use a single set for all
return Iterators.filter(Iterators.transform(getAllSnapshotInfosIterator(), snapshotInfo -> {
- if (false == knownManifestListFilePaths.add(snapshotInfo.getManifestListPath())) { // already known manifest list!
+ log.info("~{}~ before snapshot '{}' - '{}' total known iceberg paths",
+ tableId, snapshotInfo.getSnapshotId(), knownFilePaths.size());
+ if (false == knownFilePaths.add(snapshotInfo.getManifestListPath())) { // already known manifest list!
return snapshotInfo.toBuilder().manifestListPath(null).build(); // use `null` as marker to surrounding `filter`
}
List<IcebergSnapshotInfo.ManifestFileInfo> novelManifestInfos = Lists.newArrayList();
for (ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
- if (true == knownManifestFilePaths.add(mfi.getManifestFilePath())) { // heretofore unknown
+ if (true == knownFilePaths.add(mfi.getManifestFilePath())) { // heretofore unknown
List<String> novelListedPaths = mfi.getListedFilePaths().stream()
- .filter(fpath -> true == knownListedFilePaths.add(fpath)) // heretofore unknown
+ .filter(fpath -> true == knownFilePaths.add(fpath)) // heretofore unknown
.collect(Collectors.toList());
if (novelListedPaths.size() == mfi.getListedFilePaths().size()) { // nothing filtered
novelManifestInfos.add(mfi); // reuse orig
@@ -130,15 +144,18 @@ public class IcebergTable {
}
protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot, Optional<String> metadataFileLocation) throws IOException {
+ return createSnapshotInfo(snapshot, metadataFileLocation, false);
+ }
+
+ protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot, Optional<String> metadataFileLocation, boolean skipManifestFileInfo) throws IOException {
// TODO: verify correctness, even when handling 'delete manifests'!
- List<ManifestFile> manifests = snapshot.allManifests();
return new IcebergSnapshotInfo(
snapshot.snapshotId(),
Instant.ofEpochMilli(snapshot.timestampMillis()),
metadataFileLocation,
snapshot.manifestListLocation(),
// NOTE: unable to `.stream().map(m -> calcManifestFileInfo(m, tableOps.io()))` due to checked exception
- calcAllManifestFileInfos(manifests, tableOps.io())
+ skipManifestFileInfo ? Lists.newArrayList() : calcAllManifestFileInfos(snapshot.allManifests(), tableOps.io())
);
}
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 61aaf6851..159eca34c 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -25,15 +25,16 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.function.BiFunction;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
import lombok.Data;
import org.apache.hadoop.conf.Configuration;
@@ -79,7 +80,7 @@ public class IcebergDatasetTest {
private static final String MANIFEST_PATH_0 = ROOT_PATH + "metadata/manifest.a";
private static final String MANIFEST_DATA_PATH_0A = ROOT_PATH + "data/p0/a";
private static final String MANIFEST_DATA_PATH_0B = ROOT_PATH + "data/p0/b";
- private static final MockedIcebergTable.SnapshotPaths SNAPSHOT_PATHS_0 = new MockedIcebergTable.SnapshotPaths(
+ private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_0 = new MockIcebergTable.SnapshotPaths(
Optional.of(METADATA_PATH), MANIFEST_LIST_PATH_0,
Arrays.asList(new IcebergSnapshotInfo.ManifestFileInfo(
MANIFEST_PATH_0, Arrays.asList(MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B)))
@@ -88,7 +89,7 @@ public class IcebergDatasetTest {
private static final String MANIFEST_PATH_1 = MANIFEST_PATH_0.replaceAll("\\.a$", ".b");
private static final String MANIFEST_DATA_PATH_1A = MANIFEST_DATA_PATH_0A.replaceAll("/p0/", "/p1/");
private static final String MANIFEST_DATA_PATH_1B = MANIFEST_DATA_PATH_0B.replaceAll("/p0/", "/p1/");
- private static final MockedIcebergTable.SnapshotPaths SNAPSHOT_PATHS_1 = new MockedIcebergTable.SnapshotPaths(
+ private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_1 = new MockIcebergTable.SnapshotPaths(
Optional.empty(), MANIFEST_LIST_PATH_1,
Arrays.asList(new IcebergSnapshotInfo.ManifestFileInfo(
MANIFEST_PATH_1, Arrays.asList(MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B)))
@@ -104,23 +105,109 @@ public class IcebergDatasetTest {
}
@Test
- public void testGetFilePaths() throws IOException {
- IcebergTable icebergTable = Mockito.mock(IcebergTable.class);
- IcebergSnapshotInfo icebergSnapshotInfo = SNAPSHOT_PATHS_0.asSnapshotInfo();
- Mockito.when(icebergTable.getIncrementalSnapshotInfosIterator()).thenReturn(Arrays.asList(icebergSnapshotInfo).iterator());
+ public void testGetFilePathsWhenDestEmpty() throws IOException {
+ List<MockIcebergTable.SnapshotPaths> icebergSnapshots = Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+ List<String> existingDestPaths = Lists.newArrayList();
+ Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+ validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, expectedResultPaths);
+ }
- FileSystem sourceFs = Mockito.mock(FileSystem.class);
- IcebergDataset icebergDataset = new IcebergDataset("test_db_name", "test_tbl_name", icebergTable, new Properties(), sourceFs);
+ @Test
+ public void testGetFilePathsWhenOneManifestListAtDest() throws IOException {
+ List<MockIcebergTable.SnapshotPaths> icebergSnapshots = Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+ List<String> existingDestPaths = Lists.newArrayList(MANIFEST_LIST_PATH_1);
+ Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), SNAPSHOT_PATHS_0);
+ validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, expectedResultPaths);
+ }
- Set<Path> expectedPaths = Sets.newHashSet();
- for (String p : icebergSnapshotInfo.getAllPaths()) {
- expectedPaths.add(new Path(p));
- }
+ @Test
+ public void testGetFilePathsWhenOneManifestAtDest() throws IOException {
+ List<MockIcebergTable.SnapshotPaths> icebergSnapshots = Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+ List<String> existingDestPaths = Lists.newArrayList(MANIFEST_PATH_1);
+ Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), SNAPSHOT_PATHS_0);
+ expectedResultPaths.add(new Path(MANIFEST_LIST_PATH_1)); // expect manifest's parent, despite manifest subtree skip
+ validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, expectedResultPaths);
+ }
+
+ @Test
+ public void testGetFilePathsWhenSomeDataFilesAtDest() throws IOException {
+ List<MockIcebergTable.SnapshotPaths> icebergSnapshots = Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+ List<String> existingDestPaths = Lists.newArrayList(MANIFEST_DATA_PATH_1B, MANIFEST_DATA_PATH_0A);
+ Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+ // despite already existing on target, expect anyway: per-file check skipped for optimization's sake
+ // expectedResultPaths.remove(new Path(MANIFEST_DATA_PATH_1B));
+ // expectedResultPaths.remove(new Path(MANIFEST_DATA_PATH_0A));
+ validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, expectedResultPaths);
+ }
+
+ @Test
+ public void testGetFilePathsWillSkipMissingSourceFile() throws IOException {
+ List<MockIcebergTable.SnapshotPaths> icebergSnapshots = Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+ // pretend this path doesn't exist on source:
+ Path missingPath = new Path(MANIFEST_DATA_PATH_0A);
+ Set<Path> existingSourcePaths = withAllSnapshotPaths(Sets.newHashSet(), SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+ existingSourcePaths.remove(missingPath);
+ List<String> existingDestPaths = Lists.newArrayList(MANIFEST_LIST_PATH_1);
+ Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), SNAPSHOT_PATHS_0);
+ expectedResultPaths.remove(missingPath);
+ validateGetFilePathsGivenDestState(
+ icebergSnapshots,
+ Optional.of(existingSourcePaths.stream().map(Path::toString).collect(Collectors.toList())),
+ existingDestPaths,
+ expectedResultPaths);
+ }
+
+ @Test
+ public void testGetFilePathsWhenManifestListsAtDestButNotMetadata() throws IOException {
+ List<MockIcebergTable.SnapshotPaths> icebergSnapshots = Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+ List<String> existingDestPaths = Lists.newArrayList(MANIFEST_LIST_PATH_1, MANIFEST_LIST_PATH_0);
+ Set<Path> expectedResultPaths = Sets.newHashSet();
+ expectedResultPaths.add(new Path(METADATA_PATH));
+ validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, expectedResultPaths);
+ }
- Map<Path, FileStatus> filePathsToFileStatus = icebergDataset.getFilePathsToFileStatus();
- Assert.assertEquals(filePathsToFileStatus.keySet(), expectedPaths);
- // verify all values `null` (because `sourceFs.getFileStatus` not mocked)
- Assert.assertEquals(Sets.newHashSet(filePathsToFileStatus.values()), new HashSet<>(Arrays.asList(new FileStatus[] { null })));
+ @Test
+ public void testGetFilePathsWhenAllAtDest() throws IOException {
+ List<MockIcebergTable.SnapshotPaths> icebergSnapshots = Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+ List<String> existingDestPaths = Lists.newArrayList(METADATA_PATH, MANIFEST_LIST_PATH_1, MANIFEST_LIST_PATH_0);
+ Set<Path> expectedResultPaths = Sets.newHashSet(); // not expecting any delta
+ IcebergTable mockTable = validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, expectedResultPaths);
+ // ensure short-circuiting was able to avert iceberg manifests scan
+ Mockito.verify(mockTable, Mockito.times(1)).getCurrentSnapshotInfoOverviewOnly();
+ Mockito.verifyNoMoreInteractions(mockTable);
+ }
+
+ /** Exception wrapping is used internally--ensure that doesn't lapse into silently swallowing errors */
+ @Test(expectedExceptions = IOException.class)
+ public void testGetFilePathsDoesNotSwallowDestFileSystemException() throws IOException {
+ IcebergTable icebergTable = MockIcebergTable.withSnapshots(Lists.newArrayList(SNAPSHOT_PATHS_0));
+
+ MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI);
+ FileSystem sourceFs = sourceFsBuilder.build();
+ IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs);
+
+ MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI);
+ FileSystem destFs = destFsBuilder.build();
+ Mockito.doThrow(new IOException("Ha - not so fast!")).when(destFs).getFileStatus(new Path(SNAPSHOT_PATHS_0.manifestListPath));
+
+ CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs);
+ icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration);
+ }
+
+ /** Validate error consolidation used to streamline logging. */
+ @Test
+ public void testPathErrorConsolidator() {
+ IcebergDataset.PathErrorConsolidator pec = IcebergDataset.createPathErrorConsolidator();
+ Optional<String> msg0 = pec.prepLogMsg(new Path("/a/b/c/file0"));
+ Assert.assertTrue(msg0.isPresent());
+ Assert.assertEquals(msg0.get(), "path not found: '/a/b/c/file0'");
+ Optional<String> msg1 = pec.prepLogMsg(new Path("/a/b/c/file1"));
+ Assert.assertTrue(msg1.isPresent());
+ Assert.assertEquals(msg1.get(), "paths not found: '/a/b/c/...'");
+ Optional<String> msg2 = pec.prepLogMsg(new Path("/a/b/c/file2"));
+ Assert.assertFalse(msg2.isPresent());
+ Optional<String> msg3 = pec.prepLogMsg(new Path("/a/b/c-other/file0"));
+ Assert.assertTrue(msg3.isPresent());
}
/**
@@ -137,7 +224,7 @@ public class IcebergDatasetTest {
sourceBuilder.addPaths(expectedPaths);
FileSystem sourceFs = sourceBuilder.build();
- IcebergTable icebergTable = new MockedIcebergTable(Arrays.asList(SNAPSHOT_PATHS_0));
+ IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs);
MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
@@ -162,7 +249,7 @@ public class IcebergDatasetTest {
sourceBuilder.addPaths(expectedPaths);
FileSystem sourceFs = sourceBuilder.build();
- IcebergTable icebergTable = new MockedIcebergTable(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0));
+ IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0));
IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs);
MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
@@ -176,7 +263,64 @@ public class IcebergDatasetTest {
verifyCopyEntities(copyEntities, expectedPaths);
}
- private void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expected) {
+ /**
+ * Exercise {@link IcebergDataset#getFilePathsToFileStatus} and validate the result; delegates to the four-arg overload, passing {@link Optional#empty()} for the existing source paths.
+ * @return {@link IcebergTable} (mock!), for behavioral verification
+ */
+ protected IcebergTable validateGetFilePathsGivenDestState(
+ List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets,
+ List<String> existingDestPaths,
+ Set<Path> expectedResultPaths) throws IOException {
+ return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(),existingDestPaths, expectedResultPaths);
+ }
+
+ /**
+ * Exercise {@link IcebergDataset#getFilePathsToFileStatus} against mock source and dest file systems, asserting that both the keys and the `FileStatus` paths of the result equal `expectedResultPaths`.
+ * @return {@link IcebergTable} (mock!), for behavioral verification
+ */
+ protected IcebergTable validateGetFilePathsGivenDestState(
+ List<MockIcebergTable.SnapshotPaths> sourceSnapshotPathSets,
+ Optional<List<String>> optExistingSourcePaths,
+ List<String> existingDestPaths,
+ Set<Path> expectedResultPaths) throws IOException {
+ IcebergTable icebergTable = MockIcebergTable.withSnapshots(sourceSnapshotPathSets);
+
+ MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent()); // 2nd arg is `shouldRepresentEveryPath`: true only when no explicit source paths were given
+ optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths);
+ FileSystem sourceFs = sourceFsBuilder.build();
+ IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs);
+
+ MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI); // one-arg ctor passes `false` for `shouldRepresentEveryPath`
+ destFsBuilder.addPaths(existingDestPaths);
+ FileSystem destFs = destFsBuilder.build();
+ CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs);
+
+ Map<Path, FileStatus> filePathsToFileStatus = icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration);
+ Assert.assertEquals(filePathsToFileStatus.keySet(), expectedResultPaths);
+ // verify solely the path portion of the `FileStatus`, since that's all mock sets up
+ Assert.assertEquals(
+ filePathsToFileStatus.values().stream().map(FileStatus::getPath).collect(Collectors.toSet()),
+ expectedResultPaths);
+ return icebergTable; // handed back so the caller can verify interactions with the mock
+ }
+
+ /** @return `paths` itself (mutated in place, not copied) after adding to it all paths of every one of `snapshotDefs` */
+ protected static Set<Path> withAllSnapshotPaths(Set<Path> paths, MockIcebergTable.SnapshotPaths... snapshotDefs) {
+ Arrays.stream(snapshotDefs).flatMap(snapshotDef ->
+ snapshotDef.asSnapshotInfo().getAllPaths().stream()
+ ).forEach(p ->
+ paths.add(new Path(p)) // each path string from the snapshot info is wrapped as a `Path`
+ );
+ return paths;
+ }
+
+ private CopyConfiguration createEmptyCopyConfiguration(FileSystem fs) { // builds a `CopyConfiguration` for `fs` from the `copyConfigProperties` field (declared outside this hunk)
+ return CopyConfiguration.builder(fs, copyConfigProperties)
+ .copyContext(new CopyContext()) // fresh, default-constructed context on every call
+ .build();
+ }
+
+ private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expected) {
List<String> actual = new ArrayList<>();
for (CopyEntity copyEntity : copyEntities) {
String json = copyEntity.toString();
@@ -218,6 +362,7 @@ public class IcebergDatasetTest {
public MockFileSystemBuilder(URI fsURI) {
this(fsURI, false);
}
+
public MockFileSystemBuilder(URI fsURI, boolean shouldRepresentEveryPath) {
this.fsURI = fsURI;
this.optPaths = shouldRepresentEveryPath ? Optional.empty() : Optional.of(Sets.newHashSet());
@@ -256,6 +401,8 @@ public class IcebergDatasetTest {
Mockito.when(fs.getFileStatus(any(Path.class))).thenAnswer(invocation ->
createEmptyFileStatus(invocation.getArgumentAt(0, Path.class).toString()));
} else {
+ // WARNING: order is critical--specific paths *after* `any(Path)`; in addition, since mocking further
+ // an already-mocked instance, `.doReturn/.when` is needed (vs. `.when/.thenReturn`)
Mockito.when(fs.getFileStatus(any(Path.class))).thenThrow(new FileNotFoundException());
for (Path p : this.optPaths.get()) {
Mockito.doReturn(createEmptyFileStatus(p.toString())).when(fs).getFileStatus(p);
@@ -273,7 +420,7 @@ public class IcebergDatasetTest {
}
- private static class MockedIcebergTable extends IcebergTable {
+ private static class MockIcebergTable {
@Data
public static class SnapshotPaths {
@@ -282,7 +429,12 @@ public class IcebergDatasetTest {
private final List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles;
public IcebergSnapshotInfo asSnapshotInfo() {
- return asSnapshotInfo(0L, Instant.ofEpochMilli(0L));
+ return asSnapshotInfo(0L);
+ }
+
+ /** @param snapshotIdIndex used both as snapshot ID and, read as epoch milliseconds, as snapshot timestamp */
+ public IcebergSnapshotInfo asSnapshotInfo(long snapshotIdIndex) {
+ return asSnapshotInfo(snapshotIdIndex, Instant.ofEpochMilli(snapshotIdIndex));
}
public IcebergSnapshotInfo asSnapshotInfo(Long snapshotId, Instant timestamp) {
@@ -290,19 +442,26 @@ public class IcebergDatasetTest {
}
}
- private final List<SnapshotPaths> snapshotPathsList;
-
- public MockedIcebergTable(List<SnapshotPaths> snapshotPathsList) {
- super(null, null);
- this.snapshotPathsList = Lists.newCopyOnWriteArrayList(snapshotPathsList);
+ public static IcebergTable withSnapshots(List<SnapshotPaths> snapshotPathSets) throws IOException { // mock table whose "current" snapshot is the last entry of `snapshotPathSets`
+ IcebergTable table = Mockito.mock(IcebergTable.class);
+ int lastIndex = snapshotPathSets.size() - 1; // -1 for an empty list, in which case the `get(lastIndex)` below would throw
+ Mockito.when(table.getCurrentSnapshotInfoOverviewOnly()).thenReturn(
+ snapshotPathSets.get(lastIndex).asSnapshotInfo(lastIndex));
+ // ADMISSION: this is strictly more analogous to `IcebergTable.getAllSnapshotInfosIterator()`, as it doesn't
+ // filter only the delta... nonetheless, it should work fine for the tests herein
+ Mockito.when(table.getIncrementalSnapshotInfosIterator()).thenReturn( // NOTE(review): one iterator instance is stubbed, so presumably only its first consumer sees elements -- confirm
+ IndexingStreams.transformWithIndex(snapshotPathSets.stream(),
+ (pathSet, i) -> pathSet.asSnapshotInfo(i)).iterator()); // list position `i` becomes the snapshot ID index
+ return table;
}
+ }
- @Override
- public Iterator<IcebergSnapshotInfo> getAllSnapshotInfosIterator() {
- List<IcebergSnapshotInfo> snapshotInfos = snapshotPathsList.stream()
- .map(SnapshotPaths::asSnapshotInfo)
- .collect(Collectors.toList());
- return snapshotInfos.iterator();
+ public static class IndexingStreams {
+ /** @return {@link Stream} equivalent of `inputs.zipWithIndex.map(f)` in scala: `f` applied to each input paired with its index, counting from 0 */
+ public static <T, R> Stream<R> transformWithIndex(Stream<T> inputs, BiFunction<T, Integer, R> f) {
+ // given sketchy import, sequester for now within enclosing test class, rather than adding to `gobblin-utility`
+ return org.apache.iceberg.relocated.com.google.common.collect.Streams.zip(
+ inputs, IntStream.iterate(0, i -> i + 1).boxed(), f); // unbounded index stream; presumably `zip` ends together with `inputs` -- confirm
}
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index 9b5c40362..613dc7f69 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -123,7 +123,7 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
try {
String storeNamePrefix = KafkaJobStatusMonitor.jobStatusStoreName(flowGroup, "");
List<String> storeNamesForFlowGroup = stateStore.getStoreNames(storeName -> storeName.startsWith(storeNamePrefix));
- List<State> flowGroupExecutionsStates = storeNamesForFlowGroup.stream().flatMap(CheckedExceptionFunction.wrapUnchecked(storeName ->
+ List<State> flowGroupExecutionsStates = storeNamesForFlowGroup.stream().flatMap(CheckedExceptionFunction.wrapToUnchecked(storeName ->
stateStore.getAll(storeName).stream()
)).collect(Collectors.toList());
return asFlowStatuses(groupByFlowExecutionAndRetainLatest(flowGroup, flowGroupExecutionsStates, countJobStatusesPerFlowName));
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java
index 96c6ab45f..115a43d35 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java
@@ -17,18 +17,40 @@
package org.apache.gobblin.util.function;
+import java.io.IOException;
import java.util.function.Function;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
/**
- * Alternative to {@link java.util.function.Function} that handles wrapping one checked `Exception`.
+ * Alternative to {@link Function} that handles wrapping (or tunneling) a single checked {@link Exception} derived class.
* Inspired by: https://dzone.com/articles/how-to-handle-checked-exception-in-lambda-expressi
*/
@FunctionalInterface
public interface CheckedExceptionFunction<T, R, E extends Exception> {
+ /**
+ * Wrapper to tunnel {@link IOException} as an unchecked exception that would later be unwrapped via
+ * {@link WrappedIOException#rethrowWrapped()}. If no expectation of unwrapping, this wrapper may simply add
+ * unnecessary obfuscation: instead use {@link CheckedExceptionFunction#wrapToUnchecked(CheckedExceptionFunction)}
+ * NOTE(review): the lombok-generated constructor presumably only assigns the field, leaving `getCause()` unset -- confirm
+ * BUMMER: specific {@link IOException} hard-coded because: "generic class may not extend 'java.lang.Throwable'"
+ */
+ @RequiredArgsConstructor
+ public static class WrappedIOException extends RuntimeException {
+ @Getter
+ private final IOException wrappedException;
+
+ /** CAUTION: if this be your intent, DO NOT FORGET! Being unchecked, the compiler WILL NOT remind you. */
+ public void rethrowWrapped() throws IOException {
+ throw wrappedException; // always throws: the original checked exception resurfaces as-is
+ }
+ }
+
R apply(T arg) throws E;
- /** @return a `Function` that will invoke {@link #apply} and catch any instance of Exception, `E`, rethrowing it wrapped as {@link RuntimeException}. */
- static <T, R, E extends Exception> Function<T, R> wrapUnchecked(CheckedExceptionFunction<T, R, E> f) {
+ /** @return {@link Function} proxy that catches any instance of {@link Exception} and rethrows it wrapped as {@link RuntimeException} */
+ static <T, R, E extends Exception> Function<T, R> wrapToUnchecked(CheckedExceptionFunction<T, R, E> f) {
return a -> {
try {
return f.apply(a);
@@ -39,4 +61,20 @@ public interface CheckedExceptionFunction<T, R, E extends Exception> {
}
};
}
+
+ /**
+ * @return {@link Function} proxy that catches any instance of {@link IOException}, and rethrows it wrapped as {@link WrappedIOException},
+ * for easy unwrapping via {@link WrappedIOException#rethrowWrapped()}; any {@link RuntimeException} thrown by `f` propagates unchanged
+ */
+ static <T, R, E extends IOException> Function<T, R> wrapToTunneled(CheckedExceptionFunction<T, R, E> f) {
+ return a -> {
+ try {
+ return f.apply(a);
+ } catch (RuntimeException re) {
+ throw re; // no double wrapping
+ } catch (IOException ioe) {
+ throw new WrappedIOException(ioe); // tunnel the checked exception through the `Function` interface
+ }
+ };
+ }
}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/measurement/GrowthMilestoneTracker.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/measurement/GrowthMilestoneTracker.java
new file mode 100644
index 000000000..99dbd82fb
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/measurement/GrowthMilestoneTracker.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.measurement;
+
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.stream.LongStream;
+
+
+/**
+ * Stateful class to track growth/accumulation/"high watermark" against milestones.
+ * No synchronization is performed: a successful check advances the internal milestone cursor.
+ */
+public class GrowthMilestoneTracker {
+  // NOTE: `createMilestoneSequence()` runs during field initialization, i.e. before any subclass field is assigned
+  private final Iterator<Long> milestoneSequence = createMilestoneSequence();
+  private long nextMilestone = advanceMilestone();
+
+  /** @return whether `n >=` the next monotonically increasing milestone (with no effort to handle wrap-around) */
+  public final boolean isAnotherMilestone(long n) {
+    return this.calcLargestNewMilestone(n).isPresent();
+  }
+
+  /**
+   * Advance past every milestone that `n` has reached.
+   * @param n the current measurement to compare against the pending milestone
+   * @return largest monotonically increasing milestone iff `n >=` some new one (no effort to handle wrap-around)
+   */
+  public final Optional<Long> calcLargestNewMilestone(long n) {
+    if (n < this.nextMilestone) {
+      return Optional.empty();
+    }
+    long largestMilestoneAchieved;
+    do {
+      largestMilestoneAchieved = this.nextMilestone;
+      this.nextMilestone = advanceMilestone();
+      // stop at the `Long.MAX_VALUE` stand-in: otherwise `n == Long.MAX_VALUE` would spin forever on an exhausted sequence
+    } while (this.nextMilestone != Long.MAX_VALUE && n >= this.nextMilestone);
+    return Optional.of(largestMilestoneAchieved);
+  }
+
+  /** @return the next milestone of the sequence, else {@link Long#MAX_VALUE} when it is (or already was) exhausted */
+  private long advanceMilestone() {
+    return this.milestoneSequence.hasNext() ? this.milestoneSequence.next() : Long.MAX_VALUE;
+  }
+
+  /**
+   * @return positive monotonically increasing milestones, for {@link GrowthMilestoneTracker#isAnotherMilestone(long)}
+   * to track against; if/whenever exhausted, {@link Long#MAX_VALUE} becomes stand-in thereafter
+   * DEFAULT SEQ: [1, 10, 100, 1000, 10k, 15k, 20k, 25k, 30k, ..., 50k, 75k, 100k, 125k, ..., 250k, 300k, 350k, ... )
+   */
+  protected Iterator<Long> createMilestoneSequence() {
+    // spelled out, since `iterate(1, i -> i * 10).limit(log10(1000))` stops at 100 and so skipped the documented 1000
+    LongStream initially = LongStream.of(1L, 10L, 100L, 1000L);
+    LongStream next = LongStream.rangeClosed(2L, 9L).map(i -> i * 5000); // 10k - 45k
+    LongStream then = LongStream.rangeClosed(2L, 9L).map(i -> i * 25000); // 50k - 225k
+    LongStream thereafter = LongStream.iterate(250000L, i -> i + 50000);
+    return
+        LongStream.concat(initially,
+            LongStream.concat(next,
+                LongStream.concat(then, thereafter))
+        ).iterator();
+  }
+}