You are viewing a plain-text version of this content. The link to its canonical version (labelled "here" on the original archive page) is not preserved in plain text.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/03/02 20:46:19 UTC
[gobblin] branch master updated: [GOBBLIN-1786] Support Other Catalog Types for Iceberg Distcp (#3643)
This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9146e5e22 [GOBBLIN-1786] Support Other Catalog Types for Iceberg Distcp (#3643)
9146e5e22 is described below
commit 9146e5e221ce71adcce3180f7cb89a58ce9f8a01
Author: meethngala <me...@gmail.com>
AuthorDate: Thu Mar 2 12:46:12 2023 -0800
[GOBBLIN-1786] Support Other Catalog Types for Iceberg Distcp (#3643)
* refactor Iceberg distcp to support other catalog types
* refactor to incorporate generic properties for identifying the Iceberg source catalog
* update the catalog specifier to take a catalog name
* encapsulate source catalog URI information and update unit tests
* refactor classes to support multiple catalog types with a catalog specifier
* rename a variable and remove an unnecessary import
* refactor the dataset descriptor to be Iceberg-table specific
* remove unused imports
* associate the catalog URI with IcebergTable
* update the default value for the catalog URI
---------
Co-authored-by: Meeth Gala <mg...@linkedin.com>
---
.../copy/iceberg/BaseIcebergCatalog.java | 52 ++++++++++++++++++++++
.../management/copy/iceberg/IcebergCatalog.java | 5 +++
.../copy/iceberg/IcebergCatalogFactory.java | 20 +++++----
.../management/copy/iceberg/IcebergDataset.java | 42 +++++------------
.../copy/iceberg/IcebergDatasetFinder.java | 48 ++++++++++++++------
.../copy/iceberg/IcebergHiveCatalog.java | 34 ++++++++++----
.../data/management/copy/iceberg/IcebergTable.java | 15 +++++++
.../copy/iceberg/IcebergDatasetTest.java | 1 +
.../management/copy/iceberg/IcebergTableTest.java | 13 +++---
9 files changed, 162 insertions(+), 68 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
new file mode 100644
index 000000000..f3ef3309a
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/**
+ * Base implementation of {@link IcebergCatalog} to access {@link IcebergTable} and the
+ * underlying concrete companion catalog e.g. {@link org.apache.iceberg.hive.HiveCatalog}
+ *
+ * <p>Subclasses supply the per-table {@link TableOperations} via {@link #createTableOperations(TableIdentifier)}
+ * and, when they wrap a concrete Iceberg {@link Catalog}, can build it with
+ * {@link #createCompanionCatalog(Map, Configuration)}.
+ */
+public abstract class BaseIcebergCatalog implements IcebergCatalog {
+
+ /** Name handed to the companion catalog when it is loaded in {@link #createCompanionCatalog}. */
+ protected final String catalogName;
+ /** Concrete Iceberg {@link Catalog} implementation, loaded by class name in {@link #createCompanionCatalog}. */
+ protected final Class<? extends Catalog> companionCatalogClass;
+
+ /**
+ * @param catalogName name given to the companion catalog
+ * @param companionCatalogClass Iceberg {@link Catalog} implementation to load as the companion catalog
+ */
+ protected BaseIcebergCatalog(String catalogName, Class<? extends Catalog> companionCatalogClass) {
+ this.catalogName = catalogName;
+ this.companionCatalogClass = companionCatalogClass;
+ }
+
+ /**
+ * Opens {@code dbName.tableName}. The table is backed by this subclass's
+ * {@link #createTableOperations(TableIdentifier)} and tagged with {@link #getCatalogUri()}.
+ * NOTE(review): nothing here checks that {@link #initialize} has already run. For {@link IcebergHiveCatalog},
+ * an uninitialized instance would dereference a null companion catalog at this point.
+ */
+ @Override
+ public IcebergTable openTable(String dbName, String tableName) {
+ TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
+ return new IcebergTable(tableId, createTableOperations(tableId), this.getCatalogUri());
+ }
+
+ /**
+ * Loads {@link #companionCatalogClass} by class name through Iceberg's {@link CatalogUtil#loadCatalog}.
+ * It passes {@link #catalogName}, the given catalog properties and the Hadoop configuration.
+ * NOTE(review): per Iceberg's docs, loadCatalog needs a public no-arg constructor and initializes the catalog
+ * with {@code properties}. Confirm this against the Iceberg version in use.
+ */
+ protected Catalog createCompanionCatalog(Map<String, String> properties, Configuration configuration) {
+ return CatalogUtil.loadCatalog(this.companionCatalogClass.getName(), this.catalogName, properties, configuration);
+ }
+
+ /** Supplies the {@link TableOperations} backing the {@link IcebergTable} returned by {@link #openTable}. */
+ protected abstract TableOperations createTableOperations(TableIdentifier tableId);
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
index 70423e6a8..15211c8b5 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
@@ -17,10 +17,15 @@
package org.apache.gobblin.data.management.copy.iceberg;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Any catalog from which to access {@link IcebergTable}s.
*/
public interface IcebergCatalog {
+ /** Opens the Iceberg table {@code dbName.tableName} from this catalog. */
IcebergTable openTable(String dbName, String tableName);
+ /** @return URI identifying this catalog; {@link BaseIcebergCatalog#openTable} attaches it to each table it opens. */
+ String getCatalogUri();
+ /**
+ * Prepares this catalog for use. {@link IcebergCatalogFactory#create} calls it right after reflective construction.
+ * Callers should not use the catalog before this returns: {@link IcebergHiveCatalog}, for one, has no companion
+ * catalog until then.
+ * @param properties catalog properties, e.g. {@code CatalogProperties.URI} as set by {@code IcebergDatasetFinder}
+ * @param configuration Hadoop configuration for the catalog
+ */
+ void initialize(Map<String, String> properties, Configuration configuration);
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
index a3e8464c3..e26140214 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
@@ -17,20 +17,24 @@
package org.apache.gobblin.data.management.copy.iceberg;
+import java.io.IOException;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.hive.HiveCatalog;
-
-import com.google.common.collect.Maps;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
/**
* Provides an {@link IcebergCatalog}.
*/
public class IcebergCatalogFactory {
- public static IcebergCatalog create(Configuration configuration) {
- HiveCatalog hcat = new HiveCatalog();
- hcat.setConf(configuration);
- hcat.initialize("hive", Maps.newHashMap());
- return new IcebergHiveCatalog(hcat);
+ /**
+ * Reflectively creates the {@link IcebergCatalog} named by {@code icebergCatalogClassName}, then calls its
+ * {@link IcebergCatalog#initialize} with the given properties and configuration.
+ *
+ * @param icebergCatalogClassName fully-qualified name of an {@link IcebergCatalog} implementation
+ * @param properties catalog properties forwarded to {@code initialize}
+ * @param configuration Hadoop configuration forwarded to {@code initialize}
+ * @return the initialized catalog
+ * @throws IOException wrapping any {@link ReflectiveOperationException} (class not found, no usable constructor, ...)
+ */
+ public static IcebergCatalog create(String icebergCatalogClassName, Map<String, String> properties, Configuration configuration) throws IOException {
+ try {
+ Class<?> icebergCatalogClass = Class.forName(icebergCatalogClassName);
+ // NOTE(review): no constructor arguments are passed, so this presumably requires a no-arg constructor.
+ // Confirm how GobblinConstructorUtils.invokeConstructor(Class, String, Object...) resolves the class name.
+ // NOTE(review): the cast is unchecked. A class that is not an IcebergCatalog presumably escapes as
+ // ClassCastException rather than IOException; checking IcebergCatalog.class.isAssignableFrom(...) first avoids that.
+ IcebergCatalog icebergCatalog = (IcebergCatalog) GobblinConstructorUtils.invokeConstructor(icebergCatalogClass, icebergCatalogClassName);
+ icebergCatalog.initialize(properties, configuration);
+ return icebergCatalog;
+ } catch (ReflectiveOperationException ex) {
+ throw new IOException(ex);
+ }
}
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index 63b23260f..2119daef5 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.data.management.copy.iceberg;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -28,14 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
-
import java.util.function.Function;
-import javax.annotation.concurrent.NotThreadSafe;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.util.function.CheckedExceptionFunction;
-import org.apache.gobblin.util.measurement.GrowthMilestoneTracker;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -47,6 +39,10 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import javax.annotation.concurrent.NotThreadSafe;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopyableDataset;
@@ -54,9 +50,10 @@ import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.OwnerAndPermission;
import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
import org.apache.gobblin.data.management.partition.FileSet;
-import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.function.CheckedExceptionFunction;
+import org.apache.gobblin.util.measurement.GrowthMilestoneTracker;
import org.apache.gobblin.util.request_allocation.PushDownRequestor;
/**
@@ -72,12 +69,9 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
protected final FileSystem sourceFs;
private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired
- private final Optional<URI> sourceCatalogMetastoreURI;
- private final Optional<URI> targetCatalogMetastoreURI;
-
/** Target metastore URI */
- public static final String TARGET_METASTORE_URI_KEY =
- IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.metastore.uri";
+ // NOTE(review): the Javadoc above still says "metastore", but this key now names the target *catalog* URI.
+ // NOTE(review): the key was renamed from ".copy.target.metastore.uri" to ".copy.target.catalog.uri", and nothing
+ // shown reads the old key any more, so jobs that still set it would be ignored. Nothing in this commit reads the
+ // new key either: getDestinationDataset now derives its URI from the source table. Confirm whether it should use this key.
+ public static final String ICEBERG_TARGET_CATALOG_URI_KEY =
+ IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri";
/** Target database name */
public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
@@ -87,8 +81,6 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
this.icebergTable = icebergTbl;
this.properties = properties;
this.sourceFs = sourceFs;
- this.sourceCatalogMetastoreURI = getAsOptionalURI(this.properties, IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY);
- this.targetCatalogMetastoreURI = getAsOptionalURI(this.properties, TARGET_METASTORE_URI_KEY);
}
@Override
@@ -314,25 +306,11 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
return fileStatus.getPath().getFileSystem(hadoopConfig);
}
- protected static Optional<URI> getAsOptionalURI(Properties props, String key) {
- return Optional.ofNullable(props.getProperty(key)).map(URI::create);
- }
-
+ /** Dataset descriptor for the copy source: the source table's descriptor, tagged with {@code sourceFs}'s URI. */
protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
- return getDatasetDescriptor(sourceCatalogMetastoreURI, sourceFs);
+ return this.icebergTable.getDatasetDescriptor(sourceFs);
}
+ /**
+ * Dataset descriptor for the copy destination, tagged with {@code targetFs}'s URI.
+ * NOTE(review): this reuses the *source* {@link IcebergTable}. The descriptor therefore carries the source catalog
+ * URI and source table name; only the FS_URI metadata differs from {@link #getSourceDataset}. The removed code used
+ * the configured target metastore URI and getFileSetId() as the name. {@link #ICEBERG_TARGET_CATALOG_URI_KEY} could
+ * restore the target URI.
+ */
protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
- return getDatasetDescriptor(targetCatalogMetastoreURI, targetFs);
- }
-
- private DatasetDescriptor getDatasetDescriptor(Optional<URI> catalogMetastoreURI, FileSystem fs) {
- DatasetDescriptor descriptor = new DatasetDescriptor(
- DatasetConstants.PLATFORM_ICEBERG,
- catalogMetastoreURI.orElse(null),
- this.getFileSetId()
- );
- descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
- return descriptor;
+ return this.icebergTable.getDatasetDescriptor(targetFs);
}
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index 4eb77980b..bc111dc25 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -19,18 +19,20 @@ package org.apache.gobblin.data.management.copy.iceberg;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
-
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.util.HadoopUtils;
@@ -44,9 +46,13 @@ import org.apache.gobblin.util.HadoopUtils;
public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset";
- public static final String ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY = ICEBERG_DATASET_PREFIX + ".hive.metastore.uri";
+ // NOTE(review): ".hive.metastore.uri" is no longer read; jobs must move to ICEBERG_SRC_CATALOG_URI_KEY.
+ /** Catalog-property key (not prefixed) under which the value of {@link #ICEBERG_SRC_CLUSTER_NAME} is forwarded. */
+ public static final String ICEBERG_CLUSTER_KEY = "cluster";
public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";
+ /** Fully-qualified source {@link IcebergCatalog} implementation; defaults to {@link #DEFAULT_ICEBERG_CATALOG_CLASS}. */
+ public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class";
+ /** Required URI of the source catalog, forwarded as {@code CatalogProperties.URI}. */
+ public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
+ // NOTE(review): a string literal will not follow a rename or move of the class;
+ // IcebergHiveCatalog.class.getName() would be refactoring-safe.
+ public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
+ /** Optional source cluster name; when set it is passed to the catalog under {@link #ICEBERG_CLUSTER_KEY}. */
+ public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".source.cluster.name";
protected final FileSystem sourceFs;
private final Properties properties;
@@ -68,16 +74,18 @@ public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDatase
String dbName = properties.getProperty(ICEBERG_DB_NAME);
String tblName = properties.getProperty(ICEBERG_TABLE_NAME);
- Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-
- IcebergCatalog icebergCatalog = IcebergCatalogFactory.create(configuration);
- /* Each Iceberg dataset maps to an Iceberg table
- * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table
- */
- matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, sourceFs));
- log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(), matchingDatasets, dbName, tblName);
-
- return matchingDatasets;
+ try {
+ IcebergCatalog icebergCatalog = createIcebergCatalog(this.properties);
+ /* Each Iceberg dataset maps to an Iceberg table
+ * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table
+ */
+ matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, sourceFs));
+ log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(),
+ matchingDatasets, dbName, tblName);
+ return matchingDatasets;
+ } catch (ReflectiveOperationException exception) {
+ throw new IOException(exception);
+ }
}
@Override
@@ -94,4 +102,16 @@ public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDatase
IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
}
+
+ /**
+ * Builds the source {@link IcebergCatalog} from the job properties.
+ *
+ * <p>{@link #ICEBERG_SRC_CATALOG_URI_KEY} (required) is forwarded as {@code CatalogProperties.URI}.
+ * When set, {@link #ICEBERG_SRC_CLUSTER_NAME} is forwarded as {@link #ICEBERG_CLUSTER_KEY}.
+ * The Hadoop configuration is derived from the full job properties. The implementation class is read from
+ * {@link #ICEBERG_SRC_CATALOG_CLASS_KEY} (default {@link #DEFAULT_ICEBERG_CATALOG_CLASS}).
+ *
+ * @throws NullPointerException if {@link #ICEBERG_SRC_CATALOG_URI_KEY} is not set
+ * @throws IOException if {@link IcebergCatalogFactory} cannot instantiate the catalog class
+ */
+ protected IcebergCatalog createIcebergCatalog(Properties properties) throws IOException, ClassNotFoundException {
+ // NOTE(review): nothing below throws ClassNotFoundException, because IcebergCatalogFactory.create wraps reflective
+ // failures in IOException. The declaration, and findDatasets' catch of ReflectiveOperationException, look dead.
+ Map<String, String> catalogProperties = new HashMap<>();
+ String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+ // NOTE(review): this Preconditions is Iceberg's relocated (internal) Guava copy. com.google.common.base.Preconditions
+ // is already available to this module, and the message could name ICEBERG_SRC_CATALOG_URI_KEY.
+ Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is required");
+ catalogProperties.put(CatalogProperties.URI, catalogUri);
+ // Optional: some catalog implementations need the source cluster name as a catalog property.
+ Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+ Configuration configuration = HadoopUtils.getConfFromProperties(properties);
+ String icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+ return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
+ }
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
index 0af012ec5..5525750e6 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
@@ -17,25 +17,41 @@
package org.apache.gobblin.data.management.copy.iceberg;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
+import lombok.extern.slf4j.Slf4j;
/**
* Hive-Metastore-based {@link IcebergCatalog}.
*/
@Slf4j
-@AllArgsConstructor
-public class IcebergHiveCatalog implements IcebergCatalog {
+
+public class IcebergHiveCatalog extends BaseIcebergCatalog {
+ /** Catalog name handed to the companion {@link HiveCatalog} when it is loaded. */
+ public static final String HIVE_CATALOG_NAME = "HiveCatalog";
// NOTE: specifically necessitates `HiveCatalog`, as `BaseMetastoreCatalog.newTableOps` is `protected`!
- private final HiveCatalog hc;
+ // Assigned in initialize(); null until then, so getCatalogUri() and openTable() must not be called earlier.
+ private HiveCatalog hc;
+
+ /** No-arg constructor, as used by {@link IcebergCatalogFactory}; call {@link #initialize} before use. */
+ public IcebergHiveCatalog() {
+ super(HIVE_CATALOG_NAME, HiveCatalog.class);
+ }
+
+ /** Loads the companion {@link HiveCatalog} from the given catalog properties and Hadoop configuration. */
+ @Override
+ public void initialize(Map<String, String> properties, Configuration configuration) {
+ hc = (HiveCatalog) createCompanionCatalog(properties, configuration);
+ }
+
+ /**
+ * @return the value of {@code CatalogProperties.URI} ("uri") in the companion catalog's Hadoop configuration,
+ * or {@code "<<not set>>"}.
+ * NOTE(review): Iceberg's HiveCatalog.initialize appears to copy the {@code uri} catalog property into the Hadoop
+ * conf as {@code hive.metastore.uris}, not as {@code uri} (verify). If so, this usually returns the placeholder.
+ * That placeholder is not a legal URI ('<' is disallowed), so IcebergTable.getDatasetDescriptor's URI.create would
+ * throw IllegalArgumentException. Two changes would avoid this: read hive.metastore.uris
+ * (HiveConf.ConfVars.METASTOREURIS) or keep the URI passed to initialize, and use a URI-safe fallback.
+ */
+ @Override
+ public String getCatalogUri() {
+ return hc.getConf().get(CatalogProperties.URI, "<<not set>>");
+ }
+ /** Hive table operations from the companion catalog (HiveCatalog, unlike BaseMetastoreCatalog, exposes newTableOps). */
@Override
- public IcebergTable openTable(String dbName, String tableName) {
- TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
- return new IcebergTable(tableId, hc.newTableOps(tableId));
+ protected TableOperations createTableOperations(TableIdentifier tableId) {
+ return hc.newTableOps(tableId);
}
}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index fbd924845..f65a81417 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.data.management.copy.iceberg;
import java.io.IOException;
+import java.net.URI;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
@@ -29,6 +30,7 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Snapshot;
@@ -42,6 +44,9 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+
import static org.apache.gobblin.data.management.copy.iceberg.IcebergSnapshotInfo.ManifestFileInfo;
@@ -65,6 +70,7 @@ public class IcebergTable {
private final TableIdentifier tableId;
private final TableOperations tableOps;
+ /** URI of the catalog this table was opened from; used as the descriptor URI by getDatasetDescriptor. */
+ private final String catalogUri;
/** @return metadata info limited to the most recent (current) snapshot */
public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException {
@@ -179,4 +185,13 @@ public class IcebergTable {
manifestPathsIterable.close();
}
}
+
+ /**
+ * Builds an Iceberg-platform {@link DatasetDescriptor} for this table. It is named by the table's name and carries
+ * {@code fs}'s URI as {@code FS_URI} metadata.
+ *
+ * @throws IllegalArgumentException if {@code catalogUri} is not a valid URI, e.g. the {@code "<<not set>>"}
+ * placeholder that IcebergHiveCatalog.getCatalogUri can return
+ * @throws NullPointerException if {@code catalogUri} is null
+ * NOTE(review): {@code tableId.name()} drops the namespace (database), so same-named tables in different databases
+ * get the same descriptor name. The descriptor code removed from IcebergDataset used getFileSetId() instead.
+ * Consider {@code tableId.toString()} (namespace plus name).
+ */
+ protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
+ DatasetDescriptor descriptor = new DatasetDescriptor(
+ DatasetConstants.PLATFORM_ICEBERG,
+ URI.create(this.catalogUri),
+ this.tableId.name()
+ );
+ descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
+ return descriptor;
+ }
}
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 40cfb7bc3..9478207f8 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -101,6 +101,7 @@ public class IcebergDatasetTest {
private final String testDbName = "test_db_name";
private final String testTblName = "test_tbl_name";
+ // NOTE(review): nothing in this commit references this constant, and it sits among private instance fields.
+ // Either wire it into a test of the catalog-URI plumbing or drop it.
+ public static final String SRC_CATALOG_URI = "abc://the.source.org/catalog";
private final Properties copyConfigProperties = new Properties();
@BeforeClass
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index fd0b3cf35..3353c3365 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -29,6 +29,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
@@ -70,6 +71,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
private final String tableName = "justtesting";
private TableIdentifier tableId;
private Table table;
+ // Catalog URI passed to every IcebergTable under test; set in setUpEachTest.
+ private String catalogUri;
private String metadataBasePath;
@BeforeClass
@@ -82,6 +84,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
public void setUpEachTest() {
tableId = TableIdentifier.of(dbName, tableName);
table = catalog.createTable(tableId, icebergSchema);
+ // Reads the "uri" key from the test catalog's Hadoop conf, mirroring IcebergHiveCatalog.getCatalogUri.
+ // NOTE(review): this may well be null (verify). That is harmless for the snapshot tests, but no test in this
+ // commit exercises IcebergTable.getDatasetDescriptor, which would reject a null or non-URI value.
+ catalogUri = catalog.getConf().get(CatalogProperties.URI);
metadataBasePath = calcMetadataBasePath(tableId);
}
@@ -101,7 +104,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
);
initializeSnapshots(table, perSnapshotFilesets);
- IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId)).getCurrentSnapshotInfo();
+ IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getCurrentSnapshotInfo();
verifySnapshotInfo(snapshotInfo, perSnapshotFilesets, perSnapshotFilesets.size());
}
@@ -109,7 +112,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
@Test(expectedExceptions = IcebergTable.TableNotFoundException.class)
public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException {
TableIdentifier bogusTableId = TableIdentifier.of(dbName, tableName + "_BOGUS");
+ // Table ops for a table that was never created: getCurrentSnapshotInfo must throw TableNotFoundException.
- IcebergSnapshotInfo snapshotInfo = new IcebergTable(bogusTableId, catalog.newTableOps(bogusTableId)).getCurrentSnapshotInfo();
+ IcebergSnapshotInfo snapshotInfo = new IcebergTable(bogusTableId, catalog.newTableOps(bogusTableId), catalogUri).getCurrentSnapshotInfo();
Assert.fail("expected an exception when using table ID '" + bogusTableId + "'");
}
@@ -124,7 +127,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
);
initializeSnapshots(table, perSnapshotFilesets);
- List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId)).getAllSnapshotInfosIterator());
+ List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getAllSnapshotInfosIterator());
Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots");
for (int i = 0; i < snapshotInfos.size(); ++i) {
@@ -144,7 +147,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
);
initializeSnapshots(table, perSnapshotFilesets);
- List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId)).getIncrementalSnapshotInfosIterator());
+ List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getIncrementalSnapshotInfosIterator());
Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots");
for (int i = 0; i < snapshotInfos.size(); ++i) {
@@ -164,7 +167,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
);
initializeSnapshots(table, perSnapshotFilesets);
- List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId)).getIncrementalSnapshotInfosIterator());
+ List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getIncrementalSnapshotInfosIterator());
Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots");
for (int i = 0; i < snapshotInfos.size(); ++i) {