You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/09/27 01:40:02 UTC
[gobblin] branch master updated: [GOBBLIN-1707] Update `IcebergDataset` to incorporate all snapshots, not only the current one (#3569)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e385ea46b [GOBBLIN-1707] Update `IcebergDataset` to incorporate all snapshots, not only the current one (#3569)
e385ea46b is described below
commit e385ea46b078175c443d10e8c97dd1840d5f46d8
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Mon Sep 26 18:39:56 2022 -0700
[GOBBLIN-1707] Update `IcebergDataset` to incorporate all snapshots, not only the current one (#3569)
* Add `IcebergTableTest` unit test
* Fixup comment and indentation
* Minor correction of `Long` => `Integer`
* Correct comment
* [GOBBLIN-1711] Replace Jcenter with maven central (#3566)
* Minor rename of local var
* Extend `IcebergTable` to collect Iceberg metadata across all snapshots
* [GOBBLIN-1697] Have a separate resource handler to rely on CDC stream to do message forwarding (#3549)
* address comments
* use connection manager when httpclient is not closeable
* fix test case to test orchestrator as one listener of flow spec
* remove unintentional change
* [GOBBLIN-1697]Have a separate resource handler to rely on CDC stream to do message forwarding
* fix compilation error
* address comments
* address comments
* address comments
* update outdated javadoc
Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
* [GOBBLIN-1709] Create Iceberg Datasets Finder, Iceberg Dataset and FileSet to generate Copy Entities to support Distcp for Iceberg (#3560)
* initial commit for iceberg distcp.
* adding copy entity helper and iceberg distcp template and test case.
* Adding unit tests and refactoring method definitions for an Iceberg dataset.
* resolve conflicts after cleaning history
* update iceberg dataset and finder to include javadoc
* addressed comments on PR and aligned code check style
* renamed vars, added logging and updated javadoc
* update dataset descriptor with ternary operation and rename fs to sourceFs
* added source and target fs and update iceberg dataset finder constructor
* Update source and dest dataset methods as protected and add req args constructor
* change the order of attributes for iceberg dataset finder ctor
* update iceberg dataset methods with correct source and target fs
Co-authored-by: Meeth Gala <mg...@linkedin.com>
* Update `IcebergDataset` to use `IcebergTable.getIncrementalSnapshotInfosIterator` rather than `.getCurrentSnapshotInfo`
* Augment `IcebergDatasetTest` unit test to exercise multi-snapshot icebergs
* Minor javadoc update
* Throw `IcebergTable.TableNotFoundException` when no such table found
Co-authored-by: Matthew Ho <ho...@gmail.com>
Co-authored-by: Zihan Li <zi...@linkedin.com>
Co-authored-by: Zihan Li <zi...@zihli-mn2.linkedin.biz>
Co-authored-by: meethngala <me...@gmail.com>
Co-authored-by: Meeth Gala <mg...@linkedin.com>
---
.../copy/iceberg/IcebergCatalogFactory.java | 9 +-
.../management/copy/iceberg/IcebergDataset.java | 86 ++++++-----
.../copy/iceberg/IcebergHiveCatalog.java | 3 +-
.../copy/iceberg/IcebergSnapshotInfo.java | 9 +-
.../data/management/copy/iceberg/IcebergTable.java | 101 +++++++++++--
.../copy/iceberg/IcebergDatasetTest.java | 159 ++++++++++++++-------
.../management/copy/iceberg/IcebergTableTest.java | 95 +++++++++++-
7 files changed, 352 insertions(+), 110 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
index 43dff9fc6..a3e8464c3 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
@@ -18,7 +18,9 @@
package org.apache.gobblin.data.management.copy.iceberg;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.iceberg.hive.HiveCatalog;
+
+import com.google.common.collect.Maps;
/**
@@ -26,6 +28,9 @@ import org.apache.iceberg.hive.HiveCatalogs;
*/
public class IcebergCatalogFactory {
public static IcebergCatalog create(Configuration configuration) {
- return new IcebergHiveCatalog(HiveCatalogs.loadCatalog(configuration));
+ HiveCatalog hcat = new HiveCatalog();
+ hcat.setConf(configuration);
+ hcat.initialize("hive", Maps.newHashMap());
+ return new IcebergHiveCatalog(hcat);
}
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index 8d905c8e6..ae6e1aaea 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -24,6 +24,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
@@ -33,7 +34,6 @@ import org.apache.hadoop.fs.Path;
import org.jetbrains.annotations.NotNull;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -64,8 +64,8 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
protected final Properties properties;
protected final FileSystem sourceFs;
- private final Optional<String> sourceMetastoreURI;
- private final Optional<String> targetMetastoreURI;
+ private final Optional<URI> sourceCatalogMetastoreURI;
+ private final Optional<URI> targetCatalogMetastoreURI;
/** Target metastore URI */
public static final String TARGET_METASTORE_URI_KEY =
@@ -79,10 +79,8 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
this.icebergTable = icebergTbl;
this.properties = properties;
this.sourceFs = sourceFs;
- this.sourceMetastoreURI =
- Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY));
- this.targetMetastoreURI =
- Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
+ this.sourceCatalogMetastoreURI = getAsOptionalURI(this.properties, IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY);
+ this.targetCatalogMetastoreURI = getAsOptionalURI(this.properties, TARGET_METASTORE_URI_KEY);
}
/**
@@ -96,42 +94,49 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
@Override
public String datasetURN() {
- // TODO: verify!
- return this.dbName + "." + this.inputTableName;
+ return this.getFileSetId();
}
/**
* Finds all files read by the table and generates CopyableFiles.
- * For the specific semantics see {@link #getCopyEntities}.
+ * For the specific semantics see {@link #createFileSets}.
*/
@Override
public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration) {
- return getCopyEntities(targetFs, configuration);
+ return createFileSets(targetFs, configuration);
}
/**
* Finds all files read by the table and generates CopyableFiles.
- * For the specific semantics see {@link #getCopyEntities}.
+ * For the specific semantics see {@link #createFileSets}.
*/
@Override
public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration,
Comparator<FileSet<CopyEntity>> prioritizer, PushDownRequestor<FileSet<CopyEntity>> requestor) {
// TODO: Implement PushDownRequestor and priority based copy entity iteration
- return getCopyEntities(targetFs, configuration);
+ return createFileSets(targetFs, configuration);
+ }
+
+ /** @return unique ID for this dataset, usable as a {@link CopyEntity}.fileset, for atomic publication grouping */
+ protected String getFileSetId() {
+ return this.dbName + "." + this.inputTableName;
}
/**
- * Finds all files read by the table and generates {@link CopyEntity}s for duplicating the table.
+ * Generates {@link FileSet}s, being themselves able to generate {@link CopyEntity}s for all files, data and metadata,
+ * comprising the iceberg/table, so as to fully specify remaining table replication.
*/
- Iterator<FileSet<CopyEntity>> getCopyEntities(FileSystem targetFs, CopyConfiguration configuration) {
+ protected Iterator<FileSet<CopyEntity>> createFileSets(FileSystem targetFs, CopyConfiguration configuration) {
FileSet<CopyEntity> fileSet = new IcebergTableFileSet(this.getInputTableName(), this, targetFs, configuration);
- return Iterators.singletonIterator(fileSet); }
+ return Iterators.singletonIterator(fileSet);
+ }
/**
- * Finds all files read by the table file set and generates {@link CopyEntity}s for duplicating the table.
+ * Finds all files, data and metadata, as {@link CopyEntity}s that comprise the table and fully specify remaining
+ * table replication.
*/
@VisibleForTesting
Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration configuration) throws IOException {
- String fileSet = this.getInputTableName();
+ String fileSet = this.getFileSetId();
List<CopyEntity> copyEntities = Lists.newArrayList();
Map<Path, FileStatus> pathToFileStatus = getFilePathsToFileStatus();
log.info("{}.{} - found {} candidate source paths", dbName, inputTableName, pathToFileStatus.size());
@@ -177,36 +182,45 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
Map<Path, FileStatus> result = Maps.newHashMap();
IcebergTable icebergTable = this.getIcebergTable();
- IcebergSnapshotInfo icebergSnapshotInfo = icebergTable.getCurrentSnapshotInfo();
-
- log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
- icebergSnapshotInfo.getSnapshotId(), icebergSnapshotInfo.getMetadataPath());
- List<String> pathsToCopy = icebergSnapshotInfo.getAllPaths();
-
- for (String pathString : pathsToCopy) {
+ Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
+ Iterator<String> filePathsIterator = Iterators.concat(
+ Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
+ // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
+ log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
+ snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+ return snapshotInfo.getAllPaths().iterator();
+ })
+ );
+ Iterable<String> filePathsIterable = () -> filePathsIterator;
+ // TODO: investigate whether streaming initialization of `Map` preferable--`getFileStatus` network calls would
+ // likely benefit from parallelism
+ for (String pathString : filePathsIterable) {
Path path = new Path(pathString);
result.put(path, this.sourceFs.getFileStatus(path));
}
return result;
}
+ protected static Optional<URI> getAsOptionalURI(Properties props, String key) {
+ return Optional.ofNullable(props.getProperty(key)).map(URI::create);
+ }
+
protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
- return getDatasetDescriptor(sourceMetastoreURI, sourceFs);
+ return getDatasetDescriptor(sourceCatalogMetastoreURI, sourceFs);
}
protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
- return getDatasetDescriptor(targetMetastoreURI, targetFs);
+ return getDatasetDescriptor(targetCatalogMetastoreURI, targetFs);
}
@NotNull
- private DatasetDescriptor getDatasetDescriptor(Optional<String> stringMetastoreURI, FileSystem fs) {
- String currentTable = this.getDbName() + "." + this.getInputTableName();
-
- URI hiveMetastoreURI = stringMetastoreURI.isPresent() ? URI.create(stringMetastoreURI.get()) : null;
-
- DatasetDescriptor currentDataset =
- new DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, hiveMetastoreURI, currentTable);
- currentDataset.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
- return currentDataset;
+ private DatasetDescriptor getDatasetDescriptor(Optional<URI> catalogMetastoreURI, FileSystem fs) {
+ DatasetDescriptor descriptor = new DatasetDescriptor(
+ DatasetConstants.PLATFORM_ICEBERG,
+ catalogMetastoreURI.orElse(null),
+ this.getFileSetId()
+ );
+ descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
+ return descriptor;
}
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
index d8ffdb799..0af012ec5 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
@@ -35,6 +35,7 @@ public class IcebergHiveCatalog implements IcebergCatalog {
@Override
public IcebergTable openTable(String dbName, String tableName) {
- return new IcebergTable(hc.newTableOps(TableIdentifier.of(dbName, tableName)));
+ TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
+ return new IcebergTable(tableId, hc.newTableOps(tableId));
}
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
index c51c1a27d..b4214017f 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
@@ -19,8 +19,10 @@ package org.apache.gobblin.data.management.copy.iceberg;
import java.time.Instant;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
+import lombok.Builder;
import lombok.Data;
import com.google.common.collect.Lists;
@@ -29,6 +31,7 @@ import com.google.common.collect.Lists;
/**
* Information about the metadata file and data file paths of a single Iceberg Snapshot.
*/
+@Builder(toBuilder = true)
@Data
public class IcebergSnapshotInfo {
@@ -40,7 +43,8 @@ public class IcebergSnapshotInfo {
private final Long snapshotId;
private final Instant timestamp;
- private final String metadataPath;
+ /** only for the current snapshot, being whom the metadata file 'belongs to'; `isEmpty()` for all other snapshots */
+ private final Optional<String> metadataPath;
private final String manifestListPath;
private final List<ManifestFileInfo> manifestFiles;
@@ -53,7 +57,8 @@ public class IcebergSnapshotInfo {
}
public List<String> getAllPaths() {
- List<String> result = Lists.newArrayList(metadataPath, manifestListPath);
+ List<String> result = metadataPath.map(Lists::newArrayList).orElse(Lists.newArrayList());
+ result.add(manifestListPath);
result.addAll(getManifestFilePaths());
result.addAll(getAllDataFilePaths());
return result;
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index f6ff42698..a9fbe05cc 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -19,9 +19,14 @@ package org.apache.gobblin.data.management.copy.iceberg;
import java.io.IOException;
import java.time.Instant;
+import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.iceberg.ManifestFile;
@@ -29,11 +34,13 @@ import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import static org.apache.gobblin.data.management.copy.iceberg.IcebergSnapshotInfo.ManifestFileInfo;
@@ -44,24 +51,98 @@ import static org.apache.gobblin.data.management.copy.iceberg.IcebergSnapshotInf
@Slf4j
@AllArgsConstructor
public class IcebergTable {
+
+ /** Indicate the table identified by `tableId` does not (or does no longer) exist in the catalog */
+ public static class TableNotFoundException extends IOException {
+ @Getter
+ private final TableIdentifier tableId; // stored purely for logging / diagnostics
+
+ public TableNotFoundException(TableIdentifier tableId) {
+ super("Not found: '" + tableId + "'");
+ this.tableId = tableId;
+ }
+ }
+
+ private final TableIdentifier tableId;
private final TableOperations tableOps;
+ /** @return metadata info limited to the most recent (current) snapshot */
public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException {
- TableMetadata current = tableOps.current();
- Snapshot snapshot = current.currentSnapshot();
+ TableMetadata current = accessTableMetadata();
+ return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()));
+ }
+
+ /** @return metadata info for all known snapshots, ordered historically, with *most recent last* */
+ public Iterator<IcebergSnapshotInfo> getAllSnapshotInfosIterator() throws IOException {
+ TableMetadata current = accessTableMetadata();
+ long currentSnapshotId = current.currentSnapshot().snapshotId();
+ List<Snapshot> snapshots = current.snapshots();
+ return Iterators.transform(snapshots.iterator(), snapshot -> {
+ try {
+ return IcebergTable.this.createSnapshotInfo(
+ snapshot,
+ currentSnapshotId == snapshot.snapshotId() ? Optional.of(current.metadataFileLocation()) : Optional.empty()
+ );
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ /**
+ * @return metadata info for all known snapshots, but incrementally, so content overlap between snapshots appears
+ * only within the first as they're ordered historically, with *most recent last*
+ */
+ public Iterator<IcebergSnapshotInfo> getIncrementalSnapshotInfosIterator() throws IOException {
+ // TODO: investigate using `.addedFiles()`, `.deletedFiles()` to calc this
+ Set<String> knownManifestListFilePaths = Sets.newHashSet();
+ Set<String> knownManifestFilePaths = Sets.newHashSet();
+ Set<String> knownListedFilePaths = Sets.newHashSet();
+ return Iterators.filter(Iterators.transform(getAllSnapshotInfosIterator(), snapshotInfo -> {
+ if (false == knownManifestListFilePaths.add(snapshotInfo.getManifestListPath())) { // already known manifest list!
+ return snapshotInfo.toBuilder().manifestListPath(null).build(); // use `null` as marker to surrounding `filter`
+ }
+ List<IcebergSnapshotInfo.ManifestFileInfo> novelManifestInfos = Lists.newArrayList();
+ for (ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
+ if (true == knownManifestFilePaths.add(mfi.getManifestFilePath())) { // heretofore unknown
+ List<String> novelListedPaths = mfi.getListedFilePaths().stream()
+ .filter(fpath -> true == knownListedFilePaths.add(fpath)) // heretofore unknown
+ .collect(Collectors.toList());
+ if (novelListedPaths.size() == mfi.getListedFilePaths().size()) { // nothing filtered
+ novelManifestInfos.add(mfi); // reuse orig
+ } else {
+ novelManifestInfos.add(new ManifestFileInfo(mfi.getManifestFilePath(), novelListedPaths));
+ }
+ } // else, whenever recognized manifest file, skip w/ all its listed paths--which also all would be recognized
+ }
+ if (novelManifestInfos.size() == snapshotInfo.getManifestFiles().size()) { // nothing filtered
+ return snapshotInfo; // reuse orig
+ } else {
+ return snapshotInfo.toBuilder().manifestFiles(novelManifestInfos).build(); // replace manifestFiles
+ }
+ }), snapshotInfo -> snapshotInfo.getManifestListPath() != null); // remove marked-as-repeat-manifest-list snapshots
+ }
+
+ /** @throws {@link IcebergTable.TableNotFoundException} when table does not exist */
+ protected TableMetadata accessTableMetadata() throws TableNotFoundException {
+ TableMetadata current = this.tableOps.current();
+ return Optional.ofNullable(current).orElseThrow(() -> new TableNotFoundException(this.tableId));
+ }
+
+ protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot, Optional<String> metadataFileLocation) throws IOException {
+ // TODO: verify correctness, even when handling 'delete manifests'!
List<ManifestFile> manifests = snapshot.allManifests();
return new IcebergSnapshotInfo(
snapshot.snapshotId(),
Instant.ofEpochMilli(snapshot.timestampMillis()),
- current.metadataFileLocation(),
+ metadataFileLocation,
snapshot.manifestListLocation(),
// NOTE: unable to `.stream().map(m -> calcManifestFileInfo(m, tableOps.io()))` due to checked exception
- calcAllManifestFileInfo(manifests, tableOps.io())
+ calcAllManifestFileInfos(manifests, tableOps.io())
);
}
- @VisibleForTesting
- static List<ManifestFileInfo> calcAllManifestFileInfo(List<ManifestFile> manifests, FileIO io) throws IOException {
+ protected static List<IcebergSnapshotInfo.ManifestFileInfo> calcAllManifestFileInfos(List<ManifestFile> manifests, FileIO io) throws IOException {
List<ManifestFileInfo> result = Lists.newArrayList();
for (ManifestFile manifest : manifests) {
result.add(calcManifestFileInfo(manifest, io));
@@ -69,13 +150,11 @@ public class IcebergTable {
return result;
}
- @VisibleForTesting
- static IcebergSnapshotInfo.ManifestFileInfo calcManifestFileInfo(ManifestFile manifest, FileIO io) throws IOException {
+ protected static IcebergSnapshotInfo.ManifestFileInfo calcManifestFileInfo(ManifestFile manifest, FileIO io) throws IOException {
return new ManifestFileInfo(manifest.path(), discoverDataFilePaths(manifest, io));
}
- @VisibleForTesting
- static List<String> discoverDataFilePaths(ManifestFile manifest, FileIO io) throws IOException {
+ protected static List<String> discoverDataFilePaths(ManifestFile manifest, FileIO io) throws IOException {
CloseableIterable<String> manifestPathsIterable = ManifestFiles.readPaths(manifest, io);
try {
return Lists.newArrayList(manifestPathsIterable);
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index f409ee67a..0f7fd491d 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -24,19 +24,26 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
+import java.util.stream.Collectors;
+
+import lombok.Data;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+
import org.mockito.Mockito;
import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.api.client.util.Maps;
+import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
@@ -46,31 +53,57 @@ import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.PreserveAttributes;
-public class
-IcebergDatasetTest {
-
- static final String METADATA_PATH = "/root/iceberg/test/metadata";
- static final String MANIFEST_PATH = "/root/iceberg/test/metadata/test_manifest";
- static final String MANIFEST_LIST_PATH = "/root/iceberg/test/metadata/test_manifest/data";
- static final String MANIFEST_FILE_PATH1 = "/root/iceberg/test/metadata/test_manifest/data/a";
- static final String MANIFEST_FILE_PATH2 = "/root/iceberg/test/metadata/test_manifest/data/b";
+public class IcebergDatasetTest {
+
+ private static final String ROOT_PATH = "/root/iceberg/test/";
+ private static final String METADATA_PATH = ROOT_PATH + "metadata/metadata.json";
+ private static final String MANIFEST_LIST_PATH_0 = ROOT_PATH + "metadata/manifest_list.x";
+ private static final String MANIFEST_PATH_0 = ROOT_PATH + "metadata/manifest.a";
+ private static final String MANIFEST_DATA_PATH_0A = ROOT_PATH + "data/p0/a";
+ private static final String MANIFEST_DATA_PATH_0B = ROOT_PATH + "data/p0/b";
+ private static final MockedIcebergTable.SnapshotPaths SNAPSHOT_PATHS_0 = new MockedIcebergTable.SnapshotPaths(
+ Optional.of(METADATA_PATH), MANIFEST_LIST_PATH_0,
+ Arrays.asList(new IcebergSnapshotInfo.ManifestFileInfo(
+ MANIFEST_PATH_0, Arrays.asList(MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B)))
+ );
+ private static final String MANIFEST_LIST_PATH_1 = MANIFEST_LIST_PATH_0.replaceAll("\\.x$", ".y");
+ private static final String MANIFEST_PATH_1 = MANIFEST_PATH_0.replaceAll("\\.a$", ".b");
+ private static final String MANIFEST_DATA_PATH_1A = MANIFEST_DATA_PATH_0A.replaceAll("/p0/", "/p1/");
+ private static final String MANIFEST_DATA_PATH_1B = MANIFEST_DATA_PATH_0B.replaceAll("/p0/", "/p1/");
+ private static final MockedIcebergTable.SnapshotPaths SNAPSHOT_PATHS_1 = new MockedIcebergTable.SnapshotPaths(
+ Optional.empty(), MANIFEST_LIST_PATH_1,
+ Arrays.asList(new IcebergSnapshotInfo.ManifestFileInfo(
+ MANIFEST_PATH_1, Arrays.asList(MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B)))
+ );
+
+ private final String test_db_name = "test_db_name";
+ private final String test_table_name = "test_tbl_name";
+ private final String test_qualified_path = "/root/iceberg/test/destination/sub_path_destination";
+ private final String test_uri_path = "/root/iceberg/test/output";
+ private final Properties properties = new Properties();
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ properties.setProperty("data.publisher.final.dir", "/test");
+ }
@Test
public void testGetFilePaths() throws IOException {
- List<String> pathsToCopy = new ArrayList<>();
- pathsToCopy.add(MANIFEST_FILE_PATH1);
- pathsToCopy.add(MANIFEST_FILE_PATH2);
+ List<String> pathsToCopy = Lists.newArrayList(MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B);
Map<Path, FileStatus> expected = Maps.newHashMap();
- expected.put(new Path(MANIFEST_FILE_PATH1), null);
- expected.put(new Path(MANIFEST_FILE_PATH2), null);
+ expected.put(new Path(MANIFEST_DATA_PATH_0A), null);
+ expected.put(new Path(MANIFEST_DATA_PATH_0B), null);
IcebergTable icebergTable = Mockito.mock(IcebergTable.class);
FileSystem fs = Mockito.mock(FileSystem.class);
IcebergSnapshotInfo icebergSnapshotInfo = Mockito.mock(IcebergSnapshotInfo.class);
- Mockito.when(icebergTable.getCurrentSnapshotInfo()).thenReturn(icebergSnapshotInfo);
+ Mockito.when(icebergTable.getIncrementalSnapshotInfosIterator()).thenReturn(Arrays.asList(icebergSnapshotInfo).iterator());
Mockito.when(icebergSnapshotInfo.getAllPaths()).thenReturn(pathsToCopy);
+ Mockito.when(icebergSnapshotInfo.getSnapshotId()).thenReturn(98765L);
+ Mockito.when(icebergSnapshotInfo.getMetadataPath()).thenReturn(Optional.of("path for log message"));
+
IcebergDataset icebergDataset = new IcebergDataset("test_db_name", "test_tbl_name", icebergTable, new Properties(), fs);
Map<Path, FileStatus> actual = icebergDataset.getFilePathsToFileStatus();
@@ -84,52 +117,61 @@ IcebergDatasetTest {
*/
@Test
public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException, URISyntaxException {
+ List<String> expectedPaths = Arrays.asList(METADATA_PATH, MANIFEST_LIST_PATH_0,
+ MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B);
FileSystem fs = Mockito.mock(FileSystem.class);
- String test_db_name = "test_db_name";
- String test_table_name = "test_tbl_name";
- String test_qualified_path = "/root/iceberg/test/destination/sub_path_destination";
- String test_uri_path = "/root/iceberg/test/output";
- Properties properties = new Properties();
- properties.setProperty("data.publisher.final.dir", "/test");
- List<String> expected = new ArrayList<>(Arrays.asList(METADATA_PATH, MANIFEST_PATH, MANIFEST_LIST_PATH, MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2));
+ IcebergTable icebergTable = new MockedIcebergTable(Arrays.asList(SNAPSHOT_PATHS_0));
+ IcebergDataset icebergDataset = new IcebergDataset(test_db_name, test_table_name, icebergTable, new Properties(), fs);
+ DestinationFileSystem destinationFileSystem = new DestinationFileSystem();
+ destinationFileSystem.addPaths(expectedPaths);
+
+ mockFileSystemMethodCalls(fs, destinationFileSystem.pathToFileStatus, test_qualified_path, test_uri_path);
CopyConfiguration copyConfiguration = CopyConfiguration.builder(null, properties)
.preserve(PreserveAttributes.fromMnemonicString(""))
.copyContext(new CopyContext())
.build();
+ Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(fs, copyConfiguration);
+ verifyCopyEntities(copyEntities, expectedPaths);
+ }
+
+ /** Test generating copy entities for a multi-snapshot iceberg; given empty dest, src-dest delta will be entirety */
+ @Test
+ public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOException, URISyntaxException {
+ List<String> expectedPaths = Arrays.asList(METADATA_PATH,
+ MANIFEST_LIST_PATH_0, MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, MANIFEST_DATA_PATH_0B,
+ MANIFEST_LIST_PATH_1, MANIFEST_PATH_1, MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B);
- List<String> listedManifestFilePaths = Arrays.asList(MANIFEST_FILE_PATH1, MANIFEST_FILE_PATH2);
- IcebergSnapshotInfo.ManifestFileInfo manifestFileInfo = new IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_LIST_PATH, listedManifestFilePaths);
- List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles = Arrays.asList(manifestFileInfo);
- IcebergTable icebergTable = new MockedIcebergTable(METADATA_PATH, MANIFEST_PATH, manifestFiles);
+ FileSystem fs = Mockito.mock(FileSystem.class);
+ IcebergTable icebergTable = new MockedIcebergTable(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0));
IcebergDataset icebergDataset = new IcebergDataset(test_db_name, test_table_name, icebergTable, new Properties(), fs);
DestinationFileSystem destinationFileSystem = new DestinationFileSystem();
- destinationFileSystem.addPath(METADATA_PATH);
- destinationFileSystem.addPath(MANIFEST_PATH);
- destinationFileSystem.addPath(MANIFEST_LIST_PATH);
- destinationFileSystem.addPath(MANIFEST_FILE_PATH1);
- destinationFileSystem.addPath(MANIFEST_FILE_PATH2);
+ destinationFileSystem.addPaths(expectedPaths);
mockFileSystemMethodCalls(fs, destinationFileSystem.pathToFileStatus, test_qualified_path, test_uri_path);
+ CopyConfiguration copyConfiguration = CopyConfiguration.builder(null, properties)
+ .preserve(PreserveAttributes.fromMnemonicString(""))
+ .copyContext(new CopyContext())
+ .build();
Collection<CopyEntity> copyEntities = icebergDataset.generateCopyEntities(fs, copyConfiguration);
- verifyCopyEntities(copyEntities, expected);
-
+ verifyCopyEntities(copyEntities, expectedPaths);
}
private void verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expected) {
List<String> actual = new ArrayList<>();
for (CopyEntity copyEntity : copyEntities) {
String json = copyEntity.toString();
- JsonObject jsonObject = new Gson().fromJson(json, JsonObject.class);
- JsonObject objectData =
- jsonObject.getAsJsonObject("object-data").getAsJsonObject("origin").getAsJsonObject("object-data");
- JsonObject pathObject = objectData.getAsJsonObject("path").getAsJsonObject("object-data").getAsJsonObject("uri");
- String filepath = pathObject.getAsJsonPrimitive("object-data").getAsString();
+ String filepath = new Gson().fromJson(json, JsonObject.class) // descend the serialized entity: origin -> path -> uri string
+ .getAsJsonObject("object-data").getAsJsonObject("origin")
+ .getAsJsonObject("object-data").getAsJsonObject("path")
+ .getAsJsonObject("object-data").getAsJsonObject("uri")
+ .getAsJsonPrimitive("object-data").getAsString();
actual.add(filepath);
}
- Assert.assertEquals(actual.size(), expected.size());
+ Assert.assertEquals(actual.size(), expected.size(), // on a size mismatch, print both path lists in full
+ "actual paths " + actual + " vs. expected paths " + expected);
Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray());
}
@@ -147,22 +189,32 @@ IcebergDatasetTest {
private static class MockedIcebergTable extends IcebergTable {
- String metadataPath;
- String manifestListPath;
- List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles;
+ @Data
+ public static class SnapshotPaths { // the paths from which the mock builds one IcebergSnapshotInfo
+ private final Optional<String> metadataPath;
+ private final String manifestListPath;
+ private final List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles;
+ }
+
+ private final List<SnapshotPaths> snapshotPathsList;
- public MockedIcebergTable(String metadataPath, String manifestListPath, List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles) {
- super(null);
- this.metadataPath = metadataPath;
- this.manifestListPath = manifestListPath;
- this.manifestFiles = manifestFiles;
+ public MockedIcebergTable(List<SnapshotPaths> snapshotPathsList) {
+ super(null, null);
+ this.snapshotPathsList = Lists.newCopyOnWriteArrayList(snapshotPathsList); // copy, so later edits to the caller's list don't leak in
}
@Override
- public IcebergSnapshotInfo getCurrentSnapshotInfo() {
+ public Iterator<IcebergSnapshotInfo> getAllSnapshotInfosIterator() {
Long snapshotId = 0L;
Instant timestamp = Instant.ofEpochMilli(0L);
- return new IcebergSnapshotInfo(snapshotId, timestamp, metadataPath, manifestListPath, manifestFiles);
+ List<IcebergSnapshotInfo> infos = snapshotPathsList.stream() // one info per SnapshotPaths, in list order, all sharing the id/timestamp above
+ .map(paths -> createSnapshotInfo(paths, snapshotId, timestamp))
+ .collect(Collectors.toList());
+ return infos.iterator();
+ }
+
+ private IcebergSnapshotInfo createSnapshotInfo(SnapshotPaths paths, Long snapshotId, Instant timestamp) {
+ return new IcebergSnapshotInfo(snapshotId, timestamp, paths.getMetadataPath(), paths.getManifestListPath(), paths.getManifestFiles());
}
}
@@ -173,10 +225,13 @@ IcebergDatasetTest {
this.pathToFileStatus = Maps.newHashMap();
}
- public void addPath(String pathString) {
- if (StringUtils.isBlank(pathString)) {
- throw new IllegalArgumentException("Missing path value for the file system");
+ public void addPaths(List<String> pathStrings) { // registers each path in turn via addPath
+ for (String p : pathStrings) {
+ this.addPath(p);
}
+ }
+
+ public void addPath(String pathString) {
Path path = new Path(pathString);
FileStatus fileStatus = new FileStatus();
fileStatus.setPath(path);
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index 9cf62ca98..fd0b3cf35 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
@@ -100,12 +101,93 @@ public class IcebergTableTest extends HiveMetastoreTest {
);
initializeSnapshots(table, perSnapshotFilesets);
- IcebergSnapshotInfo snapshotInfo = new IcebergTable(catalog.newTableOps(tableId)).getCurrentSnapshotInfo();
+ IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId)).getCurrentSnapshotInfo();
+ verifySnapshotInfo(snapshotInfo, perSnapshotFilesets, perSnapshotFilesets.size());
+ }
+
+ /** Verify that asking for the current snapshot info of a non-existent table throws {@link IcebergTable.TableNotFoundException} */
+ @Test(expectedExceptions = IcebergTable.TableNotFoundException.class)
+ public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException {
+ TableIdentifier bogusTableId = TableIdentifier.of(dbName, tableName + "_BOGUS");
+ new IcebergTable(bogusTableId, catalog.newTableOps(bogusTableId)).getCurrentSnapshotInfo(); // result unused: only the exception matters
+ Assert.fail("expected an exception when using table ID '" + bogusTableId + "'");
+ }
+
+ /** Verify that the full (cumulative) info for snapshot i covers the data files of filesets 0..i */
+ @Test
+ public void testGetAllSnapshotInfosIterator() throws IOException {
+ List<List<String>> perSnapshotFilesets = Lists.newArrayList(
+ Lists.newArrayList("/path/to/data-a0.orc"),
+ Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc"),
+ Lists.newArrayList("/path/to/data-c0.orc", "/path/to/data-c1.orc", "/path/to/data-c2.orc"),
+ Lists.newArrayList("/path/to/data-d0.orc")
+ );
+ initializeSnapshots(table, perSnapshotFilesets);
+ List<IcebergSnapshotInfo> allInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId)).getAllSnapshotInfosIterator());
+ Assert.assertEquals(allInfos.size(), perSnapshotFilesets.size(), "num snapshots");
+ int numSnapshots = allInfos.size();
+ for (int idx = 0; idx < numSnapshots; idx++) {
+ System.err.println("verifying snapshotInfo[" + idx + "]");
+ verifySnapshotInfo(allInfos.get(idx), perSnapshotFilesets.subList(0, idx + 1), numSnapshots); // cumulative: filesets 0..idx
+ }
+ }
+
+ /** Verify that the incremental (delta) info for snapshot i covers only the data files of fileset i */
+ @Test
+ public void testGetIncrementalSnapshotInfosIterator() throws IOException {
+ List<List<String>> perSnapshotFilesets = Lists.newArrayList(
+ Lists.newArrayList("/path/to/data-a0.orc"),
+ Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc"),
+ Lists.newArrayList("/path/to/data-c0.orc", "/path/to/data-c1.orc", "/path/to/data-c2.orc"),
+ Lists.newArrayList("/path/to/data-d0.orc")
+ );
+
+ initializeSnapshots(table, perSnapshotFilesets);
+ List<IcebergSnapshotInfo> deltaInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId)).getIncrementalSnapshotInfosIterator());
+ Assert.assertEquals(deltaInfos.size(), perSnapshotFilesets.size(), "num snapshots");
+ int numSnapshots = deltaInfos.size();
+ for (int idx = 0; idx < numSnapshots; idx++) {
+ System.err.println("verifying snapshotInfo[" + idx + "]");
+ verifySnapshotInfo(deltaInfos.get(idx), perSnapshotFilesets.subList(idx, idx + 1), numSnapshots); // delta: fileset idx alone
+ }
+ }
+
+ /** Verify that incremental (delta) snapshot infos omit data files that an earlier snapshot already added */
+ @Test
+ public void testGetIncrementalSnapshotInfosIteratorRepeatedFiles() throws IOException {
+ List<List<String>> perSnapshotFilesets = Lists.newArrayList(
+ Lists.newArrayList("/path/to/data-a0.orc"),
+ Lists.newArrayList("/path/to/data-b0.orc", "/path/to/data-b1.orc", "/path/to/data-a0.orc"),
+ Lists.newArrayList("/path/to/data-a0.orc", "/path/to/data-c0.orc", "/path/to/data-b1.orc", "/path/to/data-c1.orc", "/path/to/data-c2.orc"),
+ Lists.newArrayList("/path/to/data-d0.orc")
+ );
+
+ initializeSnapshots(table, perSnapshotFilesets);
+ List<IcebergSnapshotInfo> deltaInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId)).getIncrementalSnapshotInfosIterator());
+ Assert.assertEquals(deltaInfos.size(), perSnapshotFilesets.size(), "num snapshots");
+ int numSnapshots = deltaInfos.size();
+ for (int idx = 0; idx < numSnapshots; idx++) {
+ System.err.println("verifying snapshotInfo[" + idx + "] - " + deltaInfos.get(idx));
+ char expectedLetter = (char) ('a' + idx);
+ // a delta should report only files first added by its own snapshot: keep names whose letter matches the snapshot ordinal
+ List<String> newlyAddedFiles = perSnapshotFilesets.get(idx).stream()
+ .filter(path -> path.substring("/path/to/data-".length()).startsWith(String.valueOf(expectedLetter)))
+ .collect(Collectors.toList());
+ // e.g. fileset 1 re-lists data-a0.orc, so only data-b0.orc and data-b1.orc remain expected for snapshot 1
+ verifySnapshotInfo(deltaInfos.get(idx), Arrays.asList(newlyAddedFiles), numSnapshots);
+ }
+ }
+
+ /** full validation for a particular {@link IcebergSnapshotInfo} */
+ protected void verifySnapshotInfo(IcebergSnapshotInfo snapshotInfo, List<List<String>> perSnapshotFilesets, int overallNumSnapshots) {
// verify metadata file
- Optional<File> optMetadataFile = extractSomeMetadataFilepath(snapshotInfo.getMetadataPath(), metadataBasePath, IcebergTableTest::doesResembleMetadataFilename);
- Assert.assertTrue(optMetadataFile.isPresent(), "has metadata filepath");
- verifyMetadataFile(optMetadataFile.get(), Optional.of(perSnapshotFilesets.size()));
+ snapshotInfo.getMetadataPath().ifPresent(metadataPath -> {
+ Optional<File> optMetadataFile = extractSomeMetadataFilepath(metadataPath, metadataBasePath, IcebergTableTest::doesResembleMetadataFilename);
+ Assert.assertTrue(optMetadataFile.isPresent(), "has metadata filepath");
+ verifyMetadataFile(optMetadataFile.get(), Optional.of(overallNumSnapshots));
+ }
+ );
// verify manifest list file
Optional<File> optManifestListFile = extractSomeMetadataFilepath(snapshotInfo.getManifestListPath(), metadataBasePath, IcebergTableTest::doesResembleManifestListFilename);
Assert.assertTrue(optManifestListFile.isPresent(), "has manifest list filepath");
@@ -115,7 +197,8 @@ public class IcebergTableTest extends HiveMetastoreTest {
verifyManifestFiles(manifestFileInfos, snapshotInfo.getManifestFilePaths(), perSnapshotFilesets);
verifyAnyOrder(snapshotInfo.getAllDataFilePaths(), flatten(perSnapshotFilesets), "data filepaths");
// verify all aforementioned paths collectively equal `getAllPaths()`
- List<String> allPathsExpected = Lists.newArrayList(snapshotInfo.getMetadataPath(), snapshotInfo.getManifestListPath());
+ List<String> allPathsExpected = Lists.newArrayList(snapshotInfo.getManifestListPath());
+ snapshotInfo.getMetadataPath().ifPresent(allPathsExpected::add);
allPathsExpected.addAll(snapshotInfo.getManifestFilePaths());
allPathsExpected.addAll(snapshotInfo.getAllDataFilePaths());
verifyAnyOrder(snapshotInfo.getAllPaths(), allPathsExpected, "all paths, metadata and data");
@@ -131,7 +214,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
return basePath;
}
- /** Add one snapshot per sub-list of `perSnapshotFilesets`, in order, with the sub-list contents as its data files */
+ /** Add one snapshot per sub-list of `perSnapshotFilesets`, in order, with the sub-list contents as the data files */
protected static void initializeSnapshots(Table table, List<List<String>> perSnapshotFilesets) {
for (List<String> snapshotFileset : perSnapshotFilesets) {
AppendFiles append = table.newAppend();