You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by "meethngala (via GitHub)" <gi...@apache.org> on 2023/03/21 23:52:56 UTC

[GitHub] [gobblin] meethngala commented on a diff in pull request #3663: [GOBBLIN-1802]Register iceberg table metadata update with destination side catalog

meethngala commented on code in PR #3663:
URL: https://github.com/apache/gobblin/pull/3663#discussion_r1144097069


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -64,21 +65,20 @@
 public class IcebergDataset implements PrioritizedCopyableDataset {
   private final String dbName;
   private final String inputTableName;
-  private final IcebergTable icebergTable;
+  private final IcebergTable srcIcebergTable;
+  private final IcebergTable existingTargetIcebergTable;
   protected final Properties properties;
   protected final FileSystem sourceFs;
   private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired
 
-  /** Target metastore URI */
-  public static final String ICEBERG_TARGET_CATALOG_URI_KEY =
-      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri";
   /** Target database name */
   public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
 
-  public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem sourceFs) {
+  public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable existingTargetIcebergTable, Properties properties, FileSystem sourceFs) {
     this.dbName = db;
     this.inputTableName = table;
-    this.icebergTable = icebergTbl;
+    this.srcIcebergTable = srcIcebergTable;
+    this.existingTargetIcebergTable = existingTargetIcebergTable;

Review Comment:
   I have added Javadoc conveying that we presume/require that the destination table exists. I have also taken your suggestion to check whether it exists before we initiate any replication. In my latest commit, I check for the table before creating a new `TableOps` for it.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -307,10 +308,15 @@ protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
   }
 
   protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return this.icebergTable.getDatasetDescriptor(sourceFs);
+    return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
   }
 
   protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return this.icebergTable.getDatasetDescriptor(targetFs);
+    return this.srcIcebergTable.getDatasetDescriptor(targetFs);

Review Comment:
   Yes, we do. It was just an artifact of the refactor (rename), which picked up src instead of dst.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -64,21 +65,20 @@
 public class IcebergDataset implements PrioritizedCopyableDataset {
   private final String dbName;
   private final String inputTableName;
-  private final IcebergTable icebergTable;
+  private final IcebergTable srcIcebergTable;
+  private final IcebergTable existingTargetIcebergTable;

Review Comment:
   I agree. I have kept it consistent in my latest commit, replacing 'target' with 'destination'.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -307,10 +308,15 @@ protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
   }
 
   protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return this.icebergTable.getDatasetDescriptor(sourceFs);
+    return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
   }
 
   protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return this.icebergTable.getDatasetDescriptor(targetFs);
+    return this.srcIcebergTable.getDatasetDescriptor(targetFs);
+  }
+
+  private void addPostPublishStep(List<CopyEntity> copyEntities) {
+    IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(this.getSrcIcebergTable(), this.getExistingTargetIcebergTable());

Review Comment:
   I have added them as method args now... so it will refer to whatever gets passed to it.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -307,10 +308,15 @@ protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
   }
 
   protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return this.icebergTable.getDatasetDescriptor(sourceFs);
+    return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
   }
 
   protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return this.icebergTable.getDatasetDescriptor(targetFs);
+    return this.srcIcebergTable.getDatasetDescriptor(targetFs);
+  }
+
+  private void addPostPublishStep(List<CopyEntity> copyEntities) {
+    IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(this.getSrcIcebergTable(), this.getExistingTargetIcebergTable());
+    copyEntities.add(new PostPublishStep(getFileSetId(), Maps.newHashMap(), icebergRegisterStep, 0));

Review Comment:
   I agree! I have changed the method definition as per your suggestion above :)



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +105,35 @@ public Iterator<IcebergDataset> getDatasetsIterator() throws IOException {
     return findDatasets().iterator();
   }
 
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog targetIcebergCatalog, Properties properties, FileSystem fs) {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName);
+    IcebergTable existingTargetIcebergTable = targetIcebergCatalog.openTable(dbName, tblName);
+    return new IcebergDataset(dbName, tblName, srcIcebergTable, existingTargetIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties) throws IOException, ClassNotFoundException {
+  protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
     Map<String, String> catalogProperties = new HashMap<>();
-    String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-    Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is required");
-    catalogProperties.put(CatalogProperties.URI, catalogUri);
-    // introducing an optional property for catalogs requiring cluster specific properties
-    Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    String icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+    String catalogUri;
+    String icebergCatalogClassName;
+    switch (location) {
+      case SOURCE:
+        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service URI is required");
+        // introducing an optional property for catalogs requiring cluster specific properties
+        Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        break;
+      case TARGET:
+        catalogUri = properties.getProperty(ICEBERG_TARGET_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Target Catalog Table Service URI is required");
+        // introducing an optional property for catalogs requiring cluster specific properties
+        Optional.ofNullable(properties.getProperty(ICEBERG_TARGET_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        break;
+      default:
+        throw new UnsupportedOperationException("Incorrect desired location: %s provided for creating Iceberg Catalog" + location);
+    }
+    icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+    catalogProperties.put(CatalogProperties.URI, catalogUri);

Review Comment:
   I believe both src and dst will use the same catalog class, so I have removed 'source' from the key name and kept it generic. Now both src and dst share the same catalog class key.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org