You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/31 07:36:38 UTC

[flink] branch release-1.9 updated: [FLINK-13431][hive] Fix nameNode HA configuration was not loaded when running HiveConnector on Yarn

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new f057286  [FLINK-13431][hive] Fix nameNode HA configuration was not loaded when running HiveConnector on Yarn
f057286 is described below

commit f057286da1ae454cda2ee820e171aad936c294bc
Author: Hongtao Zhang <55...@qq.com>
AuthorDate: Fri Jul 26 19:08:24 2019 +0800

    [FLINK-13431][hive] Fix nameNode HA configuration was not loaded when running HiveConnector on Yarn
    
    This closes #9237
    
    (cherry picked from commit a9d393167cc3927f55b196baa918deea29695225)
---
 .../flink/table/catalog/hive/HiveCatalog.java      | 10 +++-
 .../hive/factories/HiveCatalogFactoryTest.java     | 55 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 3b4218d..19ffa0d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.batch.connectors.hive.HiveTableFactory;
 import org.apache.flink.table.api.TableSchema;
@@ -150,7 +151,14 @@ public class HiveCatalog extends AbstractCatalog {
 				String.format("Failed to get hive-site.xml from %s", hiveConfDir), e);
 		}
 
-		return new HiveConf();
+		// create HiveConf from hadoop configuration
+		return new HiveConf(HadoopUtils.getHadoopConfiguration(new org.apache.flink.configuration.Configuration()),
+			HiveConf.class);
+	}
+
+	@VisibleForTesting
+	public HiveConf getHiveConf() {
+		return hiveConf; // accessor for tests (see @VisibleForTesting) to inspect the catalog's hiveConf
 	}
 
 	@VisibleForTesting
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
index feaa868..1339a94 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.catalog.hive.factories;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
@@ -27,8 +28,16 @@ import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -37,6 +46,8 @@ import static org.junit.Assert.assertEquals;
  * Test for {@link HiveCatalog} created by {@link HiveCatalogFactory}.
  */
 public class HiveCatalogFactoryTest extends TestLogger {
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
 
 	@Test
 	public void test() {
@@ -54,9 +65,53 @@ public class HiveCatalogFactoryTest extends TestLogger {
 		checkEquals(expectedCatalog, (HiveCatalog) actualCatalog);
 	}
 
+	/** Checks that an hdfs-site.xml property found via HADOOP_CONF_DIR reaches the catalog's HiveConf. */
+	@Test
+	public void testLoadHDFSConfigFromEnv() throws IOException {
+		final String k1 = "what is connector?";
+		final String v1 = "Hive";
+		final String catalogName = "HiveCatalog";
+
+		// point HADOOP_CONF_DIR at a temp dir whose hdfs-site.xml carries the probe property
+		final File hadoopConfDir = tempFolder.newFolder();
+		writeProperty(new File(hadoopConfDir, "hdfs-site.xml"), k1, v1);
+		// snapshot the env: System.getenv() can be a live view that setEnv(...) mutates, breaking the restore
+		final Map<String, String> originalEnv = new HashMap<>(System.getenv());
+		final Map<String, String> newEnv = new HashMap<>(originalEnv);
+		newEnv.put("HADOOP_CONF_DIR", hadoopConfDir.getAbsolutePath());
+		CommonTestUtils.setEnv(newEnv);
+
+		// create the HiveCatalog while the modified env is in effect
+		final CatalogDescriptor catalogDescriptor = new HiveCatalogDescriptor();
+		final Map<String, String> properties = catalogDescriptor.toProperties();
+		final HiveConf hiveConf;
+		try {
+			final HiveCatalog hiveCatalog = (HiveCatalog) TableFactoryService.find(CatalogFactory.class, properties)
+				.createCatalog(catalogName, properties);
+			hiveConf = hiveCatalog.getHiveConf();
+		} finally {
+			// restore the original env even if catalog creation fails
+			CommonTestUtils.setEnv(originalEnv);
+		}
+		assertEquals(v1, hiveConf.get(k1, null));
+	}
+
 	private static void checkEquals(HiveCatalog c1, HiveCatalog c2) {
 		// Only assert a few selected properties for now
 		assertEquals(c1.getName(), c2.getName());
 		assertEquals(c1.getDefaultDatabase(), c2.getDefaultDatabase());
 	}
+
+	/** Writes a Hadoop-style configuration XML holding the single property {@code key=value} to {@code file}. */
+	private static void writeProperty(File file, String key, String value) throws IOException {
+		// write UTF-8 explicitly: the XML declaration names no encoding, so parsers assume UTF-8
+		try (PrintStream out = new PrintStream(new FileOutputStream(file), false, "UTF-8")) {
+			out.println("<?xml version=\"1.0\"?>");
+			out.println("<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>");
+			out.println("<configuration>");
+			// key and value are inserted verbatim (no XML escaping); %n matches println's line separator
+			out.printf("\t<property>%n\t\t<name>%s</name>%n\t\t<value>%s</value>%n\t</property>%n", key, value);
+			out.println("</configuration>");
+		}
+	}
 }