You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this plain-text rendering.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/03/02 20:46:19 UTC

[gobblin] branch master updated: [GOBBLIN-1786] Support Other Catalog Types for Iceberg Distcp (#3643)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9146e5e22 [GOBBLIN-1786] Support Other Catalog Types for Iceberg Distcp (#3643)
9146e5e22 is described below

commit 9146e5e221ce71adcce3180f7cb89a58ce9f8a01
Author: meethngala <me...@gmail.com>
AuthorDate: Thu Mar 2 12:46:12 2023 -0800

    [GOBBLIN-1786] Support Other Catalog Types for Iceberg Distcp (#3643)
    
    * refactor Iceberg distcp to support other catalog types
    
    * refactor to incorporate generic properties for identifying the Iceberg source catalog
    
    * update the catalog specifier to take a catalog name
    
    * encapsulate source catalog URI information and update unit tests
    
    * refactor classes to support multiple catalog types with a catalog specifier
    
    * rename a variable and remove an unnecessary import
    
    * refactor the dataset descriptor to be Iceberg-table-specific
    
    * remove unused imports
    
    * associate the catalog URI with IcebergTable
    
    * update the default value for the catalog URI
    
    ---------
    
    Co-authored-by: Meeth Gala <mg...@linkedin.com>
---
 .../copy/iceberg/BaseIcebergCatalog.java           | 52 ++++++++++++++++++++++
 .../management/copy/iceberg/IcebergCatalog.java    |  5 +++
 .../copy/iceberg/IcebergCatalogFactory.java        | 20 +++++----
 .../management/copy/iceberg/IcebergDataset.java    | 42 +++++------------
 .../copy/iceberg/IcebergDatasetFinder.java         | 48 ++++++++++++++------
 .../copy/iceberg/IcebergHiveCatalog.java           | 34 ++++++++++----
 .../data/management/copy/iceberg/IcebergTable.java | 15 +++++++
 .../copy/iceberg/IcebergDatasetTest.java           |  1 +
 .../management/copy/iceberg/IcebergTableTest.java  | 13 +++---
 9 files changed, 162 insertions(+), 68 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
new file mode 100644
index 000000000..f3ef3309a
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/**
+ * Base implementation of {@link IcebergCatalog} to access {@link IcebergTable} and the
+ * underlying concrete companion catalog e.g. {@link org.apache.iceberg.hive.HiveCatalog}
+ */
+public abstract class BaseIcebergCatalog implements IcebergCatalog {
+
+  protected final String catalogName;
+  protected final Class<? extends Catalog> companionCatalogClass;
+
+  protected BaseIcebergCatalog(String catalogName, Class<? extends Catalog> companionCatalogClass) {
+    this.catalogName = catalogName;
+    this.companionCatalogClass = companionCatalogClass;
+  }
+
+  @Override
+  public IcebergTable openTable(String dbName, String tableName) {
+    TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
+    return new IcebergTable(tableId, createTableOperations(tableId), this.getCatalogUri());
+  }
+
+  protected Catalog createCompanionCatalog(Map<String, String> properties, Configuration configuration) {
+    return CatalogUtil.loadCatalog(this.companionCatalogClass.getName(), this.catalogName, properties, configuration);
+  }
+
+  protected abstract TableOperations createTableOperations(TableIdentifier tableId);
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
index 70423e6a8..15211c8b5 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
@@ -17,10 +17,15 @@
 
 package org.apache.gobblin.data.management.copy.iceberg;
 
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+
 
 /**
  * Any catalog from which to access {@link IcebergTable}s.
  */
 public interface IcebergCatalog {
   IcebergTable openTable(String dbName, String tableName);
+  String getCatalogUri();
+  void initialize(Map<String, String> properties, Configuration configuration);
 }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
index a3e8464c3..e26140214 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalogFactory.java
@@ -17,20 +17,24 @@
 
 package org.apache.gobblin.data.management.copy.iceberg;
 
+import java.io.IOException;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.hive.HiveCatalog;
-
-import com.google.common.collect.Maps;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
 /**
  * Provides an {@link IcebergCatalog}.
  */
 public class IcebergCatalogFactory {
-  public static IcebergCatalog create(Configuration configuration) {
-    HiveCatalog hcat = new HiveCatalog();
-    hcat.setConf(configuration);
-    hcat.initialize("hive", Maps.newHashMap());
-    return new IcebergHiveCatalog(hcat);
+  public static IcebergCatalog create(String icebergCatalogClassName, Map<String, String> properties, Configuration configuration) throws IOException {
+    try {
+      Class<?> icebergCatalogClass = Class.forName(icebergCatalogClassName);
+      IcebergCatalog icebergCatalog = (IcebergCatalog) GobblinConstructorUtils.invokeConstructor(icebergCatalogClass, icebergCatalogClassName);
+      icebergCatalog.initialize(properties, configuration);
+      return icebergCatalog;
+    } catch (ReflectiveOperationException ex) {
+      throw new IOException(ex);
+    }
   }
 }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index 63b23260f..2119daef5 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.data.management.copy.iceberg;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.URI;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -28,14 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
-
 import java.util.function.Function;
-import javax.annotation.concurrent.NotThreadSafe;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.util.function.CheckedExceptionFunction;
-import org.apache.gobblin.util.measurement.GrowthMilestoneTracker;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -47,6 +39,10 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import javax.annotation.concurrent.NotThreadSafe;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.data.management.copy.CopyConfiguration;
 import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.data.management.copy.CopyableDataset;
@@ -54,9 +50,10 @@ import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.data.management.copy.OwnerAndPermission;
 import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
 import org.apache.gobblin.data.management.partition.FileSet;
-import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.function.CheckedExceptionFunction;
+import org.apache.gobblin.util.measurement.GrowthMilestoneTracker;
 import org.apache.gobblin.util.request_allocation.PushDownRequestor;
 
 /**
@@ -72,12 +69,9 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
   protected final FileSystem sourceFs;
   private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired
 
-  private final Optional<URI> sourceCatalogMetastoreURI;
-  private final Optional<URI> targetCatalogMetastoreURI;
-
   /** Target metastore URI */
-  public static final String TARGET_METASTORE_URI_KEY =
-      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.metastore.uri";
+  public static final String ICEBERG_TARGET_CATALOG_URI_KEY =
+      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri";
   /** Target database name */
   public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
 
@@ -87,8 +81,6 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
     this.icebergTable = icebergTbl;
     this.properties = properties;
     this.sourceFs = sourceFs;
-    this.sourceCatalogMetastoreURI = getAsOptionalURI(this.properties, IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY);
-    this.targetCatalogMetastoreURI = getAsOptionalURI(this.properties, TARGET_METASTORE_URI_KEY);
   }
 
   @Override
@@ -314,25 +306,11 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
     return fileStatus.getPath().getFileSystem(hadoopConfig);
   }
 
-  protected static Optional<URI> getAsOptionalURI(Properties props, String key) {
-    return Optional.ofNullable(props.getProperty(key)).map(URI::create);
-  }
-
   protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return getDatasetDescriptor(sourceCatalogMetastoreURI, sourceFs);
+    return this.icebergTable.getDatasetDescriptor(sourceFs);
   }
 
   protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return getDatasetDescriptor(targetCatalogMetastoreURI, targetFs);
-  }
-
-  private DatasetDescriptor getDatasetDescriptor(Optional<URI> catalogMetastoreURI, FileSystem fs) {
-    DatasetDescriptor descriptor = new DatasetDescriptor(
-        DatasetConstants.PLATFORM_ICEBERG,
-        catalogMetastoreURI.orElse(null),
-        this.getFileSetId()
-    );
-    descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
-    return descriptor;
+    return this.icebergTable.getDatasetDescriptor(targetFs);
   }
 }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index 4eb77980b..bc111dc25 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -19,18 +19,20 @@ package org.apache.gobblin.data.management.copy.iceberg;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.IterableDatasetFinder;
 import org.apache.gobblin.util.HadoopUtils;
