You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2019/07/05 00:26:46 UTC

[flink] branch master updated: [FLINK-13066][hive] append hive-site.xml to path of Hive conf dir

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 761ed77  [FLINK-13066][hive] append hive-site.xml to path of Hive conf dir
761ed77 is described below

commit 761ed7728fca566ba29e59aa9313a8d5553cfdb3
Author: bowen.li <bo...@gmail.com>
AuthorDate: Tue Jul 2 16:42:17 2019 -0700

    [FLINK-13066][hive] append hive-site.xml to path of Hive conf dir
    
    This PR fixes a bug where we previously passed only the Hive conf dir to HiveConf, whereas we should pass the path of hive-site.xml. The change appends hive-site.xml to the Hive conf dir and passes the resulting path to HiveConf when the Hive conf dir is not null.
    
    This closes #8955.
---
 .../flink/table/catalog/hive/HiveCatalog.java      | 20 +++++++++++-----
 .../catalog/hive/factories/HiveCatalogFactory.java | 28 ++--------------------
 flink-python/pyflink/table/catalog.py              |  7 ++----
 3 files changed, 18 insertions(+), 37 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 00e7e69..d83692b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -85,7 +85,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.net.URL;
+import java.net.MalformedURLException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -118,7 +119,7 @@ public class HiveCatalog extends AbstractCatalog {
 
 	private HiveMetastoreClientWrapper client;
 
-	public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable URL hiveConfDir, String hiveVersion) {
+	public HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir, String hiveVersion) {
 		this(catalogName,
 			defaultDatabase == null ? DEFAULT_DB : defaultDatabase,
 			createHiveConf(hiveConfDir),
@@ -126,7 +127,7 @@ public class HiveCatalog extends AbstractCatalog {
 	}
 
 	@VisibleForTesting
-	protected HiveCatalog(String catalogName, String defaultDatabase, HiveConf hiveConf, String hiveVersion) {
+	protected HiveCatalog(String catalogName, String defaultDatabase, @Nullable HiveConf hiveConf, String hiveVersion) {
 		super(catalogName, defaultDatabase == null ? DEFAULT_DB : defaultDatabase);
 
 		this.hiveConf = hiveConf == null ? createHiveConf(null) : hiveConf;
@@ -136,10 +137,17 @@ public class HiveCatalog extends AbstractCatalog {
 		LOG.info("Created HiveCatalog '{}'", catalogName);
 	}
 
-	private static HiveConf createHiveConf(URL hiveConfDir) {
-		LOG.info("Setting hive-site location as {}", hiveConfDir);
+	private static HiveConf createHiveConf(@Nullable String hiveConfDir) {
+		LOG.info("Setting hive conf dir as {}", hiveConfDir);
 
-		HiveConf.setHiveSiteLocation(hiveConfDir);
+		try {
+			HiveConf.setHiveSiteLocation(
+				hiveConfDir == null ?
+					null : Paths.get(hiveConfDir, "hive-site.xml").toUri().toURL());
+		} catch (MalformedURLException e) {
+			throw new CatalogException(
+				String.format("Failed to get hive-site.xml from %s", hiveConfDir), e);
+		}
 
 		return new HiveConf();
 	}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java
index 67693fd..5b39ef8 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactory.java
@@ -19,20 +19,15 @@
 package org.apache.flink.table.catalog.hive.factories;
 
 import org.apache.flink.table.catalog.Catalog;
-import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.CatalogFactory;
-import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -82,30 +77,11 @@ public class HiveCatalogFactory implements CatalogFactory {
 			descriptorProperties.getOptionalString(CATALOG_DEFAULT_DATABASE)
 				.orElse(HiveCatalog.DEFAULT_DB);
 
-		final Optional<String> hiveSitePath = descriptorProperties.getOptionalString(CATALOG_HIVE_CONF_DIR);
+		final Optional<String> hiveConfDir = descriptorProperties.getOptionalString(CATALOG_HIVE_CONF_DIR);
 
 		final String version = descriptorProperties.getOptionalString(CATALOG_HIVE_VERSION).orElse(HiveShimLoader.getHiveVersion());
 
-		return new HiveCatalog(name, defaultDatabase, loadHiveConfDir(hiveSitePath.orElse(null)), version);
-	}
-
-	private static URL loadHiveConfDir(String hiveConfDir) {
-
-		URL url = null;
-
-		if (!StringUtils.isNullOrWhitespaceOnly(hiveConfDir)) {
-			try {
-				url = new File(hiveConfDir).toURI().toURL();
-
-				LOG.info("Successfully loaded '{}'", hiveConfDir);
-
-			} catch (MalformedURLException e) {
-				throw new CatalogException(
-					String.format("Failed to get hive conf dir from the given path '%s'", hiveConfDir), e);
-			}
-		}
-
-		return url;
+		return new HiveCatalog(name, defaultDatabase, hiveConfDir.orElse(null), version);
 	}
 
 	private static DescriptorProperties getValidatedProperties(Map<String, String> properties) {
diff --git a/flink-python/pyflink/table/catalog.py b/flink-python/pyflink/table/catalog.py
index 6c6480c..081d4c6 100644
--- a/flink-python/pyflink/table/catalog.py
+++ b/flink-python/pyflink/table/catalog.py
@@ -966,14 +966,11 @@ class HiveCatalog(Catalog):
     A catalog implementation for Hive.
     """
 
-    def __init__(self, catalog_name=None, default_database="default", hive_site_path=None,
+    def __init__(self, catalog_name=None, default_database="default", hive_conf_dir=None,
                  j_hive_catalog=None):
         gateway = get_gateway()
 
         if j_hive_catalog is None:
-            hive_site_url = gateway.jvm.java.io.File(hive_site_path).toURI().toURL() \
-                if hive_site_path is not None else None
-
             j_hive_catalog = gateway.jvm.org.apache.flink.table.catalog.hive.HiveCatalog(
-                catalog_name, default_database, hive_site_url)
+                catalog_name, default_database, hive_conf_dir)
         super(HiveCatalog, self).__init__(j_hive_catalog)