You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2022/09/20 20:37:18 UTC
[iceberg] branch master updated: Flink 1.14&1.15 backport: Set custom Hadoop configuration (#5775)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new bd911c1c25 Flink 1.14&1.15 backport: Set custom Hadoop configuration (#5775)
bd911c1c25 is described below
commit bd911c1c256665f4ddde07d3c675e25ec3cb6f10
Author: Kunni <38...@users.noreply.github.com>
AuthorDate: Wed Sep 21 04:37:12 2022 +0800
Flink 1.14&1.15 backport: Set custom Hadoop configuration (#5775)
---
docs/flink-getting-started.md | 1 +
.../apache/iceberg/flink/FlinkCatalogFactory.java | 21 +++++++++++++++++++--
.../apache/iceberg/flink/FlinkCatalogFactory.java | 21 +++++++++++++++++++--
3 files changed, 39 insertions(+), 4 deletions(-)
diff --git a/docs/flink-getting-started.md b/docs/flink-getting-started.md
index 5b9a414ddd..1ca421e0fc 100644
--- a/docs/flink-getting-started.md
+++ b/docs/flink-getting-started.md
@@ -238,6 +238,7 @@ The following properties can be set if using the Hive catalog:
* `clients`: The Hive metastore client pool size, default value is 2. (Optional)
* `warehouse`: The Hive warehouse location. Users should specify this path if they neither set `hive-conf-dir` to a location containing a `hive-site.xml` configuration file nor add a correct `hive-site.xml` to the classpath.
* `hive-conf-dir`: Path to a directory containing a `hive-site.xml` configuration file which will be used to provide custom Hive configuration values. The value of `hive.metastore.warehouse.dir` from `<hive-conf-dir>/hive-site.xml` (or the Hive configuration file from the classpath) will be overwritten with the `warehouse` value if both `hive-conf-dir` and `warehouse` are set when creating the Iceberg catalog.
+* `hadoop-conf-dir`: Path to a directory containing `core-site.xml` and `hdfs-site.xml` configuration files which will be used to provide custom Hadoop configuration values.
### Hadoop catalog
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 1047a5067d..19eda210ae 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -65,6 +65,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
public static final String HIVE_CONF_DIR = "hive-conf-dir";
+ public static final String HADOOP_CONF_DIR = "hadoop-conf-dir";
public static final String DEFAULT_DATABASE = "default-database";
public static final String DEFAULT_DATABASE_NAME = "default";
public static final String BASE_NAMESPACE = "base-namespace";
@@ -103,7 +104,8 @@ public class FlinkCatalogFactory implements CatalogFactory {
// that case it will
// fallback to parse those values from hadoop configuration which is loaded from classpath.
String hiveConfDir = properties.get(HIVE_CONF_DIR);
- Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir);
+ String hadoopConfDir = properties.get(HADOOP_CONF_DIR);
+ Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir, hadoopConfDir);
return CatalogLoader.hive(name, newHadoopConf, properties);
case ICEBERG_CATALOG_TYPE_HADOOP:
@@ -147,7 +149,8 @@ public class FlinkCatalogFactory implements CatalogFactory {
return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
}
- private static Configuration mergeHiveConf(Configuration hadoopConf, String hiveConfDir) {
+ private static Configuration mergeHiveConf(
+ Configuration hadoopConf, String hiveConfDir, String hadoopConfDir) {
Configuration newConf = new Configuration(hadoopConf);
if (!Strings.isNullOrEmpty(hiveConfDir)) {
Preconditions.checkState(
@@ -164,6 +167,20 @@ public class FlinkCatalogFactory implements CatalogFactory {
newConf.addResource(configFile);
}
}
+
+ if (!Strings.isNullOrEmpty(hadoopConfDir)) {
+ Preconditions.checkState(
+ Files.exists(Paths.get(hadoopConfDir, "hdfs-site.xml")),
+ "Failed to load Hadoop configuration: missing %s",
+ Paths.get(hadoopConfDir, "hdfs-site.xml"));
+ newConf.addResource(new Path(hadoopConfDir, "hdfs-site.xml"));
+ Preconditions.checkState(
+ Files.exists(Paths.get(hadoopConfDir, "core-site.xml")),
+ "Failed to load Hadoop configuration: missing %s",
+ Paths.get(hadoopConfDir, "core-site.xml"));
+ newConf.addResource(new Path(hadoopConfDir, "core-site.xml"));
+ }
+
return newConf;
}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 1047a5067d..19eda210ae 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -65,6 +65,7 @@ public class FlinkCatalogFactory implements CatalogFactory {
public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
public static final String HIVE_CONF_DIR = "hive-conf-dir";
+ public static final String HADOOP_CONF_DIR = "hadoop-conf-dir";
public static final String DEFAULT_DATABASE = "default-database";
public static final String DEFAULT_DATABASE_NAME = "default";
public static final String BASE_NAMESPACE = "base-namespace";
@@ -103,7 +104,8 @@ public class FlinkCatalogFactory implements CatalogFactory {
// that case it will
// fallback to parse those values from hadoop configuration which is loaded from classpath.
String hiveConfDir = properties.get(HIVE_CONF_DIR);
- Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir);
+ String hadoopConfDir = properties.get(HADOOP_CONF_DIR);
+ Configuration newHadoopConf = mergeHiveConf(hadoopConf, hiveConfDir, hadoopConfDir);
return CatalogLoader.hive(name, newHadoopConf, properties);
case ICEBERG_CATALOG_TYPE_HADOOP:
@@ -147,7 +149,8 @@ public class FlinkCatalogFactory implements CatalogFactory {
return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
}
- private static Configuration mergeHiveConf(Configuration hadoopConf, String hiveConfDir) {
+ private static Configuration mergeHiveConf(
+ Configuration hadoopConf, String hiveConfDir, String hadoopConfDir) {
Configuration newConf = new Configuration(hadoopConf);
if (!Strings.isNullOrEmpty(hiveConfDir)) {
Preconditions.checkState(
@@ -164,6 +167,20 @@ public class FlinkCatalogFactory implements CatalogFactory {
newConf.addResource(configFile);
}
}
+
+ if (!Strings.isNullOrEmpty(hadoopConfDir)) {
+ Preconditions.checkState(
+ Files.exists(Paths.get(hadoopConfDir, "hdfs-site.xml")),
+ "Failed to load Hadoop configuration: missing %s",
+ Paths.get(hadoopConfDir, "hdfs-site.xml"));
+ newConf.addResource(new Path(hadoopConfDir, "hdfs-site.xml"));
+ Preconditions.checkState(
+ Files.exists(Paths.get(hadoopConfDir, "core-site.xml")),
+ "Failed to load Hadoop configuration: missing %s",
+ Paths.get(hadoopConfDir, "core-site.xml"));
+ newConf.addResource(new Path(hadoopConfDir, "core-site.xml"));
+ }
+
return newConf;
}