@@ -44,9 +46,13 @@ import org.apache.gobblin.util.HadoopUtils;
 public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
 
   public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset";
-  public static final String ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY = ICEBERG_DATASET_PREFIX + ".hive.metastore.uri";
+  public static final String ICEBERG_CLUSTER_KEY = "cluster";
   public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
   public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";
+  public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class";
+  public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
+  public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
+  public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".source.cluster.name";
 
   protected final FileSystem sourceFs;
   private final Properties properties;
@@ -68,16 +74,18 @@ public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDatase
     String dbName = properties.getProperty(ICEBERG_DB_NAME);
     String tblName = properties.getProperty(ICEBERG_TABLE_NAME);
 
-    Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-
-    IcebergCatalog icebergCatalog = IcebergCatalogFactory.create(configuration);
-    /* Each Iceberg dataset maps to an Iceberg table
-     * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table
-     */
-    matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, sourceFs));
-    log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(), matchingDatasets, dbName, tblName);
-
-    return matchingDatasets;
+    try {
+      IcebergCatalog icebergCatalog = createIcebergCatalog(this.properties);
+      /* Each Iceberg dataset maps to an Iceberg table
+       * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table
+       */
+      matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, sourceFs));
+      log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(),
+          matchingDatasets, dbName, tblName);
+      return matchingDatasets;
+    } catch (ReflectiveOperationException exception) {
+      throw new IOException(exception);
+    }
   }
 
   @Override
