You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2022/09/20 20:37:18 UTC

[iceberg] branch master updated: Flink 1.14&1.15 backport: Set custom Hadoop configuration (#5775)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new bd911c1c25 Flink 1.14&1.15 backport: Set custom Hadoop configuration (#5775)
bd911c1c25 is described below

commit bd911c1c256665f4ddde07d3c675e25ec3cb6f10
Author: Kunni <38...@users.noreply.github.com>
AuthorDate: Wed Sep 21 04:37:12 2022 +0800

    Flink 1.14&1.15 backport: Set custom Hadoop configuration (#5775)
---
 docs/flink-getting-started.md                       |  1 +
 .../apache/iceberg/flink/FlinkCatalogFactory.java   | 21 +++++++++++++++++++--
 .../apache/iceberg/flink/FlinkCatalogFactory.java   | 21 +++++++++++++++++++--
 3 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/docs/flink-getting-started.md b/docs/flink-getting-started.md
index 5b9a414ddd..1ca421e0fc 100644
--- a/docs/flink-getting-started.md
+++ b/docs/flink-getting-started.md
@@ -238,6 +238,7 @@ The following properties can be set if using the Hive catalog:
 * `clients`: The Hive metastore client pool size, default value is 2. (Optional)
 * `warehouse`: The Hive warehouse location; users should specify this path if they neither set `hive-conf-dir` to a location containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to the classpath.
 * `hive-conf-dir`: Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or the Hive configuration file from the classpath) will be overwritten with the `warehouse` value if both `hive-conf-dir` and `warehouse` are set when creating an Iceberg catalog.
+* `hadoop-conf-dir`: Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files which will be used to provide custom Hadoop configuration values.
 
 ### Hadoop catalog
 
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 1047a5067d..19eda210ae 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -65,6 +65,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
   public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
 
   public static final String HIVE_CONF_DIR = "hive-conf-dir";
+  public static final String HADOOP_CONF_DIR = "hadoop-conf-dir";
   public static final String DEFAULT_DATABASE = "default-database";
   public static final String DEFAULT_DATABASE_NAME = "default";
   public static final String BASE_NAMESPACE = "base-namespace";
@@ -103,7 +104,8 @@ public class FlinkCatalogFactory implements CatalogFactory {
         // that case it will
         // fallback to parse those values from hadoop configuration which is loaded from classpath.
         String hiveConfDir = properties.get(HIVE_CONF_DIR);
-        Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir);
+        String hadoopConfDir = properties.get(HADOOP_CONF_DIR);
+        Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir, hadoopConfDir);
         return CatalogLoader.hive(name, newHadoopConf, properties);
 
       case ICEBERG_CATALOG_TYPE_HADOOP:
@@ -147,7 +149,8 @@ public class FlinkCatalogFactory implements CatalogFactory {
     return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
   }
 
-  private static Configuration mergeHiveConf(Configuration hadoopConf, String hiveConfDir) {
+  private static Configuration mergeHiveConf(
+      Configuration hadoopConf, String hiveConfDir, String hadoopConfDir) {
     Configuration newConf = new Configuration(hadoopConf);
     if (!Strings.isNullOrEmpty(hiveConfDir)) {
       Preconditions.checkState(
@@ -164,6 +167,20 @@ public class FlinkCatalogFactory implements CatalogFactory {
         newConf.addResource(configFile);
       }
     }
+
+    if (!Strings.isNullOrEmpty(hadoopConfDir)) {
+      Preconditions.checkState(
+          Files.exists(Paths.get(hadoopConfDir, "hdfs-site.xml")),
+          "Failed to load Hadoop configuration: missing %s",
+          Paths.get(hadoopConfDir, "hdfs-site.xml"));
+      newConf.addResource(new Path(hadoopConfDir, "hdfs-site.xml"));
+      Preconditions.checkState(
+          Files.exists(Paths.get(hadoopConfDir, "core-site.xml")),
+          "Failed to load Hadoop configuration: missing %s",
+          Paths.get(hadoopConfDir, "core-site.xml"));
+      newConf.addResource(new Path(hadoopConfDir, "core-site.xml"));
+    }
+
     return newConf;
   }
 
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 1047a5067d..19eda210ae 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -65,6 +65,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
   public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
 
   public static final String HIVE_CONF_DIR = "hive-conf-dir";
+  public static final String HADOOP_CONF_DIR = "hadoop-conf-dir";
   public static final String DEFAULT_DATABASE = "default-database";
   public static final String DEFAULT_DATABASE_NAME = "default";
   public static final String BASE_NAMESPACE = "base-namespace";
@@ -103,7 +104,8 @@ public class FlinkCatalogFactory implements CatalogFactory {
         // that case it will
         // fallback to parse those values from hadoop configuration which is loaded from classpath.
         String hiveConfDir = properties.get(HIVE_CONF_DIR);
-        Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir);
+        String hadoopConfDir = properties.get(HADOOP_CONF_DIR);
+        Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir, hadoopConfDir);
         return CatalogLoader.hive(name, newHadoopConf, properties);
 
       case ICEBERG_CATALOG_TYPE_HADOOP:
@@ -147,7 +149,8 @@ public class FlinkCatalogFactory implements CatalogFactory {
     return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
   }
 
-  private static Configuration mergeHiveConf(Configuration hadoopConf, String hiveConfDir) {
+  private static Configuration mergeHiveConf(
+      Configuration hadoopConf, String hiveConfDir, String hadoopConfDir) {
     Configuration newConf = new Configuration(hadoopConf);
     if (!Strings.isNullOrEmpty(hiveConfDir)) {
       Preconditions.checkState(
@@ -164,6 +167,20 @@ public class FlinkCatalogFactory implements CatalogFactory {
         newConf.addResource(configFile);
       }
     }
+
+    if (!Strings.isNullOrEmpty(hadoopConfDir)) {
+      Preconditions.checkState(
+          Files.exists(Paths.get(hadoopConfDir, "hdfs-site.xml")),
+          "Failed to load Hadoop configuration: missing %s",
+          Paths.get(hadoopConfDir, "hdfs-site.xml"));
+      newConf.addResource(new Path(hadoopConfDir, "hdfs-site.xml"));
+      Preconditions.checkState(
+          Files.exists(Paths.get(hadoopConfDir, "core-site.xml")),
+          "Failed to load Hadoop configuration: missing %s",
+          Paths.get(hadoopConfDir, "core-site.xml"));
+      newConf.addResource(new Path(hadoopConfDir, "core-site.xml"));
+    }
+
     return newConf;
   }