Posted to dev@gobblin.apache.org by "Will-Lo (via GitHub)" <gi...@apache.org> on 2023/03/31 05:34:02 UTC

[GitHub] [gobblin] Will-Lo commented on a diff in pull request #3663: [GOBBLIN-1802]Register iceberg table metadata update with destination side catalog

Will-Lo commented on code in PR #3663:
URL: https://github.com/apache/gobblin/pull/3663#discussion_r1154026198


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +104,43 @@ public Iterator<IcebergDataset> getDatasetsIterator() throws IOException {
     return findDatasets().iterator();
   }
 
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  /**
+   * Requires both source and destination catalogs to connect to their respective {@link IcebergTable}
+   * Note: the destination side {@link IcebergTable} should be present before initiating replication
+   * @return {@link IcebergDataset} with its corresponding source and destination {@link IcebergTable}
+   */
+  protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, Properties properties, FileSystem fs) throws IOException {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName);
+    Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName));
+    IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName);
+    Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, tblName));
+    return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties) throws IOException, ClassNotFoundException {
+  protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
     Map<String, String> catalogProperties = new HashMap<>();
-    String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-    Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is required");
-    catalogProperties.put(CatalogProperties.URI, catalogUri);
-    // introducing an optional property for catalogs requiring cluster specific properties
-    Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    String icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+    String catalogUri;
+    String icebergCatalogClassName;
+    switch (location) {
+      case SOURCE:
+        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service URI is required");
+        // introducing an optional property for catalogs requiring cluster specific properties
+        Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+        break;
+      case DESTINATION:
+        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Destination Catalog Table Service URI is required");

Review Comment:
   In error messages like these, can you include the explicit property key in the precondition message, so that when the error occurs it is clear what the user or developer needs to add?
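   For illustration, a minimal sketch of what this could look like, assuming a small helper that resolves the URI for either side. The key strings and the `CatalogUriLookup` / `requireCatalogUri` names are hypothetical placeholders (the real values of `ICEBERG_SRC_CATALOG_URI_KEY` and `ICEBERG_DEST_CATALOG_URI_KEY` live in `IcebergDatasetFinder`). It uses `java.util.Objects.requireNonNull` only to stay self-contained; like Guava's `Preconditions.checkNotNull` in the PR, it throws a `NullPointerException` carrying the message.

   ```java
   import java.util.Objects;
   import java.util.Properties;

   /** Sketch: report the missing property key in the failure message. Names and key values are hypothetical. */
   final class CatalogUriLookup {
     // Hypothetical key values, for illustration only.
     static final String SRC_CATALOG_URI_KEY = "iceberg.source.catalog.uri";
     static final String DEST_CATALOG_URI_KEY = "iceberg.destination.catalog.uri";

     enum CatalogLocation { SOURCE, DESTINATION }

     /** Returns the catalog URI for the given side, failing with a message that names the key to set. */
     static String requireCatalogUri(Properties properties, CatalogLocation location) {
       String key = (location == CatalogLocation.SOURCE) ? SRC_CATALOG_URI_KEY : DEST_CATALOG_URI_KEY;
       String side = (location == CatalogLocation.SOURCE) ? "Source" : "Destination";
       // Throws NullPointerException with this message when the property is absent.
       return Objects.requireNonNull(properties.getProperty(key),
           side + " Catalog Table Service URI is required; set property '" + key + "'");
     }
   }
   ```

   With only the source URI configured, `requireCatalogUri(props, CatalogLocation.DESTINATION)` fails with `Destination Catalog Table Service URI is required; set property 'iceberg.destination.catalog.uri'`, which tells the user exactly what to add. A single helper also keeps the source and destination messages consistent.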



##########
gradle/scripts/dependencyDefinitions.gradle:
##########
@@ -123,7 +123,7 @@ ext.externalDependency = [
     "guiceMultibindings": "com.google.inject.extensions:guice-multibindings:4.0",
     "guiceServlet": "com.google.inject.extensions:guice-servlet:4.0",
     "derby": "org.apache.derby:derby:10.12.1.1",
-    "mockito": "org.mockito:mockito-core:4.11.0",
+    "mockito": "org.mockito:mockito-inline:4.11.0",

Review Comment:
   Can you add a comment justifying why we're using mockito-inline instead of mockito-core? From what I've read, mockito-inline is intended for experimental features.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########

Review Comment:
   And the same applies to the message when the source table key is missing.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -98,20 +104,43 @@ public Iterator<IcebergDataset> getDatasetsIterator() throws IOException {
     return findDatasets().iterator();
   }
 
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  /**
+   * Requires both source and destination catalogs to connect to their respective {@link IcebergTable}
+   * Note: the destination side {@link IcebergTable} should be present before initiating replication

Review Comment:
   Should we actually enforce this semantic? It differs from Hive distcp, which creates the table if it is missing on the destination.
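   One option is to make the behavior configurable rather than hard-coding it. Below is a rough sketch, assuming a policy enum that either fails (the behavior in this PR) or creates the missing table (closer to Hive distcp). `MissingDestinationPolicy`, `MinimalCatalog`, and `InMemoryCatalog` are hypothetical stand-ins, not Gobblin or Iceberg APIs. A real CREATE path would go through Iceberg's `Catalog.createTable(TableIdentifier, Schema, PartitionSpec)` and would need the source table's schema and partition spec.

   ```java
   import java.util.HashSet;
   import java.util.Set;

   /** Sketch of a configurable policy for a missing destination table. All names are hypothetical. */
   final class DestinationTableResolver {
     enum MissingDestinationPolicy { FAIL, CREATE }

     /** Hypothetical minimal catalog abstraction (not the Iceberg Catalog interface). */
     interface MinimalCatalog {
       boolean tableExists(String dbName, String tblName);
       void createTable(String dbName, String tblName);
     }

     /** In-memory catalog used only to exercise the sketch. */
     static final class InMemoryCatalog implements MinimalCatalog {
       private final Set<String> tables = new HashSet<>();
       @Override public boolean tableExists(String dbName, String tblName) { return tables.contains(dbName + "." + tblName); }
       @Override public void createTable(String dbName, String tblName) { tables.add(dbName + "." + tblName); }
     }

     /** Ensures the destination table exists: FAIL mirrors this PR, CREATE mirrors Hive distcp. */
     static void ensureDestinationTable(MinimalCatalog dest, String dbName, String tblName, MissingDestinationPolicy policy) {
       if (dest.tableExists(dbName, tblName)) {
         return; // nothing to do when the table is already registered
       }
       switch (policy) {
         case FAIL:
           throw new IllegalArgumentException(String.format("Missing Destination Iceberg Table: %s.%s", dbName, tblName));
         case CREATE:
           // A real implementation would also pass the source schema and partition spec here.
           dest.createTable(dbName, tblName);
           break;
         default:
           throw new IllegalStateException("Unknown policy: " + policy);
       }
     }
   }
   ```

   Defaulting to FAIL would keep this PR's stricter semantics while leaving room to opt into distcp-style creation later.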



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org