@@ -94,4 +102,16 @@ public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDatase
     IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
     return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
   }
+
+  protected IcebergCatalog createIcebergCatalog(Properties properties) throws IOException, ClassNotFoundException {
+    Map<String, String> catalogProperties = new HashMap<>();
+    String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+    Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is required");
+    catalogProperties.put(CatalogProperties.URI, catalogUri);
+    // introducing an optional property for catalogs requiring cluster specific properties
+    Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+    Configuration configuration = HadoopUtils.getConfFromProperties(properties);
+    String icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+    return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
+  }
 }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
index 0af012ec5..5525750e6 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
@@ -17,25 +17,41 @@
 
 package org.apache.gobblin.data.management.copy.iceberg;
 
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hive.HiveCatalog;
+import lombok.extern.slf4j.Slf4j;
 
 
 /**
  * Hive-Metastore-based {@link IcebergCatalog}.
  */
 @Slf4j
-@AllArgsConstructor
-public class IcebergHiveCatalog implements IcebergCatalog {
+
+public class IcebergHiveCatalog extends BaseIcebergCatalog {
+  public static final String HIVE_CATALOG_NAME = "HiveCatalog";
   // NOTE: specifically necessitates `HiveCatalog`, as `BaseMetastoreCatalog.newTableOps` is `protected`!
-  private final HiveCatalog hc;
+  private HiveCatalog hc;
+
+  public IcebergHiveCatalog() {
+    super(HIVE_CATALOG_NAME, HiveCatalog.class);
+  }
+
+  @Override
+  public void initialize(Map<String, String> properties, Configuration configuration) {
+    hc = (HiveCatalog) createCompanionCatalog(properties, configuration);
+  }
+
+  @Override
+  public String getCatalogUri() {
+    return hc.getConf().get(CatalogProperties.URI, "<<not set>>");
+  }
 
   @Override
-  public IcebergTable openTable(String dbName, String tableName) {
-    TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
-    return new IcebergTable(tableId, hc.newTableOps(tableId));
+  protected TableOperations createTableOperations(TableIdentifier tableId) {
+    return hc.newTableOps(tableId);
   }
 }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index fbd924845..f65a81417 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.data.management.copy.iceberg;
 
 import java.io.IOException;
+import java.net.URI;
 import java.time.Instant;
 import java.util.Iterator;
 import java.util.List;
@@ -29,6 +30,7 @@ import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.Snapshot;
@@ -42,6 +44,9 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+
 import static org.apache.gobblin.data.management.copy.iceberg.IcebergSnapshotInfo.ManifestFileInfo;
 
 
@@ -65,6 +70,7 @@ public class IcebergTable {
 
   private final TableIdentifier tableId;
   private final TableOperations tableOps;
+  private final String catalogUri;
 
   /** @return metadata info limited to the most recent (current) snapshot */
   public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException {
@@ -179,4 +185,13 @@ public class IcebergTable {
       manifestPathsIterable.close();
     }
   }
+  protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
+    DatasetDescriptor descriptor = new DatasetDescriptor(
+        DatasetConstants.PLATFORM_ICEBERG,
+        URI.create(this.catalogUri),
+        this.tableId.name()
+    );
+    descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
+    return descriptor;
+  }
 }
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 40cfb7bc3..9478207f8 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -101,6 +101,7 @@ public class IcebergDatasetTest {
 
   private final String testDbName = "test_db_name";
   private final String testTblName = "test_tbl_name";
+  public static final String SRC_CATALOG_URI = "abc://the.source.org/catalog";
   private final Properties copyConfigProperties = new Properties();
 
   @BeforeClass
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index fd0b3cf35..3353c3365 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -29,6 +29,7 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.PartitionSpec;
@@ -70,6 +71,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
   private final String tableName = "justtesting";
   private TableIdentifier tableId;
   private Table table;
+  private String catalogUri;
   private String metadataBasePath;
 
   @BeforeClass
@@ -82,6 +84,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
   public void setUpEachTest() {
     tableId = TableIdentifier.of(dbName, tableName);
     table = catalog.createTable(tableId, icebergSchema);
+    catalogUri = catalog.getConf().get(CatalogProperties.URI);
     metadataBasePath = calcMetadataBasePath(tableId);
   }
 
@@ -101,7 +104,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
     );
 
     initializeSnapshots(table, perSnapshotFilesets);
-    IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId)).getCurrentSnapshotInfo();
+    IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getCurrentSnapshotInfo();
     verifySnapshotInfo(snapshotInfo, perSnapshotFilesets, perSnapshotFilesets.size());
   }
 
