You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/31 07:36:38 UTC
[flink] branch release-1.9 updated: [FLINK-13431][hive] Fix
nameNode HA configuration was not loaded when running HiveConnector on Yarn
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new f057286 [FLINK-13431][hive] Fix nameNode HA configuration was not loaded when running HiveConnector on Yarn
f057286 is described below
commit f057286da1ae454cda2ee820e171aad936c294bc
Author: Hongtao Zhang <55...@qq.com>
AuthorDate: Fri Jul 26 19:08:24 2019 +0800
[FLINK-13431][hive] Fix nameNode HA configuration was not loaded when running HiveConnector on Yarn
This closes #9237
(cherry picked from commit a9d393167cc3927f55b196baa918deea29695225)
---
.../flink/table/catalog/hive/HiveCatalog.java | 10 +++-
.../hive/factories/HiveCatalogFactoryTest.java | 55 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 1 deletion(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 3b4218d..19ffa0d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.catalog.hive;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.batch.connectors.hive.HiveTableFactory;
import org.apache.flink.table.api.TableSchema;
@@ -150,7 +151,14 @@ public class HiveCatalog extends AbstractCatalog {
String.format("Failed to get hive-site.xml from %s", hiveConfDir), e);
}
- return new HiveConf();
+ // create HiveConf from hadoop configuration
+ return new HiveConf(HadoopUtils.getHadoopConfiguration(new org.apache.flink.configuration.Configuration()),
+ HiveConf.class);
+ }
+
+ @VisibleForTesting
+ public HiveConf getHiveConf() {
+ return hiveConf;
}
@VisibleForTesting
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
index feaa868..1339a94 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.catalog.hive.factories;
+import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
@@ -27,8 +28,16 @@ import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.util.TestLogger;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@@ -37,6 +46,8 @@ import static org.junit.Assert.assertEquals;
* Test for {@link HiveCatalog} created by {@link HiveCatalogFactory}.
*/
public class HiveCatalogFactoryTest extends TestLogger {
+ @Rule
+ public final TemporaryFolder tempFolder = new TemporaryFolder();
@Test
public void test() {
@@ -54,9 +65,53 @@ public class HiveCatalogFactoryTest extends TestLogger {
checkEquals(expectedCatalog, (HiveCatalog) actualCatalog);
}
+ @Test
+ public void testLoadHDFSConfigFromEnv() throws IOException {
+ final String k1 = "what is connector?";
+ final String v1 = "Hive";
+ final String catalogName = "HiveCatalog";
+
+ // set HADOOP_CONF_DIR env
+ final File hadoopConfDir = tempFolder.newFolder();
+ final File hdfsSiteFile = new File(hadoopConfDir, "hdfs-site.xml");
+ writeProperty(hdfsSiteFile, k1, v1);
+ final Map<String, String> originalEnv = System.getenv();
+ final Map<String, String> newEnv = new HashMap<>(originalEnv);
+ newEnv.put("HADOOP_CONF_DIR", hadoopConfDir.getAbsolutePath());
+ CommonTestUtils.setEnv(newEnv);
+
+ // create the HiveCatalog using the Hadoop configuration
+ final CatalogDescriptor catalogDescriptor = new HiveCatalogDescriptor();
+ final Map<String, String> properties = catalogDescriptor.toProperties();
+ final HiveConf hiveConf;
+ try {
+ final HiveCatalog hiveCatalog = (HiveCatalog) TableFactoryService.find(CatalogFactory.class, properties)
+ .createCatalog(catalogName, properties);
+ hiveConf = hiveCatalog.getHiveConf();
+ } finally {
+ // set the Env back
+ CommonTestUtils.setEnv(originalEnv);
+ }
+ // validate the result
+ assertEquals(v1, hiveConf.get(k1, null));
+ }
+
private static void checkEquals(HiveCatalog c1, HiveCatalog c2) {
// Only assert a few selected properties for now
assertEquals(c1.getName(), c2.getName());
assertEquals(c1.getDefaultDatabase(), c2.getDefaultDatabase());
}
+
+ private static void writeProperty(File file, String key, String value) throws IOException {
+ try (PrintStream out = new PrintStream(new FileOutputStream(file))) {
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>");
+ out.println("<configuration>");
+ out.println("\t<property>");
+ out.println("\t\t<name>" + key + "</name>");
+ out.println("\t\t<value>" + value + "</value>");
+ out.println("\t</property>");
+ out.println("</configuration>");
+ }
+ }
}