You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/18 16:03:50 UTC

[iceberg] branch master updated: Flink: Support hadoop-conf-dir for hdfs-site.xml and core-site.xml (#4622)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f56e01773 Flink: Support hadoop-conf-dir for hdfs-site.xml and core-site.xml (#4622)
f56e01773 is described below

commit f56e017738668b08a4a8b1aafcc5256b37b384eb
Author: fallnirvana <90...@qq.com>
AuthorDate: Thu May 19 00:03:44 2022 +0800

    Flink: Support hadoop-conf-dir for hdfs-site.xml and core-site.xml (#4622)
---
 .../org/apache/iceberg/flink/FlinkCatalogFactory.java     | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index ffa54c0eb..12a3dc6b9 100644
--- a/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -64,6 +64,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
   public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
 
   public static final String HIVE_CONF_DIR = "hive-conf-dir";
+  public static final String HADOOP_CONF_DIR = "hadoop-conf-dir";
   public static final String DEFAULT_DATABASE = "default-database";
   public static final String DEFAULT_DATABASE_NAME = "default";
   public static final String BASE_NAMESPACE = "base-namespace";
@@ -96,7 +97,8 @@ public class FlinkCatalogFactory implements CatalogFactory {
         // The values of properties 'uri', 'warehouse', 'hive-conf-dir' are allowed to be null, in that case it will
         // fallback to parse those values from hadoop configuration which is loaded from classpath.
         String hiveConfDir = properties.get(HIVE_CONF_DIR);
-        Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir);
+        String hadoopConfDir = properties.get(HADOOP_CONF_DIR);
+        Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir, hadoopConfDir);
         return CatalogLoader.hive(name, newHadoopConf, properties);
 
       case ICEBERG_CATALOG_TYPE_HADOOP:
@@ -139,7 +141,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
     return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
   }
 
-  private static Configuration mergeHiveConf(Configuration hadoopConf, String hiveConfDir) {
+  private static Configuration mergeHiveConf(Configuration hadoopConf, String hiveConfDir, String hadoopConfDir) {
     Configuration newConf = new Configuration(hadoopConf);
     if (!Strings.isNullOrEmpty(hiveConfDir)) {
       Preconditions.checkState(Files.exists(Paths.get(hiveConfDir, "hive-site.xml")),
@@ -153,6 +155,15 @@ public class FlinkCatalogFactory implements CatalogFactory {
         newConf.addResource(configFile);
       }
     }
+
+    if (!Strings.isNullOrEmpty(hadoopConfDir)) {
+      Preconditions.checkState(Files.exists(Paths.get(hadoopConfDir, "hdfs-site.xml")),
+          "Failed to load Hadoop configuration: missing %s", Paths.get(hadoopConfDir, "hdfs-site.xml"));
+      newConf.addResource(new Path(hadoopConfDir, "hdfs-site.xml"));
+      Preconditions.checkState(Files.exists(Paths.get(hadoopConfDir, "core-site.xml")),
+          "Failed to load Hadoop configuration: missing %s", Paths.get(hadoopConfDir, "core-site.xml"));
+      newConf.addResource(new Path(hadoopConfDir, "core-site.xml"));
+    }
     return newConf;
   }