You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/10/22 00:23:51 UTC

[GitHub] [iceberg] jackye1995 opened a new pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

jackye1995 opened a new pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640


   As we are having multiple new Catalog implementations added to Iceberg, we need a way to load those Catalogs in Spark and Flink easily. Currently there is a simple switch branch that chooses between `hive` and `hadoop` catalogs. This approach requires the `iceberg-spark` and `iceberg-flink` module to take a dependency on the catalog implementation modules. This would potentially bring in many unnecessary dependencies as more and more cloud providers try to add support for Iceberg.
   
   This PR proposes the following way to load custom Catalog implementations:
   1. the `type` of a custom catalog is always named `custom`
   2. a `impl` property is used to determine the implementation class of the catalog
   3. The catalog has to be initialized only using the Hadoop configuration object, or using a no-arg constructor.
   
   For example, a `GlueCatalog` will be used in Spark like the following:
   
   ```
   spark.sql.catalog.glue = org.apache.iceberg.spark.SparkCatalog
   spark.sql.catalog.glue.type = custom
   spark.sql.catalog.glue.impl = org.apache.iceberg.aws.glue.GlueCatalog
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r513714141



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -105,4 +113,54 @@ public String toString() {
           .toString();
     }
   }
+
+  class CustomCatalogLoader implements CatalogLoader {
+
+    private final SerializableConfiguration hadoopConf;
+    private final Map<String, String> properties;
+    private final String name;
+    private final String impl;
+
+    private CustomCatalogLoader(
+        String name,
+        Map<String, String> properties,
+        Configuration conf,
+        String impl) {
+      this.hadoopConf = new SerializableConfiguration(conf);
+      this.properties = new HashMap<>(properties); // use hashmap for serialization
+      this.name = name;
+      this.impl = Preconditions.checkNotNull(impl,
+          "Cannot initialize custom Catalog because impl property is not set");
+    }
+
+    @Override
+    public Catalog loadCatalog() {
+      DynConstructors.Ctor<Catalog> ctor;
+      try {
+        ctor = DynConstructors.builder(Catalog.class)
+            .impl(impl, Map.class, Configuration.class) // take in flink properties and hadoop configs
+            .impl(impl) // fall back to no-arg constructor

Review comment:
       Oh yes my typo, it's in `iceberg-mr`. So if my understanding is correct, we will have something like:
   
   ```java
   public class MyCatalog implements Catalog, Configurable {
     
     private Configuration conf;
   
     public MyCatalog() {
     }
     
     // need to add a default in Catalog interface
     @Override
     public void initialize(Map<String, String> options) {
         // initialize logic
     }
     
     @Override
     public void setConf(Configuration conf) {
       this.conf = conf;
     }
   }
   
   // in SparkCatalog
   String impl = options.get("impl");
   DynConstructors.Ctor<Catalog> ctor = DynConstructors.builder(Catalog.class)
       .impl(impl) // fall back to no-arg constructor
       .buildChecked();
   Catalog customCatalog = ctor.newInstance();
   if (customCatalog instanceof Configurable) {
     ((Configurable) customCatalog).setConf(conf);
   }
   customCatalog.initialize(options);
   ```
   
   Does this look good to you?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r516376977



##########
File path: core/src/main/java/org/apache/iceberg/CatalogUtil.java
##########
@@ -117,4 +122,45 @@ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
           }
         });
   }
+
+  /**
+   * Load a custom catalog implementation.
+   * The catalog must have a no-arg constructor.
+   * If the catalog implements {@link org.apache.hadoop.conf.Configurable},
+   * {@code Configurable.setConf(org.apache.hadoop.conf.Configuration conf)} is called to set Hadoop configuration.
+   * {@code Catalog.initialize(String name, Map<String, String> options)} is called to complete the initialization.
+   * @param catalogName catalog name
+   * @param impl catalog implementation full class name
+   * @param engineOptions configuration options from a compute engine like Spark or Flink to initialize the catalog
+   * @param hadoopConf hadoop configuration if needed
+   * @return initialized catalog object
+   * @throws IllegalArgumentException if no-arg constructor not found or error during initialization
+   */
+  public static Catalog loadCustomCatalog(
+      String catalogName,
+      String impl,

Review comment:
       Nit: I would expect `impl` to come first to keep the configuration (name, options) together.

##########
File path: core/src/main/java/org/apache/iceberg/CatalogUtil.java
##########
@@ -117,4 +122,45 @@ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
           }
         });
   }
+
+  /**
+   * Load a custom catalog implementation.
+   * The catalog must have a no-arg constructor.
+   * If the catalog implements {@link org.apache.hadoop.conf.Configurable},
+   * {@code Configurable.setConf(org.apache.hadoop.conf.Configuration conf)} is called to set Hadoop configuration.
+   * {@code Catalog.initialize(String name, Map<String, String> options)} is called to complete the initialization.
+   * @param catalogName catalog name
+   * @param impl catalog implementation full class name
+   * @param engineOptions configuration options from a compute engine like Spark or Flink to initialize the catalog
+   * @param hadoopConf hadoop configuration if needed
+   * @return initialized catalog object
+   * @throws IllegalArgumentException if no-arg constructor not found or error during initialization
+   */
+  public static Catalog loadCustomCatalog(
+      String catalogName,
+      String impl,
+      Map<String, String> engineOptions,
+      Configuration hadoopConf) {
+    Preconditions.checkNotNull(impl, "Cannot initialize custom Catalog because impl property is not set");
+    DynConstructors.Ctor<Catalog> ctor;
+    try {
+      ctor = DynConstructors.builder(Catalog.class)
+          .impl(impl)
+          .buildChecked();

Review comment:
       This may all fit on one line now.

##########
File path: core/src/main/java/org/apache/iceberg/CatalogUtil.java
##########
@@ -117,4 +122,45 @@ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
           }
         });
   }
+
+  /**
+   * Load a custom catalog implementation.
+   * The catalog must have a no-arg constructor.
+   * If the catalog implements {@link org.apache.hadoop.conf.Configurable},
+   * {@code Configurable.setConf(org.apache.hadoop.conf.Configuration conf)} is called to set Hadoop configuration.
+   * {@code Catalog.initialize(String name, Map<String, String> options)} is called to complete the initialization.
+   * @param catalogName catalog name
+   * @param impl catalog implementation full class name
+   * @param engineOptions configuration options from a compute engine like Spark or Flink to initialize the catalog
+   * @param hadoopConf hadoop configuration if needed
+   * @return initialized catalog object
+   * @throws IllegalArgumentException if no-arg constructor not found or error during initialization
+   */
+  public static Catalog loadCustomCatalog(
+      String catalogName,
+      String impl,
+      Map<String, String> engineOptions,
+      Configuration hadoopConf) {
+    Preconditions.checkNotNull(impl, "Cannot initialize custom Catalog because impl property is not set");
+    DynConstructors.Ctor<Catalog> ctor;
+    try {
+      ctor = DynConstructors.builder(Catalog.class)
+          .impl(impl)
+          .buildChecked();
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(String.format(
+          "Cannot initialize Catalog, please make sure %s has a no-arg constructor", impl), e);
+    }
+    try {
+      Catalog catalog = ctor.newInstance();
+      if (catalog instanceof Configurable) {
+        ((Configurable) catalog).setConf(hadoopConf);
+      }
+      catalog.initialize(catalogName, engineOptions);

Review comment:
       Can we move configuration out of the try/catch block? It doesn't need to be there.

##########
File path: core/src/main/java/org/apache/iceberg/CatalogUtil.java
##########
@@ -117,4 +122,45 @@ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
           }
         });
   }
+
+  /**
+   * Load a custom catalog implementation.
+   * The catalog must have a no-arg constructor.
+   * If the catalog implements {@link org.apache.hadoop.conf.Configurable},
+   * {@code Configurable.setConf(org.apache.hadoop.conf.Configuration conf)} is called to set Hadoop configuration.
+   * {@code Catalog.initialize(String name, Map<String, String> options)} is called to complete the initialization.
+   * @param catalogName catalog name
+   * @param impl catalog implementation full class name
+   * @param engineOptions configuration options from a compute engine like Spark or Flink to initialize the catalog
+   * @param hadoopConf hadoop configuration if needed
+   * @return initialized catalog object
+   * @throws IllegalArgumentException if no-arg constructor not found or error during initialization
+   */
+  public static Catalog loadCustomCatalog(
+      String catalogName,
+      String impl,
+      Map<String, String> engineOptions,

Review comment:
       The name `engineOptions` seems too specific because it assumes that the caller is an engine. But it could be a user of the API that isn't an engine. I think `config` or `properties` would be a better name.

##########
File path: core/src/main/java/org/apache/iceberg/CatalogUtil.java
##########
@@ -117,4 +122,45 @@ private static void deleteFiles(FileIO io, Set<ManifestFile> allManifests) {
           }
         });
   }
+
+  /**
+   * Load a custom catalog implementation.
+   * The catalog must have a no-arg constructor.
+   * If the catalog implements {@link org.apache.hadoop.conf.Configurable},
+   * {@code Configurable.setConf(org.apache.hadoop.conf.Configuration conf)} is called to set Hadoop configuration.
+   * {@code Catalog.initialize(String name, Map<String, String> options)} is called to complete the initialization.
+   * @param catalogName catalog name
+   * @param impl catalog implementation full class name
+   * @param engineOptions configuration options from a compute engine like Spark or Flink to initialize the catalog
+   * @param hadoopConf hadoop configuration if needed
+   * @return initialized catalog object
+   * @throws IllegalArgumentException if no-arg constructor not found or error during initialization
+   */
+  public static Catalog loadCustomCatalog(

Review comment:
       Minor: I tend to opt for removing words that aren't needed, so I would remove "custom" from here. I think that's pretty much implied by loading an implementation class.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -58,13 +59,19 @@
 
   // Can not just use "type", it conflicts with CATALOG_TYPE.
   public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
+  public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+  public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+  public static final String ICEBERG_CATALOG_TYPE_CUSTOM = "custom";

Review comment:
       Instead of using `type=custom` and `impl=com.example.Catalog`, why not just combine them into `type=com.example.Catalog`. We can try to load the type as an implementation class if it isn't a well-known name like "hive".

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
##########
@@ -137,9 +138,11 @@ protected Table findTable(DataSourceOptions options, Configuration conf) {
       HadoopTables tables = new HadoopTables(conf);
       return tables.load(path.get());
     } else {
-      HiveCatalog hiveCatalog = HiveCatalogs.loadCatalog(conf);
+      Catalog catalog = options.get("impl")
+          .map(impl -> CatalogUtil.loadCustomCatalog("custom", impl, options.asMap(), conf))
+          .orElseGet(() -> HiveCatalogs.loadCatalog(conf));
       TableIdentifier tableIdentifier = TableIdentifier.parse(path.get());
-      return hiveCatalog.loadTable(tableIdentifier);
+      return catalog.loadTable(tableIdentifier);

Review comment:
       I think we shouldn't change the behavior of `IcebergSource` in this PR. We want to change how this source works and route queries through a catalog, but I'm not sure that using `impl` is the right way to do it. Let's stick with `HiveCatalogs` for now and revisit this in a follow up.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -58,13 +59,19 @@
 
   // Can not just use "type", it conflicts with CATALOG_TYPE.
   public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
+  public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+  public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+  public static final String ICEBERG_CATALOG_TYPE_CUSTOM = "custom";

Review comment:
       @aokolnychyi and @RussellSpitzer, do you have an opinion here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r513117643



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -105,4 +113,54 @@ public String toString() {
           .toString();
     }
   }
+
+  class CustomCatalogLoader implements CatalogLoader {
+
+    private final SerializableConfiguration hadoopConf;
+    private final Map<String, String> properties;
+    private final String name;
+    private final String impl;
+
+    private CustomCatalogLoader(
+        String name,
+        Map<String, String> properties,
+        Configuration conf,
+        String impl) {
+      this.hadoopConf = new SerializableConfiguration(conf);
+      this.properties = new HashMap<>(properties); // use hashmap for serialization
+      this.name = name;
+      this.impl = Preconditions.checkNotNull(impl,
+          "Cannot initialize custom Catalog because impl property is not set");
+    }
+
+    @Override
+    public Catalog loadCatalog() {
+      DynConstructors.Ctor<Catalog> ctor;
+      try {
+        ctor = DynConstructors.builder(Catalog.class)
+            .impl(impl, Map.class, Configuration.class) // take in flink properties and hadoop configs
+            .impl(impl) // fall back to no-arg constructor

Review comment:
       We definitely need an option to create a catalog without passing `Configuration` but still passing config properties. I originally thought that it would make sense to use another constructor, but then I thought about how `name` is passed... and I think the number of possible constructors may get out of hand.
   
   Instead of adding a lot of constructors, I think we should do this:
   1. Use a no-arg constructor for all catalogs
   2. Add an `initialize` method to catalogs that is called to pass the catalog name and a string map of config (this matches what Spark does)
   3. If the catalog implements Hadoop's `Configurable` interface, also call `setConf` to set the Hadoop config.
   
   That way, we avoid having `Configuration` in any of our APIs and minimize the number of constructors that we need to support. What do you think, @jackye1995?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] giovannifumarola commented on a change in pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
giovannifumarola commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r510453213



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -79,6 +80,10 @@ protected CatalogLoader createCatalogLoader(String name, Map<String, String> pro
         String warehouseLocation = properties.get(HADOOP_WAREHOUSE_LOCATION);
         return CatalogLoader.hadoop(name, hadoopConf, warehouseLocation);
 
+      case "custom":

Review comment:
       NIT: Avoid hard code it

##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -94,4 +100,47 @@ public String toString() {
           .toString();
     }
   }
+
+  class CustomCatalogLoader implements CatalogLoader {
+
+    private final SerializableConfiguration hadoopConf;
+    private final String name;
+    private final String impl;
+
+    private CustomCatalogLoader(String name, Configuration conf, String impl) {
+      this.hadoopConf = new SerializableConfiguration(conf);
+      this.name = name;
+      this.impl = Preconditions.checkNotNull(impl,
+          "Cannot initialize custom Catalog because impl property is not set");
+    }
+
+    @Override
+    public Catalog loadCatalog() {

Review comment:
       Maybe this method can be static and be called from other places.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#issuecomment-722001888


   @rdblue @jacques-n any thoughts?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r513116680



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -94,4 +100,47 @@ public String toString() {
           .toString();
     }
   }
+
+  class CustomCatalogLoader implements CatalogLoader {
+
+    private final SerializableConfiguration hadoopConf;
+    private final String name;
+    private final String impl;
+
+    private CustomCatalogLoader(String name, Configuration conf, String impl) {
+      this.hadoopConf = new SerializableConfiguration(conf);
+      this.name = name;
+      this.impl = Preconditions.checkNotNull(impl,
+          "Cannot initialize custom Catalog because impl property is not set");
+    }
+
+    @Override
+    public Catalog loadCatalog() {

Review comment:
       What about using a static method in some other class in core? Maybe `CatalogUtil.loadCatalog`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r513680602



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -105,4 +113,54 @@ public String toString() {
           .toString();
     }
   }
+
+  class CustomCatalogLoader implements CatalogLoader {
+
+    private final SerializableConfiguration hadoopConf;
+    private final Map<String, String> properties;
+    private final String name;
+    private final String impl;
+
+    private CustomCatalogLoader(
+        String name,
+        Map<String, String> properties,
+        Configuration conf,
+        String impl) {
+      this.hadoopConf = new SerializableConfiguration(conf);
+      this.properties = new HashMap<>(properties); // use hashmap for serialization
+      this.name = name;
+      this.impl = Preconditions.checkNotNull(impl,
+          "Cannot initialize custom Catalog because impl property is not set");
+    }
+
+    @Override
+    public Catalog loadCatalog() {
+      DynConstructors.Ctor<Catalog> ctor;
+      try {
+        ctor = DynConstructors.builder(Catalog.class)
+            .impl(impl, Map.class, Configuration.class) // take in flink properties and hadoop configs
+            .impl(impl) // fall back to no-arg constructor

Review comment:
       > I don't think it is a good idea to pass a `Configuration` as a flattened map. Then it isn't possible to reconstruct the original `Configuration` without everything being an override. I think we should keep `Configuration` separate and discourage its use where it isn't needed for Hadoop classes used by Iceberg.
   
   yeah I replied at the same time lol. Please take a look at the latest reply




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#issuecomment-717608889


   fix rebase issue, reopen PR


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rymurr commented on pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
rymurr commented on pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#issuecomment-717194528


   
   > I think we also want to pass options from the catalog config in Flink and Spark, where users can pass properties like `uri` and `warehouse`. Could you add a `Map` to this to pass the catalog config options?
   
   I like the Map over Configuration suggestion as well. In #1587 I made the constructor take `String name, Map props, Configuration conf` as it still needs a `HadoopFileIO`
   
   Has anyone thought of how to do this for the `IcebergSource`? Currently `df.write().format("iceberg")` is as far as I understand going to use Hive/HDFS regardless of these settings.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r513691007



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -105,4 +113,54 @@ public String toString() {
           .toString();
     }
   }
+
+  class CustomCatalogLoader implements CatalogLoader {
+
+    private final SerializableConfiguration hadoopConf;
+    private final Map<String, String> properties;
+    private final String name;
+    private final String impl;
+
+    private CustomCatalogLoader(
+        String name,
+        Map<String, String> properties,
+        Configuration conf,
+        String impl) {
+      this.hadoopConf = new SerializableConfiguration(conf);
+      this.properties = new HashMap<>(properties); // use hashmap for serialization
+      this.name = name;
+      this.impl = Preconditions.checkNotNull(impl,
+          "Cannot initialize custom Catalog because impl property is not set");
+    }
+
+    @Override
+    public Catalog loadCatalog() {
+      DynConstructors.Ctor<Catalog> ctor;
+      try {
+        ctor = DynConstructors.builder(Catalog.class)
+            .impl(impl, Map.class, Configuration.class) // take in flink properties and hadoop configs
+            .impl(impl) // fall back to no-arg constructor

Review comment:
       I don't think that we want to use `CatalogLoader`. The use case for that is a bit different: it is for Hive, where `Configuration` is the _correct_ way to pass options. That loader should call whatever dynamic loader function we introduce in this PR. I also think that it shouldn't be located in the iceberg-hive-metastore module. That should be in iceberg-mr.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jacques-n commented on pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
jacques-n commented on pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#issuecomment-721422648


   I was suggesting using the narrower function instead map here as well.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jacques-n commented on pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
jacques-n commented on pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#issuecomment-721424922


   Just a lot extra methods exposed. Iteration, put, etc.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#issuecomment-721421412


   > I meant to make a comment before on this. We used Function<String, String> in Nessie for this and it feels like a narrower interface for this purpose. See here: https://github.com/projectnessie/nessie/blob/main/clients/client/src/main/java/com/dremio/nessie/client/NessieClient.java#L161
   
   Interesting, but it should still be compatible by using `Map<String, String>::get`


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 closed pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
jackye1995 closed pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] giovannifumarola commented on pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
giovannifumarola commented on pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#issuecomment-714761429


   Thanks Jack. I like the following approach.
   spark.sql.catalog.glue = org.apache.iceberg.spark.SparkCatalog
   spark.sql.catalog.glue.type = custom
   spark.sql.catalog.glue.impl = org.apache.iceberg.aws.glue.GlueCatalog
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r513121949



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -105,4 +113,54 @@ public String toString() {
           .toString();
     }
   }
+
+  class CustomCatalogLoader implements CatalogLoader {
+
+    private final SerializableConfiguration hadoopConf;
+    private final Map<String, String> properties;
+    private final String name;
+    private final String impl;
+
+    private CustomCatalogLoader(
+        String name,
+        Map<String, String> properties,
+        Configuration conf,
+        String impl) {
+      this.hadoopConf = new SerializableConfiguration(conf);
+      this.properties = new HashMap<>(properties); // use hashmap for serialization
+      this.name = name;
+      this.impl = Preconditions.checkNotNull(impl,
+          "Cannot initialize custom Catalog because impl property is not set");
+    }
+
+    @Override
+    public Catalog loadCatalog() {
+      DynConstructors.Ctor<Catalog> ctor;
+      try {
+        ctor = DynConstructors.builder(Catalog.class)
+            .impl(impl, Map.class, Configuration.class) // take in flink properties and hadoop configs
+            .impl(impl) // fall back to no-arg constructor

Review comment:
       Yes I am also thinking about this issue. Another way I am considering is to use a constructor that only takes a string map. Becasue Hadoop configuration implements the `Iterable<Map.Entry<String, String>>` interface, we can merge Spark or Flink properties with Hadoop configurations together and pass into the constructor in a single map. We can use a wrapper class for the merged map to ensure the names of those properties and config keys do not conflict.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r513657641



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -105,4 +113,54 @@ public String toString() {
           .toString();
     }
   }
+
+  class CustomCatalogLoader implements CatalogLoader {
+
+    private final SerializableConfiguration hadoopConf;
+    private final Map<String, String> properties;
+    private final String name;
+    private final String impl;
+
+    private CustomCatalogLoader(
+        String name,
+        Map<String, String> properties,
+        Configuration conf,
+        String impl) {
+      this.hadoopConf = new SerializableConfiguration(conf);
+      this.properties = new HashMap<>(properties); // use hashmap for serialization
+      this.name = name;
+      this.impl = Preconditions.checkNotNull(impl,
+          "Cannot initialize custom Catalog because impl property is not set");
+    }
+
+    @Override
+    public Catalog loadCatalog() {
+      DynConstructors.Ctor<Catalog> ctor;
+      try {
+        ctor = DynConstructors.builder(Catalog.class)
+            .impl(impl, Map.class, Configuration.class) // take in flink properties and hadoop configs
+            .impl(impl) // fall back to no-arg constructor

Review comment:
       I don't think it is a good idea to pass a `Configuration` as a flattened map. Then it isn't possible to reconstruct the original `Configuration` without everything being an override. I think we should keep `Configuration` separate and discourage its use where it isn't needed for Hadoop classes used by Iceberg.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r510498951



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -79,6 +80,10 @@ protected CatalogLoader createCatalogLoader(String name, Map<String, String> pro
         String warehouseLocation = properties.get(HADOOP_WAREHOUSE_LOCATION);
         return CatalogLoader.hadoop(name, hadoopConf, warehouseLocation);
 
+      case "custom":

Review comment:
       This is trying to be consistent with the other cases which also use hard-coded strings, both in Spark and Flink.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] edgarRd commented on a change in pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
edgarRd commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r511051428



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -79,6 +80,10 @@ protected CatalogLoader createCatalogLoader(String name, Map<String, String> pro
         String warehouseLocation = properties.get(HADOOP_WAREHOUSE_LOCATION);
         return CatalogLoader.hadoop(name, hadoopConf, warehouseLocation);
 
+      case "custom":

Review comment:
       Maybe we should make them an `Enum`, which in general are preferred for using in `switch` statements. Also, I'm not sure if we make sure properties are lower case for this switch.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r513115741



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -105,4 +113,54 @@ public String toString() {
           .toString();
     }
   }
+
+  class CustomCatalogLoader implements CatalogLoader {
+
+    private final SerializableConfiguration hadoopConf;
+    private final Map<String, String> properties;
+    private final String name;
+    private final String impl;
+
+    private CustomCatalogLoader(
+        String name,
+        Map<String, String> properties,
+        Configuration conf,
+        String impl) {
+      this.hadoopConf = new SerializableConfiguration(conf);
+      this.properties = new HashMap<>(properties); // use hashmap for serialization
+      this.name = name;
+      this.impl = Preconditions.checkNotNull(impl,
+          "Cannot initialize custom Catalog because impl property is not set");
+    }
+
+    @Override
+    public Catalog loadCatalog() {
+      DynConstructors.Ctor<Catalog> ctor;
+      try {
+        ctor = DynConstructors.builder(Catalog.class)
+            .impl(impl, Map.class, Configuration.class) // take in flink properties and hadoop configs
+            .impl(impl) // fall back to no-arg constructor

Review comment:
       Can we add an `impl` for just `Map`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#issuecomment-715546468


   My main concern with this is how configuration is currently passed using Hadoop `Configuration`. We want to avoid using `Configuration` and only pass it where it is needed for our use of Hadoop APIs, like `FileSystem`. Passing `Configuration` as the last argument is fine because many catalogs will currently need it to use `HadoopFileIO`, but we'll need an alternative as well.
   
   I think we also want to pass options from the catalog config in Flink and Spark, where users can pass properties like `uri` and `warehouse`. Could you add a `Map` to this to pass the catalog config options?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jacques-n commented on pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
jacques-n commented on pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#issuecomment-722007184


   I'm fine with it as is. 
   
   Some background: I'm not especially supportive of maps in interfaces since they are so broad. I've had bad experiences in the past where we've had to move maps to external storage (say DynamoDB) and having a broad use of the Map interface hurt us in reworking things.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#issuecomment-721423980


   What's the drawback of using a map? It is a bit narrower to use the function, but it is also limiting. Catalogs can't use `getOrDefault` or check `contains`. I also don't have a serious problem if we need to copy to create an Immutable map for this because it isn't in a performance-critical path.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r513835978



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -94,4 +100,47 @@ public String toString() {
           .toString();
     }
   }
+
+  class CustomCatalogLoader implements CatalogLoader {
+
+    private final SerializableConfiguration hadoopConf;
+    private final String name;
+    private final String impl;
+
+    private CustomCatalogLoader(String name, Configuration conf, String impl) {
+      this.hadoopConf = new SerializableConfiguration(conf);
+      this.name = name;
+      this.impl = Preconditions.checkNotNull(impl,
+          "Cannot initialize custom Catalog because impl property is not set");
+    }
+
+    @Override
+    public Catalog loadCatalog() {

Review comment:
       Yeah sounds like a good idea to move the logic to a common util, let me do that




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#issuecomment-722007621


   I think it is okay to use a map here. Like I said, even if we had to copy values to create a map to pass in here, that happens once per session and is not an unreasonable amount of overhead.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#issuecomment-720862186


   @jackye1995, I'm happy with the implementation and behavior (other than changing `IcebergSource` later). The main things to fix are names and how we configure it. Thanks!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#issuecomment-721514228


   > Just a lot extra methods exposed. Iteration, put, etc.
   
   I think the advantage of using a `Map` is that:
   1. consistent with the `TableProperties` interface
   2. consistent with the Spark and Flink properties interface
   3. as Ryan says, there are some useful methods like `getOrDefault` and `contains`, and iteration might still be needed for some use cases such as getting all configs of a certain prefix.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#issuecomment-717448365


   @rymurr, for your `IcebergSource` question: the source can implement a trait to return the catalog and identifier to load instead of returning a table itself. There are two reasons we don't do this already:
   
   1. The catalog must be defined in Spark properties, so we either need to have a "default" catalog or have a way to configure a catalog for the source
   2. If we use a catalog, then we still need to support path-based tables. We will need to add a way to pass a path as an identifier to the catalog and have it load using `HadoopTables`
   
   Figuring out how we want to do this shouldn't be too difficult. We just found it easier to keep the existing behavior for the last release since there weren't other catalogs at the time.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r510498702



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -94,4 +100,47 @@ public String toString() {
           .toString();
     }
   }
+
+  class CustomCatalogLoader implements CatalogLoader {
+
+    private final SerializableConfiguration hadoopConf;
+    private final String name;
+    private final String impl;
+
+    private CustomCatalogLoader(String name, Configuration conf, String impl) {
+      this.hadoopConf = new SerializableConfiguration(conf);
+      this.name = name;
+      this.impl = Preconditions.checkNotNull(impl,
+          "Cannot initialize custom Catalog because impl property is not set");
+    }
+
+    @Override
+    public Catalog loadCatalog() {

Review comment:
       It implements the `CatalogLoader` interface and cannot be changed to static.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r516414877



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -58,13 +59,19 @@
 
   // Can not just use "type", it conflicts with CATALOG_TYPE.
   public static final String ICEBERG_CATALOG_TYPE = "catalog-type";
+  public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
+  public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
+  public static final String ICEBERG_CATALOG_TYPE_CUSTOM = "custom";

Review comment:
       Yeah this sounds like a cleaner way to go, the only disadvantage is that we are overloading the term `type`. I have updated the code.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jackye1995 commented on a change in pull request #1640: Allow loading custom Catalog implementation in Spark and Flink

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #1640:
URL: https://github.com/apache/iceberg/pull/1640#discussion_r513657785



##########
File path: flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -105,4 +113,54 @@ public String toString() {
           .toString();
     }
   }
+
+  class CustomCatalogLoader implements CatalogLoader {
+
+    private final SerializableConfiguration hadoopConf;
+    private final Map<String, String> properties;
+    private final String name;
+    private final String impl;
+
+    private CustomCatalogLoader(
+        String name,
+        Map<String, String> properties,
+        Configuration conf,
+        String impl) {
+      this.hadoopConf = new SerializableConfiguration(conf);
+      this.properties = new HashMap<>(properties); // use hashmap for serialization
+      this.name = name;
+      this.impl = Preconditions.checkNotNull(impl,
+          "Cannot initialize custom Catalog because impl property is not set");
+    }
+
+    @Override
+    public Catalog loadCatalog() {
+      DynConstructors.Ctor<Catalog> ctor;
+      try {
+        ctor = DynConstructors.builder(Catalog.class)
+            .impl(impl, Map.class, Configuration.class) // take in flink properties and hadoop configs
+            .impl(impl) // fall back to no-arg constructor

Review comment:
       Actually, ignore the last comment, I overlooked the fact that the HadoopIO still needs the Hadoop configuration object. So the `iceberg-hive-metastore` already provides a way to solve this, that is to dynamically construct a `CatalogLoader` object, and call `loader.load(conf)`. ([link](https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java#L198))
   
   This provides maximum flexibility, and allow each engine to have a different loader interface that takes different arguments in constructor. Flink already has one, and I can add another one for Spark.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org