You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/05 00:26:46 UTC
[flink] branch master updated: [FLINK-13066][hive] append
hive-site.xml to path of Hive conf dir
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 761ed77 [FLINK-13066][hive] append hive-site.xml to path of Hive conf dir
761ed77 is described below
commit 761ed7728fca566ba29e59aa9313a8d5553cfdb3
Author: bowen.li <bo...@gmail.com>
AuthorDate: Tue Jul 2 16:42:17 2019 -0700
[FLINK-13066][hive] append hive-site.xml to path of Hive conf dir
This PR fixes a bug where we previously passed only the Hive conf dir to HiveConf, when we should have passed the path of hive-site.xml. The change appends hive-site.xml to the Hive conf dir and passes the resulting path into HiveConf if the Hive conf dir is not null.
This closes #8955.
---
.../flink/table/catalog/hive/HiveCatalog.java | 20 +++++++++++-----
.../catalog/hive/factories/HiveCatalogFactory.java | 28 ++--------------------
flink-python/pyflink/table/catalog.py | 7 ++----
3 files changed, 18 insertions(+), 37 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 00e7e69..d83692b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -85,7 +85,8 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.net.URL;
+import java.net.MalformedURLException;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -118,7 +119,7 @@ public class HiveCatalog extends AbstractCatalog {
private HiveMetastoreClientWrapper client;
- public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable URL hiveConfDir, String hiveVersion) {
+ public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir, String hiveVersion) {
this(catalogName,
defaultDatabase == null ? DEFAULT_DB : defaultDatabase,
createHiveConf(hiveConfDir),
@@ -126,7 +127,7 @@ public class HiveCatalog extends AbstractCatalog {
}
@VisibleForTesting
- protected HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf, String hiveVersion) {
+ protected HiveCatalog(String catalogName, String defaultDatabase, @Nullable HiveConf hiveConf, String hiveVersion) {
super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase);
this.hiveConf = hiveConf == null ? createHiveConf(null) : hiveConf;
@@ -136,10 +137,17 @@ public class HiveCatalog extends AbstractCatalog {
LOG.info("Created HiveCatalog '{}'", catalogName);
}
- private static HiveConf createHiveConf(URL hiveConfDir) {
- LOG.info("Setting hive-site location as {}", hiveConfDir);
+ private static HiveConf createHiveConf(@Nullable String hiveConfDir) {
+ LOG.info("Setting hive conf dir as {}", hiveConfDir);
- HiveConf.setHiveSiteLocation(hiveConfDir);
+ try {
+ HiveConf.setHiveSiteLocation(
+ hiveConfDir == null ?
+ null : Paths.get(hiveConfDir, "hive-site.xml").toUri().toURL());
+ } catch (MalformedURLException e) {
+ throw new CatalogException(
+ String.format("Failed to get hive-site.xml from %s", hiveConfDir), e);
+ }
return new HiveConf();
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java
index 67693fd..5b39ef8 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java
@@ -19,20 +19,15 @@
package org.apache.flink.table.catalog.hive.factories;
import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.CatalogFactory;
-import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -82,30 +77,11 @@ public class HiveCatalogFactory implements CatalogFactory {
descriptorProperties.getOptionalString(CATALOG_DEFAULT_DATABASE)
.orElse(HiveCatalog.DEFAULT_DB);
- final Optional<String> hiveSitePath = descriptorProperties.getOptionalString(CATALOG_HIVE_CONF_DIR);
+ final Optional<String> hiveConfDir = descriptorProperties.getOptionalString(CATALOG_HIVE_CONF_DIR);
final String version = descriptorProperties.getOptionalString(CATALOG_HIVE_VERSION).orElse(HiveShimLoader.getHiveVersion());
- return new HiveCatalog(name, defaultDatabase, loadHiveConfDir(hiveSitePath.orElse(null)), version);
- }
-
- private static URL loadHiveConfDir(String hiveConfDir) {
-
- URL url = null;
-
- if (!StringUtils.isNullOrWhitespaceOnly(hiveConfDir)) {
- try {
- url = new File(hiveConfDir).toURI().toURL();
-
- LOG.info("Successfully loaded '{}'", hiveConfDir);
-
- } catch (MalformedURLException e) {
- throw new CatalogException(
- String.format("Failed to get hive conf dir from the given path '%s'", hiveConfDir), e);
- }
- }
-
- return url;
+ return new HiveCatalog(name, defaultDatabase, hiveConfDir.orElse(null), version);
}
private static DescriptorProperties getValidatedProperties(Map<String, String> properties) {
diff --git a/flink-python/pyflink/table/catalog.py b/flink-python/pyflink/table/catalog.py
index 6c6480c..081d4c6 100644
--- a/flink-python/pyflink/table/catalog.py
+++ b/flink-python/pyflink/table/catalog.py
@@ -966,14 +966,11 @@ class HiveCatalog(Catalog):
A catalog implementation for Hive.
"""
- def __init__(self, catalog_name=None, default_database="default", hive_site_path=None,
+ def __init__(self, catalog_name=None, default_database="default", hive_conf_dir=None,
j_hive_catalog=None):
gateway = get_gateway()
if j_hive_catalog is None:
- hive_site_url = gateway.jvm.java.io.File(hive_site_path).toURI().toURL() \
- if hive_site_path is not None else None
-
j_hive_catalog = gateway.jvm.org.apache.flink.table.catalog.hive.HiveCatalog(
- catalog_name, default_database, hive_site_url)
+ catalog_name, default_database, hive_conf_dir)
super(HiveCatalog, self).__init__(j_hive_catalog)