Posted to commits@flink.apache.org by li...@apache.org on 2020/06/04 17:37:06 UTC

[flink] branch master updated: [FLINK-17384][connectors/hbase] Support reading hbase conf dir from flink-conf.yaml

This is an automated email from the ASF dual-hosted git repository.

liyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0799b5c  [FLINK-17384][connectors/hbase] Support reading hbase conf dir from flink-conf.yaml
0799b5c is described below

commit 0799b5c20a127110e47439668cf8f8db2e4ecbf3
Author: liuyongvs <li...@gmail.com>
AuthorDate: Thu May 14 15:48:50 2020 +0800

    [FLINK-17384][connectors/hbase] Support reading hbase conf dir from flink-conf.yaml
    
    This closes #12144.
---
 .../generated/environment_configuration.html       |   6 +
 .../connector/hbase/HBaseDynamicTableFactory.java  |   4 +-
 .../flink/connector/hbase/HBaseTableFactory.java   |   4 +-
 .../hbase/sink/HBaseDynamicTableSink.java          |   4 +-
 .../connector/hbase/sink/HBaseSinkFunction.java    |   3 +-
 .../connector/hbase/sink/HBaseUpsertTableSink.java |   4 +-
 .../hbase/source/AbstractTableInputFormat.java     |   3 +-
 .../hbase/source/HBaseLookupFunction.java          |   3 +-
 .../hbase/util/HBaseConfigurationUtil.java         |  66 +++++++-
 .../hbase/util/HBaseConfigLoadingTest.java         | 176 +++++++++++++++++++++
 .../src/test/resources/hbase-site.xml              |  29 ++++
 .../apache/flink/configuration/CoreOptions.java    |  11 ++
 flink-dist/src/main/flink-bin/bin/config.sh        |  24 +++
 13 files changed, 322 insertions(+), 15 deletions(-)
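
In short: call sites that previously built their HBase client configuration
with HBaseConfiguration.create() (classpath only) now go through the new
HBaseConfigurationUtil.getHBaseConfiguration(), which additionally loads
hbase-default.xml/hbase-site.xml from HBASE_HOME/conf and HBASE_CONF_DIR,
and config.sh can now populate HBASE_CONF_DIR from the new
env.hbase.conf.dir key in flink-conf.yaml. A minimal sketch of the new
entry point (class and method names as introduced in the diff below):

    import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;

    import org.apache.hadoop.conf.Configuration;

    public class HBaseConfExample {
        public static void main(String[] args) {
            // Loads hbase-default.xml/hbase-site.xml from the classpath first,
            // then layers files found under HBASE_HOME/conf and HBASE_CONF_DIR on top.
            Configuration conf = HBaseConfigurationUtil.getHBaseConfiguration();
            System.out.println(conf.get("hbase.zookeeper.quorum"));
        }
    }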

diff --git a/docs/_includes/generated/environment_configuration.html b/docs/_includes/generated/environment_configuration.html
index 912ecbb..0fffb88 100644
--- a/docs/_includes/generated/environment_configuration.html
+++ b/docs/_includes/generated/environment_configuration.html
@@ -15,6 +15,12 @@
             <td>Path to hadoop configuration directory. It is required to read HDFS and/or YARN configuration. You can also set it via environment variable.</td>
         </tr>
         <tr>
+            <td><h5>env.hbase.conf.dir</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Path to HBase configuration directory. It is required to read HBase configuration. You can also set it via environment variable.</td>
+        </tr>
+        <tr>
             <td><h5>env.java.opts</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
index b0e6dee..48b70eb 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.connector.hbase.options.HBaseOptions;
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.sink.HBaseDynamicTableSink;
 import org.apache.flink.connector.hbase.source.HBaseDynamicTableSource;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -34,7 +35,6 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 
 import java.time.Duration;
@@ -108,7 +108,7 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
 
 		String hTableName = helper.getOptions().get(TABLE_NAME);
 		// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
-		Configuration hbaseClientConf = HBaseConfiguration.create();
+		Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration();
 		hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, helper.getOptions().get(ZOOKEEPER_QUORUM));
 		hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, helper.getOptions().get(ZOOKEEPER_ZNODE_PARENT));
 		String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL);
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java
index f64f9b9..ca3e1e5 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseTableFactory.java
@@ -26,6 +26,7 @@ import org.apache.flink.connector.hbase.options.HBaseOptions;
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.sink.HBaseUpsertTableSink;
 import org.apache.flink.connector.hbase.source.HBaseTableSource;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -38,7 +39,6 @@ import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 
 import java.sql.Date;
@@ -83,7 +83,7 @@ public class HBaseTableFactory implements StreamTableSourceFactory<Row>, StreamT
 	public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
 		final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
 		// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
-		Configuration hbaseClientConf = HBaseConfiguration.create();
+		Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration();
 		String hbaseZk = descriptorProperties.getString(CONNECTOR_ZK_QUORUM);
 		hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseZk);
 		descriptorProperties
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseDynamicTableSink.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseDynamicTableSink.java
index 193a09a..5d60ae3 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseDynamicTableSink.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseDynamicTableSink.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.hbase.options.HBaseOptions;
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -30,7 +31,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 
 /**
@@ -57,7 +57,7 @@ public class HBaseDynamicTableSink implements DynamicTableSink {
 
 	@Override
 	public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
-		Configuration hbaseClientConf = HBaseConfiguration.create();
+		Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration();
 		hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseOptions.getZkQuorum());
 		hbaseOptions.getZkNodeParent().ifPresent(v -> hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
 		HBaseSinkFunction<RowData> sinkFunction = new HBaseSinkFunction<>(
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
index 3d96e5d..e9d16bc 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseSinkFunction.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.util.StringUtils;
 
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -152,7 +151,7 @@ public class HBaseSinkFunction<T>
 		// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
 		// and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
 		// user params from client-side have the highest priority
-		org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfiguration.create());
+		org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
 
 		// do validation: check key option(s) in final runtime configuration
 		if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseUpsertTableSink.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseUpsertTableSink.java
index 0f8b8eb..f882833 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseUpsertTableSink.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/sink/HBaseUpsertTableSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.hbase.options.HBaseOptions;
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -35,7 +36,6 @@ import org.apache.flink.table.utils.TableConnectorUtils;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 
 import java.util.Arrays;
@@ -89,7 +89,7 @@ public class HBaseUpsertTableSink implements UpsertStreamTableSink<Row> {
 
 	@Override
 	public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
-		Configuration hbaseClientConf = HBaseConfiguration.create();
+		Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration();
 		hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseOptions.getZkQuorum());
 		hbaseOptions.getZkNodeParent().ifPresent(v -> hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
 		HBaseSinkFunction sinkFunction = new HBaseSinkFunction(
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
index 94d36d3..feeff3d 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.core.io.InputSplitAssigner;
 
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -109,7 +108,7 @@ abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInput
 	public abstract void configure(Configuration parameters);
 
 	protected org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
-		return HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfiguration.create());
+		return HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
index 1d608e9..ed2807d 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseLookupFunction.java
@@ -30,7 +30,6 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -93,7 +92,7 @@ public class HBaseLookupFunction extends TableFunction<Row> {
 		// user params from client-side have the highest priority
 		org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
 			serializedConfig,
-			HBaseConfiguration.create());
+			HBaseConfigurationUtil.getHBaseConfiguration());
 
 		// do validation: check key option(s) in final runtime configuration
 		if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java
index a7b11e5..165f95a 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseConfigurationUtil.java
@@ -22,20 +22,84 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.io.Writable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 
 /**
- * This class helps to do serialization for hadoop Configuration.
+ * This class helps to serialize a Hadoop {@link Configuration} and to load the HBase client configuration.
  */
 @Internal
 public class HBaseConfigurationUtil {
 
+	private static final Logger LOG = LoggerFactory.getLogger(HBaseConfigurationUtil.class);
+
+	public static Configuration getHBaseConfiguration() {
+
+		// Instantiate an HBaseConfiguration to load the hbase-default.xml and hbase-site.xml from the classpath.
+		Configuration result = HBaseConfiguration.create();
+		boolean foundHBaseConfiguration = false;
+
+		// We need to load both hbase-default.xml and hbase-site.xml into the hbase configuration.
+		// The properties of a newly added resource override those of previously added resources,
+		// so a configuration file with higher priority should be added later.
+
+		// Approach 1: the HBASE_HOME environment variable
+		String possibleHBaseConfPath = null;
+
+		final String hbaseHome = System.getenv("HBASE_HOME");
+		if (hbaseHome != null) {
+			LOG.debug("Searching HBase configuration files in HBASE_HOME: {}", hbaseHome);
+			possibleHBaseConfPath = hbaseHome + "/conf";
+		}
+
+		if (possibleHBaseConfPath != null) {
+			foundHBaseConfiguration = addHBaseConfIfFound(result, possibleHBaseConfPath);
+		}
+
+		// Approach 2: HBASE_CONF_DIR environment variable
+		String hbaseConfDir = System.getenv("HBASE_CONF_DIR");
+		if (hbaseConfDir != null) {
+			LOG.debug("Searching HBase configuration files in HBASE_CONF_DIR: {}", hbaseConfDir);
+			foundHBaseConfiguration = addHBaseConfIfFound(result, hbaseConfDir) || foundHBaseConfiguration;
+		}
+
+		if (!foundHBaseConfiguration) {
+			LOG.warn("Could not find HBase configuration via any of the supported methods " +
+				"(Flink configuration, environment variables).");
+		}
+
+		return result;
+	}
+
+	/**
+	 * Search HBase configuration files in the given path, and add them to the configuration if found.
+	 */
+	private static boolean addHBaseConfIfFound(Configuration configuration, String possibleHBaseConfPath) {
+		boolean foundHBaseConfiguration = false;
+		if (new File(possibleHBaseConfPath).exists()) {
+			if (new File(possibleHBaseConfPath + "/hbase-default.xml").exists()) {
+				configuration.addResource(new org.apache.hadoop.fs.Path(possibleHBaseConfPath + "/hbase-default.xml"));
+				LOG.debug("Adding {}/hbase-default.xml to hbase configuration", possibleHBaseConfPath);
+				foundHBaseConfiguration = true;
+			}
+			if (new File(possibleHBaseConfPath + "/hbase-site.xml").exists()) {
+				configuration.addResource(new org.apache.hadoop.fs.Path(possibleHBaseConfPath + "/hbase-site.xml"));
+				LOG.debug("Adding {}/hbase-site.xml to hbase configuration", possibleHBaseConfPath);
+				foundHBaseConfiguration = true;
+			}
+		}
+		return foundHBaseConfiguration;
+	}
+
 	/**
 	 * Serialize a Hadoop {@link Configuration} into byte[].
 	 */
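
The override ordering that getHBaseConfiguration() relies on is the standard
Hadoop Configuration behavior: a resource added later wins on conflicting
keys. A small self-contained sketch of just that mechanism (file paths are
hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class ResourceOrderExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration(false);
            // Added first: lower priority.
            conf.addResource(new Path("/opt/hbase/conf/hbase-site.xml"));
            // Added later: overrides any conflicting keys from the resource above.
            conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
            System.out.println(conf.get("hbase.zookeeper.quorum"));
        }
    }

This is why the utility adds HBASE_HOME/conf before HBASE_CONF_DIR: files
from HBASE_CONF_DIR end up with the higher priority.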
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseConfigLoadingTest.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseConfigLoadingTest.java
new file mode 100644
index 0000000..89aa23c
--- /dev/null
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseConfigLoadingTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.util;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that validate the loading of the HBase configuration, relative to
+ * entries in the Flink configuration and the environment variables.
+ */
+public class HBaseConfigLoadingTest {
+
+	private static final String IN_HBASE_CONFIG_KEY = "hbase_conf_key";
+	private static final String IN_HBASE_CONFIG_VALUE = "hbase_conf_value!";
+
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Test
+	public void loadFromClasspathByDefault() {
+		org.apache.hadoop.conf.Configuration hbaseConf =
+			HBaseConfigurationUtil.getHBaseConfiguration();
+
+		assertEquals(IN_HBASE_CONFIG_VALUE, hbaseConf.get(IN_HBASE_CONFIG_KEY, null));
+	}
+
+	@Test
+	public void loadFromEnvVariables() throws Exception {
+		final String k1 = "where?";
+		final String v1 = "I'm on a boat";
+		final String k2 = "when?";
+		final String v2 = "midnight";
+		final String k3 = "why?";
+		final String v3 = "what do you think?";
+		final String k4 = "which way?";
+		final String v4 = "south, always south...";
+
+		final File hbaseConfDir = tempFolder.newFolder();
+
+		final File hbaseHome = tempFolder.newFolder();
+
+		final File hbaseHomeConf = new File(hbaseHome, "conf");
+
+		assertTrue(hbaseHomeConf.mkdirs());
+
+		final File file1 = new File(hbaseConfDir, "hbase-default.xml");
+		final File file2 = new File(hbaseConfDir, "hbase-site.xml");
+		final File file3 = new File(hbaseHomeConf, "hbase-default.xml");
+		final File file4 = new File(hbaseHomeConf, "hbase-site.xml");
+
+		printConfig(file1, k1, v1);
+		printConfig(file2, k2, v2);
+		printConfig(file3, k3, v3);
+		printConfig(file4, k4, v4);
+
+		final org.apache.hadoop.conf.Configuration hbaseConf;
+
+		final Map<String, String> originalEnv = System.getenv();
+		final Map<String, String> newEnv = new HashMap<>(originalEnv);
+		newEnv.put("HBASE_CONF_DIR", hbaseConfDir.getAbsolutePath());
+		newEnv.put("HBASE_HOME", hbaseHome.getAbsolutePath());
+		try {
+			CommonTestUtils.setEnv(newEnv);
+			hbaseConf = HBaseConfigurationUtil.getHBaseConfiguration();
+		}
+		finally {
+			CommonTestUtils.setEnv(originalEnv);
+		}
+
+		// contains extra entries
+		assertEquals(v1, hbaseConf.get(k1, null));
+		assertEquals(v2, hbaseConf.get(k2, null));
+		assertEquals(v3, hbaseConf.get(k3, null));
+		assertEquals(v4, hbaseConf.get(k4, null));
+
+		// also contains classpath defaults
+		assertEquals(IN_HBASE_CONFIG_VALUE, hbaseConf.get(IN_HBASE_CONFIG_KEY, null));
+	}
+
+	@Test
+	public void loadOverlappingConfig() throws Exception {
+		final String k1 = "key1";
+
+		final String v1 = "from HBASE_HOME/conf";
+		final String v2 = "from HBASE_CONF_DIR";
+
+		final File hbaseHome = tempFolder.newFolder("hbaseHome");
+		final File hbaseHomeConf = new File(hbaseHome, "conf");
+
+		final File hbaseConfDir = tempFolder.newFolder("hbaseConfDir");
+
+		assertTrue(hbaseHomeConf.mkdirs());
+		final File file1 = new File(hbaseHomeConf, "hbase-site.xml");
+
+		Map<String, String> properties1 = new HashMap<>();
+		properties1.put(k1, v1);
+		printConfigs(file1, properties1);
+
+		// HBASE_CONF_DIR conf will override k1 with v2
+		final File file2 = new File(hbaseConfDir, "hbase-site.xml");
+		Map<String, String> properties2 = new HashMap<>();
+		properties2.put(k1, v2);
+		printConfigs(file2, properties2);
+
+		final org.apache.hadoop.conf.Configuration hbaseConf;
+
+		final Map<String, String> originalEnv = System.getenv();
+		final Map<String, String> newEnv = new HashMap<>(originalEnv);
+		newEnv.put("HBASE_CONF_DIR", hbaseConfDir.getAbsolutePath());
+		newEnv.put("HBASE_HOME", hbaseHome.getAbsolutePath());
+		try {
+			CommonTestUtils.setEnv(newEnv);
+			hbaseConf = HBaseConfigurationUtil.getHBaseConfiguration();
+		}
+		finally {
+			CommonTestUtils.setEnv(originalEnv);
+		}
+
+		// contains extra entries
+		assertEquals(v2, hbaseConf.get(k1, null));
+
+		// also contains classpath defaults
+		assertEquals(IN_HBASE_CONFIG_VALUE, hbaseConf.get(IN_HBASE_CONFIG_KEY, null));
+	}
+
+	private static void printConfig(File file, String key, String value) throws IOException {
+		Map<String, String> map = new HashMap<>(1);
+		map.put(key, value);
+		printConfigs(file, map);
+	}
+
+	private static void printConfigs(File file, Map<String, String> properties) throws IOException {
+		try (PrintStream out = new PrintStream(new FileOutputStream(file))) {
+			out.println("<?xml version=\"1.0\"?>");
+			out.println("<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>");
+			out.println("<configuration>");
+			for (Map.Entry<String, String> entry: properties.entrySet()) {
+				out.println("\t<property>");
+				out.println("\t\t<name>" + entry.getKey() + "</name>");
+				out.println("\t\t<value>" + entry.getValue() + "</value>");
+				out.println("\t</property>");
+			}
+			out.println("</configuration>");
+		}
+	}
+}
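
A note on the test setup: CommonTestUtils.setEnv replaces the JVM's view of
the process environment, which is why both tests restore the original map in
a finally block, so that HBASE_HOME/HBASE_CONF_DIR do not leak into other
tests.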
diff --git a/flink-connectors/flink-connector-hbase/src/test/resources/hbase-site.xml b/flink-connectors/flink-connector-hbase/src/test/resources/hbase-site.xml
new file mode 100644
index 0000000..1e58ef4
--- /dev/null
+++ b/flink-connectors/flink-connector-hbase/src/test/resources/hbase-site.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- Values used when running unit tests.  Specify any values in here that
+     should override the default values. -->
+
+<configuration>
+    <property>
+        <name>hbase_conf_key</name>
+        <value>hbase_conf_value!</value>
+    </property>
+</configuration>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 7d19352..eb78445 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -243,6 +243,17 @@ public class CoreOptions {
 		.withDescription("Path to yarn configuration directory. It is required to run flink on YARN. You can also" +
 			" set it via environment variable.");
 
+	/**
+	 * This option is here only for documentation generation; it is
+	 * evaluated only in the shell scripts.
+	 */
+	@SuppressWarnings("unused")
+	public static final ConfigOption<String> FLINK_HBASE_CONF_DIR = ConfigOptions
+		.key("env.hbase.conf.dir")
+		.noDefaultValue()
+		.withDescription("Path to HBase configuration directory. It is required to read HBase configuration." +
+			" You can also set it via environment variable.");
+
 	// ------------------------------------------------------------------------
 	//  generic io
 	// ------------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 01d6308..39095b5 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -114,6 +114,7 @@ DEFAULT_ENV_JAVA_OPTS_CLI=""                        # Optional JVM args (Client)
 DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters running in cluster mode
 DEFAULT_YARN_CONF_DIR=""                            # YARN Configuration Directory, if necessary
 DEFAULT_HADOOP_CONF_DIR=""                          # Hadoop Configuration Directory, if necessary
+DEFAULT_HBASE_CONF_DIR=""                           # HBase Configuration Directory, if necessary
 
 ########################################################################################################################
 # CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml
@@ -126,6 +127,7 @@ KEY_ENV_LOG_DIR="env.log.dir"
 KEY_ENV_LOG_MAX="env.log.max"
 KEY_ENV_YARN_CONF_DIR="env.yarn.conf.dir"
 KEY_ENV_HADOOP_CONF_DIR="env.hadoop.conf.dir"
+KEY_ENV_HBASE_CONF_DIR="env.hbase.conf.dir"
 KEY_ENV_JAVA_HOME="env.java.home"
 KEY_ENV_JAVA_OPTS="env.java.opts"
 KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
@@ -253,6 +255,10 @@ if [ -z "${HADOOP_CONF_DIR}" ]; then
     HADOOP_CONF_DIR=$(readFromConfig ${KEY_ENV_HADOOP_CONF_DIR} "${DEFAULT_HADOOP_CONF_DIR}" "${YAML_CONF}")
 fi
 
+if [ -z "${HBASE_CONF_DIR}" ]; then
+    HBASE_CONF_DIR=$(readFromConfig ${KEY_ENV_HBASE_CONF_DIR} "${DEFAULT_HBASE_CONF_DIR}" "${YAML_CONF}")
+fi
+
 if [ -z "${FLINK_PID_DIR}" ]; then
     FLINK_PID_DIR=$(readFromConfig ${KEY_ENV_PID_DIR} "${DEFAULT_ENV_PID_DIR}" "${YAML_CONF}")
 fi
@@ -344,6 +350,24 @@ if [ -z "$HADOOP_CONF_DIR" ]; then
     fi
 fi
 
+# If HBASE_CONF_DIR is not set, fall back to HBASE_HOME/conf when HBASE_HOME is set.
+if [ -z "$HBASE_CONF_DIR" ]; then
+    if [ -n "$HBASE_HOME" ]; then
+        # HBASE_HOME is set.
+        if [ -d "$HBASE_HOME/conf" ]; then
+            HBASE_CONF_DIR="$HBASE_HOME/conf"
+        fi
+    fi
+fi
+
+# Try to set HBASE_CONF_DIR to a common default if it is still not set.
+if [ -z "$HBASE_CONF_DIR" ]; then
+    if [ -d "/etc/hbase/conf" ]; then
+        echo "Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set."
+        HBASE_CONF_DIR="/etc/hbase/conf"
+    fi
+fi
+
 INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
 
 if [ -n "${HBASE_CONF_DIR}" ]; then
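
Putting the shell pieces together, HBASE_CONF_DIR is now resolved in this
order of precedence: an HBASE_CONF_DIR variable already set in the
environment, the new env.hbase.conf.dir key read from conf/flink-conf.yaml,
HBASE_HOME/conf if HBASE_HOME is set, and finally the common default
/etc/hbase/conf. In practice, a user can therefore configure the connector
cluster-wide by adding a single line such as
"env.hbase.conf.dir: /path/to/hbase/conf" (path hypothetical) to
conf/flink-conf.yaml.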