Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/02/22 00:53:18 UTC

[GitHub] [iceberg] kainoa21 opened a new pull request #4183: Flink: Implement catalog Factory interface (#3117)

kainoa21 opened a new pull request #4183:
URL: https://github.com/apache/iceberg/pull/4183


   Implements the catalog Factory interface while also attempting to decouple catalog loading from Hadoop dependencies (see [#3117](https://github.com/apache/iceberg/issues/3117)).
   
   For now, this attempts to leave all of the existing catalog creation interfaces intact, but swaps out `Configuration` for `Object` as a follow-on to the previous work in [#3590](https://github.com/apache/iceberg/pull/3590). If we go in this direction, some additional work will be needed to ensure no loss of functionality for the existing Hadoop and Hive catalog implementations when using the new interface.
   
   While testing these changes locally, I was able to load a non-Hadoop-based catalog in a Flink SQL environment without any Hadoop jars. However, I did still eventually hit an error while attempting to read Parquet files, due to Parquet's [Hadoop dependency](https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java#L174).
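To make the `Configuration` to `Object` swap concrete, here is a minimal, JDK-only sketch of the idea. It is not the code in this PR: `SimpleCatalog`, `InMemoryCatalog`, and `ObjectConfCatalogLoader` are hypothetical names, and the reflective no-arg construction only loosely resembles how a `catalog-impl` class gets instantiated. The point it illustrates is that the loader holds the conf as an opaque `Object`, so nothing on this path names a Hadoop type.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal catalog contract, standing in for Iceberg's Catalog interface.
interface SimpleCatalog {
  void initialize(String name, Map<String, String> properties);

  String name();
}

// Hypothetical non-Hadoop catalog, the kind of class a user would pass via `catalog-impl`.
class InMemoryCatalog implements SimpleCatalog {
  private String name;

  public InMemoryCatalog() {}

  @Override
  public void initialize(String name, Map<String, String> properties) {
    this.name = name;
  }

  @Override
  public String name() {
    return name;
  }
}

// Hypothetical loader: the conf is typed as Object, so this class never references Hadoop.
class ObjectConfCatalogLoader {
  private final String impl;
  private final String catalogName;
  private final Map<String, String> properties;
  private final Object conf; // may be null when the runtime has no Hadoop

  ObjectConfCatalogLoader(String impl, String catalogName, Map<String, String> properties, Object conf) {
    this.impl = impl;
    this.catalogName = catalogName;
    this.properties = new HashMap<>(properties); // copy into a serializable HashMap
    this.conf = conf;
  }

  Object conf() {
    return conf;
  }

  SimpleCatalog loadCatalog() {
    try {
      // Instantiate the catalog-impl class through its public no-arg constructor.
      Class<?> clazz = Class.forName(impl);
      SimpleCatalog catalog = (SimpleCatalog) clazz.getDeclaredConstructor().newInstance();
      catalog.initialize(catalogName, properties);
      return catalog;
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot initialize catalog implementation " + impl, e);
    }
  }
}
```

Whether a Hadoop conf is actually created and handed over is a separate decision; this sketch simply carries whatever `Object` it is given (or `null`).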


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kainoa21 commented on a change in pull request #4183: Flink: Implement catalog Factory interface (#3117)

Posted by GitBox <gi...@apache.org>.
kainoa21 commented on a change in pull request #4183:
URL: https://github.com/apache/iceberg/pull/4183#discussion_r811503577



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -155,4 +210,55 @@ public String toString() {
     }
   }
 
+  class BaseCatalogLoader implements CatalogLoader {
+
+    private final Map<String, String> options;
+    private final String catalogName;
+    private final SerializableConfiguration hadoopConf;
+
+    private BaseCatalogLoader(
+            CatalogFactory.Context context
+    ) {
+      this.options = Maps.newHashMap(context.getOptions()); // wrap into a hashmap for serialization
+      this.catalogName = context.getName();
+      // Check to see if there are hadoop classes loaded in Flink's application classloader
+      if (isHadoopEnv(context.getClassLoader().getParent())) {

Review comment:
       The intention here is to check only Flink's application-level class loader (i.e. jars from Flink's `/lib` folder). This matters because the call that retrieves the Hadoop conf from the Flink environment (`HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration())`) does not have access to user-level classes.
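A minimal sketch of what such a probe could look like, assuming `isHadoopEnv` simply asks whether `org.apache.hadoop.conf.Configuration` is visible to the class loader it is given. `HadoopProbe` and `isClassVisible` are hypothetical names; the PR's actual implementation may differ.

```java
// Hypothetical helper illustrating the intent of isHadoopEnv(...) in this PR.
final class HadoopProbe {
  static final String HADOOP_CONF_CLASS = "org.apache.hadoop.conf.Configuration";

  private HadoopProbe() {}

  // Returns true if `className` is visible to `loader` (null means the bootstrap loader).
  // initialize=false avoids running static initializers just to answer the question.
  static boolean isClassVisible(String className, ClassLoader loader) {
    try {
      Class.forName(className, false, loader);
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      return false;
    }
  }

  // Callers pass the parent of the user-code loader (Flink's /lib classpath), as in
  // isHadoopEnv(context.getClassLoader().getParent()), so Hadoop jars that are only
  // bundled in the user jar are deliberately ignored.
  static boolean isHadoopEnv(ClassLoader appLoader) {
    return isClassVisible(HADOOP_CONF_CLASS, appLoader);
  }
}
```

Probing the user-code loader itself would return `true` when Hadoop is bundled only in the user jar, even though the `HadoopUtils` call described above could not see it.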






[GitHub] [iceberg] kainoa21 commented on a change in pull request #4183: Flink: Implement catalog Factory interface (#3117)

Posted by GitBox <gi...@apache.org>.
kainoa21 commented on a change in pull request #4183:
URL: https://github.com/apache/iceberg/pull/4183#discussion_r812071336



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -44,17 +46,27 @@
    *
    * @return a newly created {@link Catalog}
    */
-  Catalog loadCatalog();
+  String getName();
+  Map<String, String> getOptions();
+  Object getConf();

Review comment:
       IMO there are two independent questions we want to answer at runtime.
   
   1) Can the runtime environment provide Hadoop configuration?
   
   2) Does the catalog accept Hadoop configuration?
   
   The existing `Configurable` interface addresses the second question, but we still need Flink to answer the first one. Ideally, Flink would provide that answer directly, for example via the `Context` parameter.
   
   I also don't think adding `getConf` to the Iceberg `Configurable` interface really gets us anywhere.
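The two questions can be kept independent in code. In this sketch (all names hypothetical; the local `Configurable` is a stand-in for `org.apache.iceberg.hadoop.Configurable<C>`), question 1 is an injected `BooleanSupplier`, which could be backed by a class-loader probe or, ideally, by something Flink exposes, and question 2 is a marker check on the catalog.

```java
import java.util.function.BooleanSupplier;

// Local stand-in for org.apache.iceberg.hadoop.Configurable<C>, which declares setConf(C).
interface Configurable<C> {
  void setConf(C conf);
}

final class HadoopConfDecision {
  private HadoopConfDecision() {}

  // Question 1 (runtime): can the environment provide a Hadoop configuration?
  // Question 2 (catalog): does the catalog accept one?
  // A conf is only worth creating and passing when both answers are "yes".
  static boolean shouldPassConf(BooleanSupplier runtimeProvidesHadoop, Object catalog) {
    boolean acceptsConf = catalog instanceof Configurable;
    // Evaluate the catalog-side answer first so the runtime probe is skipped when it cannot matter.
    return acceptsConf && runtimeProvidesHadoop.getAsBoolean();
  }
}
```

Keeping question 1 behind a supplier means the source of that answer could change without touching the catalog-side check.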
   
   







[GitHub] [iceberg] kainoa21 commented on a change in pull request #4183: Flink: Implement catalog Factory interface (#3117)

Posted by GitBox <gi...@apache.org>.
kainoa21 commented on a change in pull request #4183:
URL: https://github.com/apache/iceberg/pull/4183#discussion_r811501438



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -21,10 +21,12 @@
 
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.hadoop.conf.Configuration;

Review comment:
       We can eventually strip out these references from the existing Hadoop and Hive catalog implementations if we want. As it stands, the `Configuration` class is only loaded if a catalog ends up calling `HadoopUtils.getHadoopConfiguration(GlobalConfiguration.loadConfiguration())`, and that call is avoided when a catalog is loaded by passing `catalog-impl` AND `org.apache.hadoop.conf.Configuration` isn't available to Flink's application-level class loader (even if it is available to the user lib class loader).
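The condition above can be written as a small predicate. This is a hypothetical sketch (`HadoopConfGate` and `loadsHadoopConf` are not from the PR) that encodes only the rule as stated: the Hadoop conf lookup is skipped only when both conditions hold.

```java
import java.util.Map;

// Hypothetical sketch of the condition described above; not code from the PR.
final class HadoopConfGate {
  static final String CATALOG_IMPL = "catalog-impl";

  private HadoopConfGate() {}

  // Returns true if loading this catalog would reach the HadoopUtils.getHadoopConfiguration(...)
  // call, and therefore load org.apache.hadoop.conf.Configuration. The call is skipped only when
  // a catalog-impl is given AND Hadoop is absent from Flink's application class loader.
  static boolean loadsHadoopConf(Map<String, String> options, boolean hadoopOnAppClasspath) {
    boolean customImpl = options.containsKey(CATALOG_IMPL);
    return !(customImpl && !hadoopOnAppClasspath);
  }
}
```

For example, `Map.of("catalog-type", "hive")` still returns `true` with no Hadoop on the classpath, which is consistent with the built-in Hadoop and Hive catalogs continuing to need Hadoop jars. (`com.example.MyCatalog` in the checks is a made-up class name.)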






[GitHub] [iceberg] kbendick commented on a change in pull request #4183: Flink: Implement catalog Factory interface (#3117)

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #4183:
URL: https://github.com/apache/iceberg/pull/4183#discussion_r811525178



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -155,4 +210,55 @@ public String toString() {
     }
   }
 
+  class BaseCatalogLoader implements CatalogLoader {
+
+    private final Map<String, String> options;
+    private final String catalogName;
+    private final SerializableConfiguration hadoopConf;
+
+    private BaseCatalogLoader(
+            CatalogFactory.Context context
+    ) {
+      this.options = Maps.newHashMap(context.getOptions()); // wrap into a hashmap for serialization
+      this.catalogName = context.getName();
+      // Check to see if there are hadoop classes loaded in Flink's application classloader
+      if (isHadoopEnv(context.getClassLoader().getParent())) {

Review comment:
       We also have a marker interface, `org.apache.iceberg.hadoop.Configurable<C>`, which is used for a similar purpose (to avoid the dependency on Hadoop).
   
   It can be mixed into a catalog class, and then there's possibly a marker you can check. Here's an example usage of it in `GlueCatalog`: https://github.com/apache/iceberg/blob/8afcdffd905fa284f20f6f84fba649d28fb7d923/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java#L76-L82
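A self-contained sketch of the mix-in pattern. The `Configurable<C>` below is a local stand-in for Iceberg's interface; `MarkerCatalog` and `Configurer` are hypothetical, and a `Map` replaces Hadoop's `Configuration` so the example runs without Hadoop jars. It also shows one consequence of erasure: a caller holding only an `Object` conf gets no compile-time type check.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in for org.apache.iceberg.hadoop.Configurable<C>: the interface itself
// mentions no Hadoop type; only implementations choose a concrete C.
interface Configurable<C> {
  void setConf(C conf);
}

// Hypothetical catalog in the style of GlueCatalog; a Map stands in for Configuration.
class MarkerCatalog implements Configurable<Map<String, String>> {
  private Map<String, String> conf;

  @Override
  public void setConf(Map<String, String> conf) {
    this.conf = new HashMap<>(conf);
  }

  Map<String, String> conf() {
    return conf;
  }
}

final class Configurer {
  private Configurer() {}

  // The caller only holds an Object conf, so it checks the marker and uses a raw cast.
  // Because generics are erased, a conf of the wrong type fails with ClassCastException
  // inside the implementation's bridge method rather than at compile time.
  @SuppressWarnings({"rawtypes", "unchecked"})
  static boolean configure(Object catalog, Object conf) {
    if (catalog instanceof Configurable) {
      ((Configurable) catalog).setConf(conf);
      return true;
    }
    return false;
  }
}
```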






[GitHub] [iceberg] danielcweeks commented on a change in pull request #4183: Flink: Implement catalog Factory interface (#3117)

Posted by GitBox <gi...@apache.org>.
danielcweeks commented on a change in pull request #4183:
URL: https://github.com/apache/iceberg/pull/4183#discussion_r811500407



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -21,10 +21,12 @@
 
 import java.io.Serializable;
 import java.util.Map;
+import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.hadoop.conf.Configuration;

Review comment:
       Looks like we need to remove the import.






[GitHub] [iceberg] kbendick commented on a change in pull request #4183: Flink: Implement catalog Factory interface (#3117)

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #4183:
URL: https://github.com/apache/iceberg/pull/4183#discussion_r811526006



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -44,17 +46,27 @@
    *
    * @return a newly created {@link Catalog}
    */
-  Catalog loadCatalog();
+  String getName();
+  Map<String, String> getOptions();
+  Object getConf();

Review comment:
       The interface for `setConf` that avoids needing hadoop is found here: https://github.com/apache/iceberg/blob/da84a9c0740c9909889780b4350e113cfd35a4ae/core/src/main/java/org/apache/iceberg/hadoop/Configurable.java#L22-L27
   
   We might just want to add `getConf` to that as well.
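A sketch of what that suggestion might look like. `ConfigurableWithGetter` and `OpaqueConfHolder` are hypothetical; the local `Configurable` stands in for Iceberg's interface, which (per the link above) declares only `setConf`.

```java
// Local stand-in for org.apache.iceberg.hadoop.Configurable<C>.
interface Configurable<C> {
  void setConf(C conf);
}

// Hypothetical extension adding a getter, as suggested above.
interface ConfigurableWithGetter<C> extends Configurable<C> {
  C getConf();
}

// Hypothetical holder exposing an opaque conf without naming Hadoop's Configuration.
class OpaqueConfHolder implements ConfigurableWithGetter<Object> {
  private Object conf;

  @Override
  public void setConf(Object conf) {
    this.conf = conf;
  }

  @Override
  public Object getConf() {
    return conf;
  }
}
```

A caller that only sees `getConf()` returning `Object` (or `null`) still has to decide separately whether the runtime can provide a Hadoop conf at all.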






[GitHub] [iceberg] kainoa21 commented on a change in pull request #4183: Flink: Implement catalog Factory interface (#3117)

Posted by GitBox <gi...@apache.org>.
kainoa21 commented on a change in pull request #4183:
URL: https://github.com/apache/iceberg/pull/4183#discussion_r811502076



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
##########
@@ -126,8 +129,43 @@ public Catalog createCatalog(String name, Map<String, String> properties) {
     return createCatalog(name, properties, clusterHadoopConf());
   }
 
-  protected Catalog createCatalog(String name, Map<String, String> properties, Configuration hadoopConf) {
-    CatalogLoader catalogLoader = createCatalogLoader(name, properties, hadoopConf);
+  @Override
+  public String factoryIdentifier() {
+    return FACTORY_IDENTIFIER;
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {

Review comment:
       I'm not sure what we would want to return for the required or optional options.
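One way to reason about it: with Iceberg's Flink catalog properties, neither `catalog-type` nor `catalog-impl` is strictly required, which would argue for an empty required set. The sketch below is JDK-only and hypothetical (`IcebergCatalogOptions` and `resolve` are made-up names); its rules (type defaults to `hive`, type and impl are mutually exclusive, only `hive`/`hadoop` types are known) reflect our understanding of the existing `FlinkCatalogFactory` behavior and should be checked against it.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical validation sketch; not the factory's actual requiredOptions()/optionalOptions().
final class IcebergCatalogOptions {
  static final String CATALOG_TYPE = "catalog-type";
  static final String CATALOG_IMPL = "catalog-impl";
  static final Set<String> KNOWN_TYPES = Set.of("hive", "hadoop");

  private IcebergCatalogOptions() {}

  // Neither key is strictly required, which suggests the required set could stay empty.
  static Set<String> requiredKeys() {
    return Set.of();
  }

  // Returns the catalog-impl class name, or the (possibly defaulted) catalog type.
  static String resolve(String name, Map<String, String> options) {
    String impl = options.get(CATALOG_IMPL);
    String type = options.get(CATALOG_TYPE);
    if (impl != null) {
      if (type != null) {
        throw new IllegalArgumentException(String.format(
            "Cannot create catalog %s, both %s and %s are set", name, CATALOG_TYPE, CATALOG_IMPL));
      }
      return impl;
    }
    String resolved = type == null ? "hive" : type.toLowerCase(Locale.ROOT);
    if (!KNOWN_TYPES.contains(resolved)) {
      throw new IllegalArgumentException("Unknown catalog-type: " + type);
    }
    return resolved;
  }
}
```

A reason to keep both sets small: Iceberg catalogs accept arbitrary implementation-specific properties, so a strict allow-list of options would need care.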







[GitHub] [iceberg] kbendick commented on a change in pull request #4183: Flink: Implement catalog Factory interface (#3117)

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #4183:
URL: https://github.com/apache/iceberg/pull/4183#discussion_r811525178



##########
File path: flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/CatalogLoader.java
##########
@@ -155,4 +210,55 @@ public String toString() {
     }
   }
 
+  class BaseCatalogLoader implements CatalogLoader {
+
+    private final Map<String, String> options;
+    private final String catalogName;
+    private final SerializableConfiguration hadoopConf;
+
+    private BaseCatalogLoader(
+            CatalogFactory.Context context
+    ) {
+      this.options = Maps.newHashMap(context.getOptions()); // wrap into a hashmap for serialization
+      this.catalogName = context.getName();
+      // Check to see if there are hadoop classes loaded in Flink's application classloader
+      if (isHadoopEnv(context.getClassLoader().getParent())) {

Review comment:
       As I mentioned above, we also have a marker interface, `org.apache.iceberg.hadoop.Configurable<C>`, which is used for a similar purpose (to avoid the dependency on Hadoop).
   
   Here's an example usage of it in `GlueCatalog`, though I'm not 100% sure it can stand in for this use case (but maybe we can update it so it can): https://github.com/apache/iceberg/blob/8afcdffd905fa284f20f6f84fba649d28fb7d923/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java#L76-L82