@@ -109,7 +112,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
   @Test(expectedExceptions = IcebergTable.TableNotFoundException.class)
   public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException {
     TableIdentifier bogusTableId = TableIdentifier.of(dbName, tableName + "_BOGUS");
-    IcebergSnapshotInfo snapshotInfo = new IcebergTable(bogusTableId, catalog.newTableOps(bogusTableId)).getCurrentSnapshotInfo();
+    IcebergSnapshotInfo snapshotInfo = new IcebergTable(bogusTableId, catalog.newTableOps(bogusTableId), catalogUri).getCurrentSnapshotInfo();
     Assert.fail("expected an exception when using table ID '" + bogusTableId + "'");
   }
 
@@ -124,7 +127,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
     );
 
     initializeSnapshots(table, perSnapshotFilesets);
-    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId)).getAllSnapshotInfosIterator());
+    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getAllSnapshotInfosIterator());
     Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots");
 
     for (int i = 0; i < snapshotInfos.size(); ++i) {
@@ -144,7 +147,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
     );
 
     initializeSnapshots(table, perSnapshotFilesets);
-    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId)).getIncrementalSnapshotInfosIterator());
+    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getIncrementalSnapshotInfosIterator());
     Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots");
 
     for (int i = 0; i < snapshotInfos.size(); ++i) {
@@ -164,7 +167,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
     );
 
     initializeSnapshots(table, perSnapshotFilesets);
-    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId)).getIncrementalSnapshotInfosIterator());
+    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri).getIncrementalSnapshotInfosIterator());
     Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num snapshots");
 
     for (int i = 0; i < snapshotInfos.size(); ++i) {