You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/03/27 03:05:49 UTC
[flink] 02/03: [FLINK-16647][hive] Hive connector should read mapred-site.xml
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit fd9d68f2c6e32894f6ac846f899c466b3f07d972
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Mar 23 11:55:28 2020 +0800
[FLINK-16647][hive] Hive connector should read mapred-site.xml
---
.../flink/table/catalog/hive/HiveCatalog.java | 16 ++++++++++++--
.../hive/factories/HiveCatalogFactoryTest.java | 25 ++++++++++++++++------
2 files changed, 33 insertions(+), 8 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 83a5299..fd3bd20 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -70,6 +70,8 @@ import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.util.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
@@ -97,6 +99,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.File;
import java.net.MalformedURLException;
import java.nio.file.Paths;
import java.util.ArrayList;
@@ -170,8 +173,17 @@ public class HiveCatalog extends AbstractCatalog {
}
// create HiveConf from hadoop configuration
- return new HiveConf(HadoopUtils.getHadoopConfiguration(new org.apache.flink.configuration.Configuration()),
- HiveConf.class);
+ Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(new org.apache.flink.configuration.Configuration());
+
+ // Add mapred-site.xml. We need to read configurations like compression codec.
+ for (String possibleHadoopConfPath : HadoopUtils.possibleHadoopConfPaths(new org.apache.flink.configuration.Configuration())) {
+ File mapredSite = new File(new File(possibleHadoopConfPath), "mapred-site.xml");
+ if (mapredSite.exists()) {
+ hadoopConf.addResource(new Path(mapredSite.getAbsolutePath()));
+ break;
+ }
+ }
+ return new HiveConf(hadoopConf, HiveConf.class);
}
@VisibleForTesting
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
index 1339a94..0db52b4 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
@@ -66,18 +66,29 @@ public class HiveCatalogFactoryTest extends TestLogger {
}
@Test
- public void testLoadHDFSConfigFromEnv() throws IOException {
- final String k1 = "what is connector?";
- final String v1 = "Hive";
+ public void testLoadHadoopConfigFromEnv() throws IOException {
+ Map<String, String> customProps = new HashMap<>();
+ String k1 = "what is connector?";
+ String v1 = "Hive";
final String catalogName = "HiveCatalog";
// set HADOOP_CONF_DIR env
final File hadoopConfDir = tempFolder.newFolder();
final File hdfsSiteFile = new File(hadoopConfDir, "hdfs-site.xml");
writeProperty(hdfsSiteFile, k1, v1);
+ customProps.put(k1, v1);
+
+ // add mapred-site file
+ final File mapredSiteFile = new File(hadoopConfDir, "mapred-site.xml");
+ k1 = "mapred.site.config.key";
+ v1 = "mapred.site.config.val";
+ writeProperty(mapredSiteFile, k1, v1);
+ customProps.put(k1, v1);
+
final Map<String, String> originalEnv = System.getenv();
final Map<String, String> newEnv = new HashMap<>(originalEnv);
newEnv.put("HADOOP_CONF_DIR", hadoopConfDir.getAbsolutePath());
+ newEnv.remove("HADOOP_HOME");
CommonTestUtils.setEnv(newEnv);
// create HiveCatalog use the Hadoop Configuration
@@ -86,14 +97,16 @@ public class HiveCatalogFactoryTest extends TestLogger {
final HiveConf hiveConf;
try {
final HiveCatalog hiveCatalog = (HiveCatalog) TableFactoryService.find(CatalogFactory.class, properties)
- .createCatalog(catalogName, properties);
+ .createCatalog(catalogName, properties);
hiveConf = hiveCatalog.getHiveConf();
} finally {
// set the Env back
CommonTestUtils.setEnv(originalEnv);
}
- //validate the result
- assertEquals(v1, hiveConf.get(k1, null));
+ // validate the result
+ for (String key : customProps.keySet()) {
+ assertEquals(customProps.get(key), hiveConf.get(key, null));
+ }
}
private static void checkEquals(HiveCatalog c1, HiveCatalog c2) {