You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by "phet (via GitHub)" <gi...@apache.org> on 2023/04/12 02:31:55 UTC

[GitHub] [gobblin] phet commented on a diff in pull request #3673: [GOBBLIN-1811]Fix Iceberg Registration Serialization

phet commented on code in PR #3673:
URL: https://github.com/apache/gobblin/pull/3673#discussion_r1163489856


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String dbName, String tblName, Ice
     return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
-    Map<String, String> catalogProperties = new HashMap<>();
+  protected static IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
+    String prefix = getConfigPrefix(location);
+    Map<String, String> catalogProperties = loadCatalogProperties(properties, prefix);
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    String catalogUri;
-    String icebergCatalogClassName;
-    switch (location) {
-      case SOURCE:
-        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      case DESTINATION:
-        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      default:
-        throw new UnsupportedOperationException("Incorrect desired location: %s provided for creating Iceberg Catalog" + location);
+    String icebergCatalogClassName = catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS, DEFAULT_ICEBERG_CATALOG_CLASS);
+    return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
+  }
+
+  protected static Map<String, String> loadCatalogProperties(Properties properties, String configPrefix) {
+    Map<String, String> catalogProperties = new HashMap<>();
+    Config config = ConfigBuilder.create().loadProps(properties, configPrefix).build();
+    for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+      catalogProperties.put(entry.getKey(), entry.getValue().unwrapped().toString());
     }
+    String catalogUri = config.getString(ICEBERG_CATALOG_URI_KEY);
+    Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Catalog Table Service URI is required", configPrefix + ICEBERG_CATALOG_URI_KEY);
     catalogProperties.put(CatalogProperties.URI, catalogUri);
-    return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
+    return catalogProperties;
+  }
+
+  private static String getConfigPrefix(CatalogLocation location) {
+    return ICEBERG_DATASET_PREFIX + "." + location.toString().toLowerCase() + ".";

Review Comment:
   should be a method on the `CatalogLocation` enum



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -198,7 +198,8 @@ protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
    * @param dstMetadata is null if destination {@link IcebergTable} is absent, in which case registration is skipped */
   protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) {
     if (dstMetadata != null) {
-      this.tableOps.commit(srcMetadata, dstMetadata);
+      // commit (baseMetadata -> destination metadata, updatedMetadata -> source metadata)

Review Comment:
   overly cryptic.  maybe say "use current destination metadata as 'base version' and source as update"?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String dbName, String tblName, Ice
     return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
-    Map<String, String> catalogProperties = new HashMap<>();
+  protected static IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
+    String prefix = getConfigPrefix(location);
+    Map<String, String> catalogProperties = loadCatalogProperties(properties, prefix);
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    String catalogUri;
-    String icebergCatalogClassName;
-    switch (location) {
-      case SOURCE:
-        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      case DESTINATION:
-        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      default:
-        throw new UnsupportedOperationException("Incorrect desired location: %s provided for creating Iceberg Catalog" + location);
+    String icebergCatalogClassName = catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS, DEFAULT_ICEBERG_CATALOG_CLASS);
+    return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
+  }
+
+  protected static Map<String, String> loadCatalogProperties(Properties properties, String configPrefix) {
+    Map<String, String> catalogProperties = new HashMap<>();
+    Config config = ConfigBuilder.create().loadProps(properties, configPrefix).build();
+    for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+      catalogProperties.put(entry.getKey(), entry.getValue().unwrapped().toString());
     }
+    String catalogUri = config.getString(ICEBERG_CATALOG_URI_KEY);
+    Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Catalog Table Service URI is required", configPrefix + ICEBERG_CATALOG_URI_KEY);

Review Comment:
   shouldn't there be a `"."` between `configPrefix` and the URI KEY?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -49,14 +52,9 @@
 @RequiredArgsConstructor
 public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
   public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset";
-  public static final String ICEBERG_CLUSTER_KEY = "cluster";
-  public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class";
   public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
-  public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
-  public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".source.cluster.name";
-  public static final String ICEBERG_DEST_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".destination.catalog.class";
-  public static final String ICEBERG_DEST_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".copy.destination.catalog.uri";
-  public static final String ICEBERG_DEST_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".destination.cluster.name";
+  public static final String ICEBERG_CATALOG_CLASS = "catalog.class";
+  public static final String ICEBERG_CATALOG_URI_KEY = "catalog.uri";

Review Comment:
   please document how these are used... i.e.:
   ```ICEBERG_CATALOG_PREFIX + "." + ("source" or "destination") + "..."```
   be sure to explain how this is an open-ended pattern that allows for passing arbitrary catalog-specific props through to the catalog.
   
   also, given that configs are essentially a global namespace, I'm not convinced that the prefix in use is discriminating enough.  
   
   at present, arbitrary properties go after:
   ```
   iceberg.dataset.source
   ```
   (and likewise after `iceberg.dataset.destination`), which means that, e.g.,
   ```
   iceberg.dataset.destination.database
   ```
   would get passed through to the catalog--and that seems wrong.
   
   instead, it might be more appropriate to put them after
   ```
   iceberg.dataset.source.catalog
   ```
   (or `iceberg.dataset.catalog.source`)
   
   that seems most appropriate, given these are *catalog-specific* props



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -316,8 +316,8 @@ protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
     return this.destIcebergTable.getDatasetDescriptor(targetFs);
   }
 
-  private PostPublishStep createPostPublishStep(IcebergTable srcIcebergTable, IcebergTable dstIcebergTable) {
-    IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(srcIcebergTable, dstIcebergTable);
+  private PostPublishStep createPostPublishStep(String dbName, String inputTableName, Properties properties) {

Review Comment:
   it seems inappropriate, in the general case, to serialize every single global property here.  can't we restrict this to only those properties actually needed to create the iceberg catalogs (source and dest)?



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java:
##########
@@ -118,30 +116,27 @@ protected IcebergDataset createIcebergDataset(String dbName, String tblName, Ice
     return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
-    Map<String, String> catalogProperties = new HashMap<>();
+  protected static IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
+    String prefix = getConfigPrefix(location);
+    Map<String, String> catalogProperties = loadCatalogProperties(properties, prefix);
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    String catalogUri;
-    String icebergCatalogClassName;
-    switch (location) {
-      case SOURCE:
-        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      case DESTINATION:
-        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      default:
-        throw new UnsupportedOperationException("Incorrect desired location: %s provided for creating Iceberg Catalog" + location);
+    String icebergCatalogClassName = catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS, DEFAULT_ICEBERG_CATALOG_CLASS);
+    return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
+  }
+
+  protected static Map<String, String> loadCatalogProperties(Properties properties, String configPrefix) {

Review Comment:
   the semantics here seem non-obvious, hence deserving of javadoc.  the name seems a bit inexact as well.  maybe `buildMapFromPrefixChildren`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org