You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/02 03:30:08 UTC
[flink] branch release-1.11 updated: [FLINK-17968][hbase] Fix Hadoop Configuration is not properly serialized in HBaseRowInputFormat
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 280df72 [FLINK-17968][hbase] Fix Hadoop Configuration is not properly serialized in HBaseRowInputFormat
280df72 is described below
commit 280df7236a796d5b175f94952bea07b207ddf0d8
Author: zhangmang <zh...@163.com>
AuthorDate: Tue Jun 2 11:26:32 2020 +0800
[FLINK-17968][hbase] Fix Hadoop Configuration is not properly serialized in HBaseRowInputFormat
This closes #12146
---
.../flink/addons/hbase/TableInputFormat.java | 4 ++
.../hbase/source/AbstractTableInputFormat.java | 13 ++++
.../connector/hbase/source/HBaseInputFormat.java | 13 +++-
.../hbase/source/HBaseRowDataInputFormat.java | 12 +---
.../hbase/source/HBaseRowInputFormat.java | 11 +--
.../connector/hbase/HBaseConnectorITCase.java | 12 ++--
.../connector/hbase/example/HBaseReadExample.java | 3 +-
.../flink/connector/hbase/util/HBaseTestBase.java | 1 -
.../hbase/util/HBaseTestingClusterAutoStarter.java | 83 +---------------------
9 files changed, 40 insertions(+), 112 deletions(-)
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
index 9be258e..a5e044e 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -31,4 +31,8 @@ import org.apache.flink.connector.hbase.source.HBaseInputFormat;
public abstract class TableInputFormat<T extends Tuple> extends HBaseInputFormat<T> {
private static final long serialVersionUID = 1L;
+ public TableInputFormat(org.apache.hadoop.conf.Configuration hConf) {
+ super(hConf);
+ }
+
}
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
index 684afd2..94d36d3 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
@@ -24,8 +24,10 @@ import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -60,6 +62,13 @@ abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInput
protected byte[] currentRow;
protected long scannedRows;
+ // Configuration is not serializable
+ protected byte[] serializedConfig;
+
+ public AbstractTableInputFormat(org.apache.hadoop.conf.Configuration hConf) {
+ serializedConfig = HBaseConfigurationUtil.serializeConfiguration(hConf);
+ }
+
/**
* Returns an instance of Scan that retrieves the required subset of records from the HBase table.
*
@@ -99,6 +108,10 @@ abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInput
*/
public abstract void configure(Configuration parameters);
+ protected org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
+ return HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfiguration.create());
+ }
+
@Override
public void open(TableInputSplit split) throws IOException {
if (table == null) {
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseInputFormat.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseInputFormat.java
index 972cf0e..45d5b3f 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseInputFormat.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseInputFormat.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@@ -37,6 +36,15 @@ public abstract class HBaseInputFormat<T extends Tuple> extends AbstractTableInp
private static final long serialVersionUID = 1L;
/**
+ * Constructs a {@link InputFormat} with hbase configuration to read data from hbase.
+ * @param hConf The configuration that connect to hbase.
+ * At least hbase.zookeeper.quorum and zookeeper.znode.parent need to be set.
+ */
+ public HBaseInputFormat(org.apache.hadoop.conf.Configuration hConf) {
+ super(hConf);
+ }
+
+ /**
* Returns an instance of Scan that retrieves the required subset of records from the HBase table.
* @return The appropriate instance of Scan for this usecase.
*/
@@ -79,8 +87,7 @@ public abstract class HBaseInputFormat<T extends Tuple> extends AbstractTableInp
*/
private HTable createTable() {
LOG.info("Initializing HBaseConfiguration");
- //use files found in the classpath
- org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
+ org.apache.hadoop.conf.Configuration hConf = getHadoopConfiguration();
try {
return new HTable(hConf, getTableName());
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataInputFormat.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataInputFormat.java
index 5a628d9..30be6d8 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataInputFormat.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataInputFormat.java
@@ -24,7 +24,6 @@ import org.apache.flink.connector.hbase.util.HBaseSerde;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.data.RowData;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
@@ -50,15 +49,13 @@ public class HBaseRowDataInputFormat extends AbstractTableInputFormat<RowData> {
private transient HBaseSerde serde;
- private transient org.apache.hadoop.conf.Configuration conf;
-
public HBaseRowDataInputFormat(
org.apache.hadoop.conf.Configuration conf,
String tableName,
HBaseTableSchema schema,
String nullStringLiteral) {
+ super(conf);
this.tableName = tableName;
- this.conf = conf;
this.schema = schema;
this.nullStringLiteral = nullStringLiteral;
}
@@ -89,13 +86,8 @@ public class HBaseRowDataInputFormat extends AbstractTableInputFormat<RowData> {
}
private void connectToTable() {
-
- if (this.conf == null) {
- this.conf = HBaseConfiguration.create();
- }
-
try {
- Connection conn = ConnectionFactory.createConnection(conf);
+ Connection conn = ConnectionFactory.createConnection(getHadoopConfiguration());
super.table = (HTable) conn.getTable(TableName.valueOf(tableName));
} catch (TableNotFoundException tnfe) {
LOG.error("The table " + tableName + " not found ", tnfe);
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowInputFormat.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowInputFormat.java
index f455298..f7100ed 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowInputFormat.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowInputFormat.java
@@ -28,7 +28,6 @@ import org.apache.flink.connector.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.types.Row;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
@@ -54,12 +53,11 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
private final String tableName;
private final HBaseTableSchema schema;
- private transient org.apache.hadoop.conf.Configuration conf;
private transient HBaseReadWriteHelper readHelper;
public HBaseRowInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
+ super(conf);
this.tableName = tableName;
- this.conf = conf;
this.schema = schema;
}
@@ -90,13 +88,8 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
}
private void connectToTable() {
-
- if (this.conf == null) {
- this.conf = HBaseConfiguration.create();
- }
-
try {
- Connection conn = ConnectionFactory.createConnection(conf);
+ Connection conn = ConnectionFactory.createConnection(getHadoopConfiguration());
super.table = (HTable) conn.getTable(TableName.valueOf(tableName));
} catch (TableNotFoundException tnfe) {
LOG.error("The table " + tableName + " not found ", tnfe);
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
index 2a1ffa2..7724873 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
@@ -54,8 +54,6 @@ import org.apache.flink.table.sources.TableSource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
@@ -312,7 +310,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple1<Integer>> result = env
- .createInput(new InputFormatForTestTable())
+ .createInput(new InputFormatForTestTable(getConf()))
.reduce((ReduceFunction<Tuple1<Integer>>) (v1, v2) -> Tuple1.of(v1.f0 + v2.f0));
List<Tuple1<Integer>> resultSet = result.collect();
@@ -612,9 +610,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
properties.put(CONNECTOR_PROPERTY_VERSION, "1");
properties.put(CONNECTOR_TABLE_NAME, TEST_TABLE_1);
- // get zk quorum from "hbase-site.xml" in classpath
- String hbaseZk = HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM);
- properties.put(CONNECTOR_ZK_QUORUM, hbaseZk);
+ properties.put(CONNECTOR_ZK_QUORUM, getZookeeperQuorum());
// schema
String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT);
@@ -705,6 +701,10 @@ public class HBaseConnectorITCase extends HBaseTestBase {
public static class InputFormatForTestTable extends HBaseInputFormat<Tuple1<Integer>> {
private static final long serialVersionUID = 1L;
+ public InputFormatForTestTable(org.apache.hadoop.conf.Configuration hConf) {
+ super(hConf);
+ }
+
@Override
protected Scan getScanner() {
return new Scan();
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/example/HBaseReadExample.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/example/HBaseReadExample.java
index 13914e5..026c6d0 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/example/HBaseReadExample.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/example/HBaseReadExample.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.hbase.source.HBaseInputFormat;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
@@ -47,7 +48,7 @@ public class HBaseReadExample {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@SuppressWarnings("serial")
- DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new HBaseInputFormat<Tuple2<String, String>>() {
+ DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new HBaseInputFormat<Tuple2<String, String>>(HBaseConfiguration.create()) {
@Override
public String getTableName() {
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
index 04555cc..e00b986 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
@@ -74,7 +74,6 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
@BeforeClass
public static void activateHBaseCluster() throws IOException {
- registerHBaseMiniClusterInClasspath();
prepareTables();
}
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestingClusterAutoStarter.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestingClusterAutoStarter.java
index 9e15a2f..4ba1c17 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestingClusterAutoStarter.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestingClusterAutoStarter.java
@@ -40,24 +40,13 @@ import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
-import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
-import java.util.Random;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
/**
* By using this class as the super class of a set of tests you will have a HBase testing
@@ -83,8 +72,6 @@ public abstract class HBaseTestingClusterAutoStarter extends AbstractTestBase {
private static HBaseAdmin admin = null;
private static List<TableName> createdTables = new ArrayList<>();
- private static boolean alreadyRegisteredTestCluster = false;
-
private static Configuration conf;
protected static void createTable(TableName tableName, byte[][] columnFamilyName, byte[][] splitKeys) {
@@ -157,6 +144,7 @@ public abstract class HBaseTestingClusterAutoStarter extends AbstractTestBase {
TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", -1);
// Make sure the zookeeper quorum value contains the right port number (varies per run).
+ LOG.info("Hbase minicluster client port: " + TEST_UTIL.getZkCluster().getClientPort());
TEST_UTIL.getConfiguration().set("hbase.zookeeper.quorum", "localhost:" + TEST_UTIL.getZkCluster().getClientPort());
conf = initialize(TEST_UTIL.getConfiguration());
@@ -170,77 +158,10 @@ public abstract class HBaseTestingClusterAutoStarter extends AbstractTestBase {
return "localhost:" + TEST_UTIL.getZkCluster().getClientPort();
}
- private static File hbaseSiteXmlDirectory;
- private static File hbaseSiteXmlFile;
-
- /**
- * This dynamically generates a hbase-site.xml file that is added to the classpath.
- * This way this HBaseMinicluster can be used by an unmodified application.
- * The downside is that this cannot be 'unloaded' so you can have only one per JVM.
- */
- public static void registerHBaseMiniClusterInClasspath() {
- if (alreadyRegisteredTestCluster) {
- fail("You CANNOT register a second HBase Testing cluster in the classpath of the SAME JVM");
- }
- File baseDir = new File(System.getProperty("java.io.tmpdir", "/tmp/"));
- hbaseSiteXmlDirectory = new File(baseDir, "unittest-hbase-minicluster-" + Math.abs(new Random().nextLong()) + "/");
-
- if (!hbaseSiteXmlDirectory.mkdirs()) {
- fail("Unable to create output directory " + hbaseSiteXmlDirectory + " for the HBase minicluster");
- }
-
- assertNotNull("The ZooKeeper for the HBase minicluster is missing", TEST_UTIL.getZkCluster());
-
- createHBaseSiteXml(hbaseSiteXmlDirectory, TEST_UTIL.getConfiguration().get("hbase.zookeeper.quorum"));
- addDirectoryToClassPath(hbaseSiteXmlDirectory);
-
- // Avoid starting it again.
- alreadyRegisteredTestCluster = true;
- }
-
public static Configuration getConf() {
return conf;
}
- private static void createHBaseSiteXml(File hbaseSiteXmlDirectory, String zookeeperQuorum) {
- hbaseSiteXmlFile = new File(hbaseSiteXmlDirectory, "hbase-site.xml");
- // Create the hbase-site.xml file for this run.
- try {
- String hbaseSiteXml = "<?xml version=\"1.0\"?>\n" +
- "<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n" +
- "<configuration>\n" +
- " <property>\n" +
- " <name>hbase.zookeeper.quorum</name>\n" +
- " <value>" + zookeeperQuorum + "</value>\n" +
- " </property>\n" +
- "</configuration>";
- OutputStream fos = new FileOutputStream(hbaseSiteXmlFile);
- fos.write(hbaseSiteXml.getBytes(StandardCharsets.UTF_8));
- fos.close();
- } catch (IOException e) {
- fail("Unable to create " + hbaseSiteXmlFile);
- }
- }
-
- private static void addDirectoryToClassPath(File directory) {
- try {
- // Get the classloader actually used by HBaseConfiguration
- ClassLoader classLoader = HBaseConfiguration.create().getClassLoader();
- if (!(classLoader instanceof URLClassLoader)) {
- fail("We should get a URLClassLoader");
- }
-
- // Make the addURL method accessible
- Method method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
- method.setAccessible(true);
-
- // Add the directory where we put the hbase-site.xml to the classpath
- method.invoke(classLoader, directory.toURI().toURL());
- } catch (MalformedURLException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
- fail("Unable to add " + directory + " to classpath because of this exception: " + e.getMessage());
- }
- }
-
@AfterClass
public static void tearDown() throws Exception {
if (conf == null) {
@@ -249,8 +170,6 @@ public abstract class HBaseTestingClusterAutoStarter extends AbstractTestBase {
}
LOG.info("HBase minicluster: Shutting down");
deleteTables();
- hbaseSiteXmlFile.delete();
- hbaseSiteXmlDirectory.delete();
TEST_UTIL.shutdownMiniCluster();
LOG.info("HBase minicluster: Down");
}