You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2020/06/04 17:37:06 UTC
[flink] branch master updated: [FLINK-17384][connectors/hbase]
Support reading hbase conf dir from flink-conf.yaml
This is an automated email from the ASF dual-hosted git repository.
liyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0799b5c [FLINK-17384][connectors/hbase] Support reading hbase conf dir from flink-conf.yaml
0799b5c is described below
commit 0799b5c20a127110e47439668cf8f8db2e4ecbf3
Author: liuyongvs <li...@gmail.com>
AuthorDate: Thu May 14 15:48:50 2020 +0800
[FLINK-17384][connectors/hbase] Support reading hbase conf dir from flink-conf.yaml
This closes #12144.
---
.../generated/environment_configuration.html | 6 +
.../connector/hbase/HBaseDynamicTableFactory.java | 4 +-
.../flink/connector/hbase/HBaseTableFactory.java | 4 +-
.../hbase/sink/HBaseDynamicTableSink.java | 4 +-
.../connector/hbase/sink/HBaseSinkFunction.java | 3 +-
.../connector/hbase/sink/HBaseUpsertTableSink.java | 4 +-
.../hbase/source/AbstractTableInputFormat.java | 3 +-
.../hbase/source/HBaseLookupFunction.java | 3 +-
.../hbase/util/HBaseConfigurationUtil.java | 66 +++++++-
.../hbase/util/HBaseConfigLoadingTest.java | 176 +++++++++++++++++++++
.../src/test/resources/hbase-site.xml | 29 ++++
.../apache/flink/configuration/CoreOptions.java | 11 ++
flink-dist/src/main/flink-bin/bin/config.sh | 24 +++
13 files changed, 322 insertions(+), 15 deletions(-)
diff --git a/docs/_includes/generated/environment_configuration.html b/docs/_includes/generated/environment_configuration.html
index 912ecbb..0fffb88 100644
--- a/docs/_includes/generated/environment_configuration.html
+++ b/docs/_includes/generated/environment_configuration.html
@@ -15,6 +15,12 @@
<td>Path to hadoop configuration directory. It is required to read HDFS and/or YARN configuration. You can also set it via environment variable.</td>
</tr>
<tr>
+ <td><h5>env.hbase.conf.dir</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Path to hbase configuration directory. It is required to read HBase configuration. You can also set it via environment variable.</td>
+ </tr>
+ <tr>
<td><h5>env.java.opts</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
index b0e6dee..48b70eb 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.hbase.options.HBaseOptions;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.sink.HBaseDynamicTableSink;
import org.apache.flink.connector.hbase.source.HBaseDynamicTableSource;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -34,7 +35,6 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import java.time.Duration;
@@ -108,7 +108,7 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
String hTableName = helper.getOptions().get(TABLE_NAME);
// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
- Configuration hbaseClientConf = HBaseConfiguration.create();
+ Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration();
hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, helper.getOptions().get(ZOOKEEPER_QUORUM));
hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, helper.getOptions().get(ZOOKEEPER_ZNODE_PARENT));
String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL);
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java
index f64f9b9..ca3e1e5 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java
@@ -26,6 +26,7 @@ import org.apache.flink.connector.hbase.options.HBaseOptions;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.sink.HBaseUpsertTableSink;
import org.apache.flink.connector.hbase.source.HBaseTableSource;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -38,7 +39,6 @@ import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import java.sql.Date;
@@ -83,7 +83,7 @@ public class HBaseTableFactory implements StreamTableSourceFactory<Row>, StreamT
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
- Configuration hbaseClientConf = HBaseConfiguration.create();
+ Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration();
String hbaseZk = descriptorProperties.getString(CONNECTOR_ZK_QUORUM);
hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseZk);
descriptorProperties
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseDynamicTableSink.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseDynamicTableSink.java
index 193a09a..5d60ae3 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseDynamicTableSink.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseDynamicTableSink.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.hbase.options.HBaseOptions;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -30,7 +31,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
/**
@@ -57,7 +57,7 @@ public class HBaseDynamicTableSink implements DynamicTableSink {
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
- Configuration hbaseClientConf = HBaseConfiguration.create();
+ Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration();
hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseOptions.getZkQuorum());
hbaseOptions.getZkNodeParent().ifPresent(v -> hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
HBaseSinkFunction<RowData> sinkFunction = new HBaseSinkFunction<>(
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
index 3d96e5d..e9d16bc 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.StringUtils;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
@@ -152,7 +151,7 @@ public class HBaseSinkFunction<T>
// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
// and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
// user params from client-side have the highest priority
- org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfiguration.create());
+ org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
// do validation: check key option(s) in final runtime configuration
if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseUpsertTableSink.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseUpsertTableSink.java
index 0f8b8eb..f882833 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseUpsertTableSink.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseUpsertTableSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.hbase.options.HBaseOptions;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -35,7 +36,6 @@ import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import java.util.Arrays;
@@ -89,7 +89,7 @@ public class HBaseUpsertTableSink implements UpsertStreamTableSink<Row> {
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
- Configuration hbaseClientConf = HBaseConfiguration.create();
+ Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration();
hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseOptions.getZkQuorum());
hbaseOptions.getZkNodeParent().ifPresent(v -> hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
HBaseSinkFunction sinkFunction = new HBaseSinkFunction(
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
index 94d36d3..feeff3d 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -109,7 +108,7 @@ abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInput
public abstract void configure(Configuration parameters);
protected org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
- return HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfiguration.create());
+ return HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
}
@Override
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
index 1d608e9..ed2807d 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
@@ -30,7 +30,6 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
@@ -93,7 +92,7 @@ public class HBaseLookupFunction extends TableFunction<Row> {
// user params from client-side have the highest priority
org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
serializedConfig,
- HBaseConfiguration.create());
+ HBaseConfigurationUtil.getHBaseConfiguration());
// do validation: check key option(s) in final runtime configuration
if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java
index a7b11e5..165f95a 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java
@@ -22,20 +22,84 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.File;
import java.io.IOException;
/**
- * This class helps to do serialization for hadoop Configuration.
+ * This class helps to do serialization for hadoop Configuration and HBase-related classes.
*/
@Internal
public class HBaseConfigurationUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseConfigurationUtil.class);
+
+ public static Configuration getHBaseConfiguration() {
+
+ // Instantiate an HBaseConfiguration to load the hbase-default.xml and hbase-site.xml from the classpath.
+ Configuration result = HBaseConfiguration.create();
+ boolean foundHBaseConfiguration = false;
+
+ // We need to load both hbase-default.xml and hbase-site.xml to the hbase configuration
+ // The properties of a newly added resource will override the ones in previous resources, so a configuration
+ // file with higher priority should be added later.
+
+ // Approach 1: HBASE_HOME environment variable
+ String possibleHBaseConfPath = null;
+
+ final String hbaseHome = System.getenv("HBASE_HOME");
+ if (hbaseHome != null) {
+ LOG.debug("Searching HBase configuration files in HBASE_HOME: {}", hbaseHome);
+ possibleHBaseConfPath = hbaseHome + "/conf";
+ }
+
+ if (possibleHBaseConfPath != null) {
+ foundHBaseConfiguration = addHBaseConfIfFound(result, possibleHBaseConfPath);
+ }
+
+ // Approach 2: HBASE_CONF_DIR environment variable
+ String hbaseConfDir = System.getenv("HBASE_CONF_DIR");
+ if (hbaseConfDir != null) {
+ LOG.debug("Searching HBase configuration files in HBASE_CONF_DIR: {}", hbaseConfDir);
+ foundHBaseConfiguration = addHBaseConfIfFound(result, hbaseConfDir) || foundHBaseConfiguration;
+ }
+
+ if (!foundHBaseConfiguration) {
+ LOG.warn("Could not find HBase configuration via any of the supported methods " +
+ "(Flink configuration, environment variables).");
+ }
+
+ return result;
+ }
+
+ /**
+ * Search HBase configuration files in the given path, and add them to the configuration if found.
+ */
+ private static boolean addHBaseConfIfFound(Configuration configuration, String possibleHBaseConfPath) {
+ boolean foundHBaseConfiguration = false;
+ if (new File(possibleHBaseConfPath).exists()) {
+ if (new File(possibleHBaseConfPath + "/hbase-default.xml").exists()) {
+ configuration.addResource(new org.apache.hadoop.fs.Path(possibleHBaseConfPath + "/hbase-default.xml"));
+ LOG.debug("Adding " + possibleHBaseConfPath + "/hbase-default.xml to hbase configuration");
+ foundHBaseConfiguration = true;
+ }
+ if (new File(possibleHBaseConfPath + "/hbase-site.xml").exists()) {
+ configuration.addResource(new org.apache.hadoop.fs.Path(possibleHBaseConfPath + "/hbase-site.xml"));
+ LOG.debug("Adding " + possibleHBaseConfPath + "/hbase-site.xml to hbase configuration");
+ foundHBaseConfiguration = true;
+ }
+ }
+ return foundHBaseConfiguration;
+ }
+
/**
* Serialize a Hadoop {@link Configuration} into byte[].
*/
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseConfigLoadingTest.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseConfigLoadingTest.java
new file mode 100644
index 0000000..89aa23c
--- /dev/null
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseConfigLoadingTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.util;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that validate the loading of the HBase configuration, relative to
+ * entries in the Flink configuration and the environment variables.
+ */
+public class HBaseConfigLoadingTest {
+
+ private static final String IN_HBASE_CONFIG_KEY = "hbase_conf_key";
+ private static final String IN_HBASE_CONFIG_VALUE = "hbase_conf_value!";
+
+ @Rule
+ public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Test
+ public void loadFromClasspathByDefault() {
+ org.apache.hadoop.conf.Configuration hbaseConf =
+ HBaseConfigurationUtil.getHBaseConfiguration();
+
+ assertEquals(IN_HBASE_CONFIG_VALUE, hbaseConf.get(IN_HBASE_CONFIG_KEY, null));
+ }
+
+ @Test
+ public void loadFromEnvVariables() throws Exception {
+ final String k1 = "where?";
+ final String v1 = "I'm on a boat";
+ final String k2 = "when?";
+ final String v2 = "midnight";
+ final String k3 = "why?";
+ final String v3 = "what do you think?";
+ final String k4 = "which way?";
+ final String v4 = "south, always south...";
+
+ final File hbaseConfDir = tempFolder.newFolder();
+
+ final File hbaseHome = tempFolder.newFolder();
+
+ final File hbaseHomeConf = new File(hbaseHome, "conf");
+
+ assertTrue(hbaseHomeConf.mkdirs());
+
+ final File file1 = new File(hbaseConfDir, "hbase-default.xml");
+ final File file2 = new File(hbaseConfDir, "hbase-site.xml");
+ final File file3 = new File(hbaseHomeConf, "hbase-default.xml");
+ final File file4 = new File(hbaseHomeConf, "hbase-site.xml");
+
+ printConfig(file1, k1, v1);
+ printConfig(file2, k2, v2);
+ printConfig(file3, k3, v3);
+ printConfig(file4, k4, v4);
+
+ final org.apache.hadoop.conf.Configuration hbaseConf;
+
+ final Map<String, String> originalEnv = System.getenv();
+ final Map<String, String> newEnv = new HashMap<>(originalEnv);
+ newEnv.put("HBASE_CONF_DIR", hbaseConfDir.getAbsolutePath());
+ newEnv.put("HBASE_HOME", hbaseHome.getAbsolutePath());
+ try {
+ CommonTestUtils.setEnv(newEnv);
+ hbaseConf = HBaseConfigurationUtil.getHBaseConfiguration();
+ }
+ finally {
+ CommonTestUtils.setEnv(originalEnv);
+ }
+
+ // contains extra entries
+ assertEquals(v1, hbaseConf.get(k1, null));
+ assertEquals(v2, hbaseConf.get(k2, null));
+ assertEquals(v3, hbaseConf.get(k3, null));
+ assertEquals(v4, hbaseConf.get(k4, null));
+
+ // also contains classpath defaults
+ assertEquals(IN_HBASE_CONFIG_VALUE, hbaseConf.get(IN_HBASE_CONFIG_KEY, null));
+ }
+
+ @Test
+ public void loadOverlappingConfig() throws Exception {
+ final String k1 = "key1";
+
+ final String v1 = "from HBASE_HOME/conf";
+ final String v2 = "from HBASE_CONF_DIR";
+
+ final File hbaseHome = tempFolder.newFolder("hbaseHome");
+ final File hbaseHomeConf = new File(hbaseHome, "conf");
+
+ final File hbaseConfDir = tempFolder.newFolder("hbaseConfDir");
+
+ assertTrue(hbaseHomeConf.mkdirs());
+ final File file1 = new File(hbaseHomeConf, "hbase-site.xml");
+
+ Map<String, String> properties1 = new HashMap<>();
+ properties1.put(k1, v1);
+ printConfigs(file1, properties1);
+
+ // HBASE_CONF_DIR conf will override k1 with v2
+ final File file2 = new File(hbaseConfDir, "hbase-site.xml");
+ Map<String, String> properties2 = new HashMap<>();
+ properties2.put(k1, v2);
+ printConfigs(file2, properties2);
+
+ final org.apache.hadoop.conf.Configuration hbaseConf;
+
+ final Map<String, String> originalEnv = System.getenv();
+ final Map<String, String> newEnv = new HashMap<>(originalEnv);
+ newEnv.put("HBASE_CONF_DIR", hbaseConfDir.getAbsolutePath());
+ newEnv.put("HBASE_HOME", hbaseHome.getAbsolutePath());
+ try {
+ CommonTestUtils.setEnv(newEnv);
+ hbaseConf = HBaseConfigurationUtil.getHBaseConfiguration();
+ }
+ finally {
+ CommonTestUtils.setEnv(originalEnv);
+ }
+
+ // contains extra entries
+ assertEquals(v2, hbaseConf.get(k1, null));
+
+ // also contains classpath defaults
+ assertEquals(IN_HBASE_CONFIG_VALUE, hbaseConf.get(IN_HBASE_CONFIG_KEY, null));
+ }
+
+ private static void printConfig(File file, String key, String value) throws IOException {
+ Map<String, String> map = new HashMap<>(1);
+ map.put(key, value);
+ printConfigs(file, map);
+ }
+
+ private static void printConfigs(File file, Map<String, String> properties) throws IOException {
+ try (PrintStream out = new PrintStream(new FileOutputStream(file))) {
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>");
+ out.println("<configuration>");
+ for (Map.Entry<String, String> entry: properties.entrySet()) {
+ out.println("\t<property>");
+ out.println("\t\t<name>" + entry.getKey() + "</name>");
+ out.println("\t\t<value>" + entry.getValue() + "</value>");
+ out.println("\t</property>");
+ }
+ out.println("</configuration>");
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-hbase/src/test/resources/hbase-site.xml b/flink-connectors/flink-connector-hbase/src/test/resources/hbase-site.xml
new file mode 100644
index 0000000..1e58ef4
--- /dev/null
+++ b/flink-connectors/flink-connector-hbase/src/test/resources/hbase-site.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- Values used when running unit tests. Specify any values in here that
+ should override the default values. -->
+
+<configuration>
+ <property>
+ <name>hbase_conf_key</name>
+ <value>hbase_conf_value!</value>
+ </property>
+</configuration>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 7d19352..eb78445 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -243,6 +243,17 @@ public class CoreOptions {
.withDescription("Path to yarn configuration directory. It is required to run flink on YARN. You can also" +
" set it via environment variable.");
+ /**
+ * This option is here only for documentation generation; it is only
+ * evaluated in the shell scripts.
+ */
+ @SuppressWarnings("unused")
+ public static final ConfigOption<String> FLINK_HBASE_CONF_DIR = ConfigOptions
+ .key("env.hbase.conf.dir")
+ .noDefaultValue()
+ .withDescription("Path to hbase configuration directory. It is required to read HBase configuration." +
+ " You can also set it via environment variable.");
+
// ------------------------------------------------------------------------
// generic io
// ------------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 01d6308..39095b5 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -114,6 +114,7 @@ DEFAULT_ENV_JAVA_OPTS_CLI="" # Optional JVM args (Client)
DEFAULT_ENV_SSH_OPTS="" # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR="" # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR="" # Hadoop Configuration Directory, if necessary
+DEFAULT_HBASE_CONF_DIR="" # HBase Configuration Directory, if necessary
########################################################################################################################
# CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml
@@ -126,6 +127,7 @@ KEY_ENV_LOG_DIR="env.log.dir"
KEY_ENV_LOG_MAX="env.log.max"
KEY_ENV_YARN_CONF_DIR="env.yarn.conf.dir"
KEY_ENV_HADOOP_CONF_DIR="env.hadoop.conf.dir"
+KEY_ENV_HBASE_CONF_DIR="env.hbase.conf.dir"
KEY_ENV_JAVA_HOME="env.java.home"
KEY_ENV_JAVA_OPTS="env.java.opts"
KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
@@ -253,6 +255,10 @@ if [ -z "${HADOOP_CONF_DIR}" ]; then
HADOOP_CONF_DIR=$(readFromConfig ${KEY_ENV_HADOOP_CONF_DIR} "${DEFAULT_HADOOP_CONF_DIR}" "${YAML_CONF}")
fi
+if [ -z "${HBASE_CONF_DIR}" ]; then
+ HBASE_CONF_DIR=$(readFromConfig ${KEY_ENV_HBASE_CONF_DIR} "${DEFAULT_HBASE_CONF_DIR}" "${YAML_CONF}")
+fi
+
if [ -z "${FLINK_PID_DIR}" ]; then
FLINK_PID_DIR=$(readFromConfig ${KEY_ENV_PID_DIR} "${DEFAULT_ENV_PID_DIR}" "${YAML_CONF}")
fi
@@ -344,6 +350,24 @@ if [ -z "$HADOOP_CONF_DIR" ]; then
fi
fi
+# Check if HBASE_HOME is set, and specify config path to HBASE_CONF_DIR if it's empty.
+if [ -z "$HBASE_CONF_DIR" ]; then
+ if [ -n "$HBASE_HOME" ]; then
+ # HBASE_HOME is set.
+ if [ -d "$HBASE_HOME/conf" ]; then
+ HBASE_CONF_DIR="$HBASE_HOME/conf"
+ fi
+ fi
+fi
+
+# try and set HBASE_CONF_DIR to some common default if it's not set
+if [ -z "$HBASE_CONF_DIR" ]; then
+ if [ -d "/etc/hbase/conf" ]; then
+ echo "Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set."
+ HBASE_CONF_DIR="/etc/hbase/conf"
+ fi
+fi
+
INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
if [ -n "${HBASE_CONF_DIR}" ]; then