You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/03/27 03:05:49 UTC

[flink] 02/03: [FLINK-16647][hive] Hive connector should read mapred-site.xml

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fd9d68f2c6e32894f6ac846f899c466b3f07d972
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Mar 23 11:55:28 2020 +0800

    [FLINK-16647][hive] Hive connector should read mapred-site.xml
---
 .../flink/table/catalog/hive/HiveCatalog.java      | 16 ++++++++++++--
 .../hive/factories/HiveCatalogFactoryTest.java     | 25 ++++++++++++++++------
 2 files changed, 33 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 83a5299..fd3bd20 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -70,6 +70,8 @@ import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactory;
 import org.apache.flink.util.StringUtils;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -97,6 +99,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.net.MalformedURLException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -170,8 +173,17 @@ public class HiveCatalog extends AbstractCatalog {
 		}
 
 		// create HiveConf from hadoop configuration
-		return new HiveConf(HadoopUtils.getHadoopConfiguration(new org.apache.flink.configuration.Configuration()),
-			HiveConf.class);
+		Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(new org.apache.flink.configuration.Configuration());
+
+		// Also load mapred-site.xml from the first Hadoop conf dir that contains it; we need settings from it such as the compression codec.
+		for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new org.apache.flink.configuration.Configuration())) {
+			File mapredSite = new File(new File(possibleHadoopConfPath), "mapred-site.xml");
+			if (mapredSite.exists()) {
+				hadoopConf.addResource(new Path(mapredSite.getAbsolutePath()));
+				break;
+			}
+		}
+		return new HiveConf(hadoopConf, HiveConf.class);
 	}
 
 	@VisibleForTesting
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
index 1339a94..0db52b4 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
@@ -66,18 +66,29 @@ public class HiveCatalogFactoryTest extends TestLogger {
 	}
 
 	@Test
-	public void testLoadHDFSConfigFromEnv() throws IOException {
-		final String k1 = "what is connector?";
-		final String v1 = "Hive";
+	public void testLoadHadoopConfigFromEnv() throws IOException {
+		Map<String, String> customProps = new HashMap<>();
+		String k1 = "what is connector?";
+		String v1 = "Hive";
 		final String catalogName = "HiveCatalog";
 
 		// set HADOOP_CONF_DIR env
 		final File hadoopConfDir = tempFolder.newFolder();
 		final File hdfsSiteFile = new File(hadoopConfDir, "hdfs-site.xml");
 		writeProperty(hdfsSiteFile, k1, v1);
+		customProps.put(k1, v1);
+
+		// add mapred-site file
+		final File mapredSiteFile = new File(hadoopConfDir, "mapred-site.xml");
+		k1 = "mapred.site.config.key";
+		v1 = "mapred.site.config.val";
+		writeProperty(mapredSiteFile, k1, v1);
+		customProps.put(k1, v1);
+
 		final Map<String, String> originalEnv = System.getenv();
 		final Map<String, String> newEnv = new HashMap<>(originalEnv);
 		newEnv.put("HADOOP_CONF_DIR", hadoopConfDir.getAbsolutePath());
+		newEnv.remove("HADOOP_HOME");
 		CommonTestUtils.setEnv(newEnv);
 
 		// create HiveCatalog use the Hadoop Configuration
@@ -86,14 +97,16 @@ public class HiveCatalogFactoryTest extends TestLogger {
 		final HiveConf hiveConf;
 		try {
 			final HiveCatalog hiveCatalog = (HiveCatalog) TableFactoryService.find(CatalogFactory.class, properties)
-				.createCatalog(catalogName, properties);
+					.createCatalog(catalogName, properties);
 			hiveConf = hiveCatalog.getHiveConf();
 		} finally {
 			// set the Env back
 			CommonTestUtils.setEnv(originalEnv);
 		}
-		//validate the result
-		assertEquals(v1, hiveConf.get(k1, null));
+		// validate the result
+		for (String key : customProps.keySet()) {
+			assertEquals(customProps.get(key), hiveConf.get(key, null));
+		}
 	}
 
 	private static void checkEquals(HiveCatalog c1, HiveCatalog c2) {