You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/02 03:27:02 UTC

[flink] branch master updated: [FLINK-17968][hbase] Fix Hadoop Configuration is not properly serialized in HBaseRowInputFormat

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 03b82f9  [FLINK-17968][hbase] Fix Hadoop Configuration is not properly serialized in HBaseRowInputFormat
03b82f9 is described below

commit 03b82f9aa81b828b06c61f7f30f92a331ec5120b
Author: zhangmang <zh...@163.com>
AuthorDate: Tue Jun 2 11:26:32 2020 +0800

    [FLINK-17968][hbase] Fix Hadoop Configuration is not properly serialized in HBaseRowInputFormat
    
    This closes #12146
---
 .../flink/addons/hbase/TableInputFormat.java       |  4 ++
 .../hbase/source/AbstractTableInputFormat.java     | 13 ++++
 .../connector/hbase/source/HBaseInputFormat.java   | 13 +++-
 .../hbase/source/HBaseRowDataInputFormat.java      | 12 +---
 .../hbase/source/HBaseRowInputFormat.java          | 11 +--
 .../connector/hbase/HBaseConnectorITCase.java      | 12 ++--
 .../connector/hbase/example/HBaseReadExample.java  |  3 +-
 .../flink/connector/hbase/util/HBaseTestBase.java  |  1 -
 .../hbase/util/HBaseTestingClusterAutoStarter.java | 83 +---------------------
 9 files changed, 40 insertions(+), 112 deletions(-)

diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
index 9be258e..a5e044e 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputFormat.java
@@ -31,4 +31,8 @@ import org.apache.flink.connector.hbase.source.HBaseInputFormat;
 public abstract class TableInputFormat<T extends Tuple> extends HBaseInputFormat<T> {
 	private static final long serialVersionUID = 1L;
 
+	public TableInputFormat(org.apache.hadoop.conf.Configuration hConf) {
+		super(hConf);
+	}
+
 }
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
index 684afd2..94d36d3 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/AbstractTableInputFormat.java
@@ -24,8 +24,10 @@ import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.core.io.InputSplitAssigner;
 
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -60,6 +62,13 @@ abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInput
 	protected byte[] currentRow;
 	protected long scannedRows;
 
+	// Hadoop Configuration is not serializable, so it is kept in serialized byte form
+	protected byte[] serializedConfig;
+
+	public AbstractTableInputFormat(org.apache.hadoop.conf.Configuration hConf) {
+		serializedConfig = HBaseConfigurationUtil.serializeConfiguration(hConf);
+	}
+
 	/**
 	 * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
 	 *
@@ -99,6 +108,10 @@ abstract class AbstractTableInputFormat<T> extends RichInputFormat<T, TableInput
 	 */
 	public abstract void configure(Configuration parameters);
 
+	protected org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
+		return HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfiguration.create());
+	}
+
 	@Override
 	public void open(TableInputSplit split) throws IOException {
 		if (table == null) {
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseInputFormat.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseInputFormat.java
index 972cf0e..45d5b3f 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseInputFormat.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseInputFormat.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
 
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -37,6 +36,15 @@ public abstract class HBaseInputFormat<T extends Tuple> extends AbstractTableInp
 	private static final long serialVersionUID = 1L;
 
 	/**
+	 * Constructs an {@link InputFormat} with the HBase configuration used to read data from HBase.
+	 * @param hConf The configuration used to connect to HBase.
+	 *              At least hbase.zookeeper.quorum and zookeeper.znode.parent need to be set.
+	 */
+	public HBaseInputFormat(org.apache.hadoop.conf.Configuration hConf) {
+		super(hConf);
+	}
+
+	/**
 	 * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
 	 * @return The appropriate instance of Scan for this usecase.
 	 */
@@ -79,8 +87,7 @@ public abstract class HBaseInputFormat<T extends Tuple> extends AbstractTableInp
 	 */
 	private HTable createTable() {
 		LOG.info("Initializing HBaseConfiguration");
-		//use files found in the classpath
-		org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
+		org.apache.hadoop.conf.Configuration hConf = getHadoopConfiguration();
 
 		try {
 			return new HTable(hConf, getTableName());
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataInputFormat.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataInputFormat.java
index 5a628d9..30be6d8 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataInputFormat.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataInputFormat.java
@@ -24,7 +24,6 @@ import org.apache.flink.connector.hbase.util.HBaseSerde;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.data.RowData;
 
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Connection;
@@ -50,15 +49,13 @@ public class HBaseRowDataInputFormat extends AbstractTableInputFormat<RowData> {
 
 	private transient HBaseSerde serde;
 
-	private transient org.apache.hadoop.conf.Configuration conf;
-
 	public HBaseRowDataInputFormat(
 			org.apache.hadoop.conf.Configuration conf,
 			String tableName,
 			HBaseTableSchema schema,
 			String nullStringLiteral) {
+		super(conf);
 		this.tableName = tableName;
-		this.conf = conf;
 		this.schema = schema;
 		this.nullStringLiteral = nullStringLiteral;
 	}
@@ -89,13 +86,8 @@ public class HBaseRowDataInputFormat extends AbstractTableInputFormat<RowData> {
 	}
 
 	private void connectToTable() {
-
-		if (this.conf == null) {
-			this.conf = HBaseConfiguration.create();
-		}
-
 		try {
-			Connection conn = ConnectionFactory.createConnection(conf);
+			Connection conn = ConnectionFactory.createConnection(getHadoopConfiguration());
 			super.table = (HTable) conn.getTable(TableName.valueOf(tableName));
 		} catch (TableNotFoundException tnfe) {
 			LOG.error("The table " + tableName + " not found ", tnfe);
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowInputFormat.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowInputFormat.java
index f455298..f7100ed 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowInputFormat.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowInputFormat.java
@@ -28,7 +28,6 @@ import org.apache.flink.connector.hbase.util.HBaseReadWriteHelper;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.types.Row;
 
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Connection;
@@ -54,12 +53,11 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
 	private final String tableName;
 	private final HBaseTableSchema schema;
 
-	private transient org.apache.hadoop.conf.Configuration conf;
 	private transient HBaseReadWriteHelper readHelper;
 
 	public HBaseRowInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
+		super(conf);
 		this.tableName = tableName;
-		this.conf = conf;
 		this.schema = schema;
 	}
 
@@ -90,13 +88,8 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
 	}
 
 	private void connectToTable() {
-
-		if (this.conf == null) {
-			this.conf = HBaseConfiguration.create();
-		}
-
 		try {
-			Connection conn = ConnectionFactory.createConnection(conf);
+			Connection conn = ConnectionFactory.createConnection(getHadoopConfiguration());
 			super.table = (HTable) conn.getTable(TableName.valueOf(tableName));
 		} catch (TableNotFoundException tnfe) {
 			LOG.error("The table " + tableName + " not found ", tnfe);
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
index 2a1ffa2..7724873 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
@@ -54,8 +54,6 @@ import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
 
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -312,7 +310,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Tuple1<Integer>> result = env
-			.createInput(new InputFormatForTestTable())
+			.createInput(new InputFormatForTestTable(getConf()))
 			.reduce((ReduceFunction<Tuple1<Integer>>) (v1, v2) -> Tuple1.of(v1.f0 + v2.f0));
 
 		List<Tuple1<Integer>> resultSet = result.collect();
@@ -612,9 +610,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 		properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
 		properties.put(CONNECTOR_PROPERTY_VERSION, "1");
 		properties.put(CONNECTOR_TABLE_NAME, TEST_TABLE_1);
-		// get zk quorum from "hbase-site.xml" in classpath
-		String hbaseZk = HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM);
-		properties.put(CONNECTOR_ZK_QUORUM, hbaseZk);
+		properties.put(CONNECTOR_ZK_QUORUM, getZookeeperQuorum());
 		// schema
 		String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
 		TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT);
@@ -705,6 +701,10 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 	public static class InputFormatForTestTable extends HBaseInputFormat<Tuple1<Integer>> {
 		private static final long serialVersionUID = 1L;
 
+		public InputFormatForTestTable(org.apache.hadoop.conf.Configuration hConf) {
+			super(hConf);
+		}
+
 		@Override
 		protected Scan getScanner() {
 			return new Scan();
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/example/HBaseReadExample.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/example/HBaseReadExample.java
index 13914e5..026c6d0 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/example/HBaseReadExample.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/example/HBaseReadExample.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.hbase.source.HBaseInputFormat;
 
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -47,7 +48,7 @@ public class HBaseReadExample {
 	public static void main(String[] args) throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		@SuppressWarnings("serial")
-		DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new HBaseInputFormat<Tuple2<String, String>>() {
+		DataSet<Tuple2<String, String>> hbaseDs = env.createInput(new HBaseInputFormat<Tuple2<String, String>>(HBaseConfiguration.create()) {
 
 				@Override
 				public String getTableName() {
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
index 04555cc..e00b986 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestBase.java
@@ -74,7 +74,6 @@ public abstract class HBaseTestBase extends HBaseTestingClusterAutoStarter {
 
 	@BeforeClass
 	public static void activateHBaseCluster() throws IOException {
-		registerHBaseMiniClusterInClasspath();
 		prepareTables();
 	}
 
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestingClusterAutoStarter.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestingClusterAutoStarter.java
index 9e15a2f..4ba1c17 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestingClusterAutoStarter.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/util/HBaseTestingClusterAutoStarter.java
@@ -40,24 +40,13 @@ import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.BeforeClass;
 
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Random;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * By using this class as the super class of a set of tests you will have a HBase testing
@@ -83,8 +72,6 @@ public abstract class HBaseTestingClusterAutoStarter extends AbstractTestBase {
 	private static HBaseAdmin admin = null;
 	private static List<TableName> createdTables = new ArrayList<>();
 
-	private static boolean alreadyRegisteredTestCluster = false;
-
 	private static Configuration conf;
 
 	protected static void createTable(TableName tableName, byte[][] columnFamilyName, byte[][] splitKeys) {
@@ -157,6 +144,7 @@ public abstract class HBaseTestingClusterAutoStarter extends AbstractTestBase {
 		TEST_UTIL.getConfiguration().setInt("hbase.master.info.port", -1);
 
 		// Make sure the zookeeper quorum value contains the right port number (varies per run).
+		LOG.info("Hbase minicluster client port: " + TEST_UTIL.getZkCluster().getClientPort());
 		TEST_UTIL.getConfiguration().set("hbase.zookeeper.quorum", "localhost:" + TEST_UTIL.getZkCluster().getClientPort());
 
 		conf = initialize(TEST_UTIL.getConfiguration());
@@ -170,77 +158,10 @@ public abstract class HBaseTestingClusterAutoStarter extends AbstractTestBase {
 		return "localhost:" + TEST_UTIL.getZkCluster().getClientPort();
 	}
 
-	private static File hbaseSiteXmlDirectory;
-	private static File hbaseSiteXmlFile;
-
-	/**
-	 * This dynamically generates a hbase-site.xml file that is added to the classpath.
-	 * This way this HBaseMinicluster can be used by an unmodified application.
-	 * The downside is that this cannot be 'unloaded' so you can have only one per JVM.
-	 */
-	public static void registerHBaseMiniClusterInClasspath() {
-		if (alreadyRegisteredTestCluster) {
-			fail("You CANNOT register a second HBase Testing cluster in the classpath of the SAME JVM");
-		}
-		File baseDir = new File(System.getProperty("java.io.tmpdir", "/tmp/"));
-		hbaseSiteXmlDirectory = new File(baseDir, "unittest-hbase-minicluster-" + Math.abs(new Random().nextLong()) + "/");
-
-		if (!hbaseSiteXmlDirectory.mkdirs()) {
-			fail("Unable to create output directory " + hbaseSiteXmlDirectory + " for the HBase minicluster");
-		}
-
-		assertNotNull("The ZooKeeper for the HBase minicluster is missing", TEST_UTIL.getZkCluster());
-
-		createHBaseSiteXml(hbaseSiteXmlDirectory, TEST_UTIL.getConfiguration().get("hbase.zookeeper.quorum"));
-		addDirectoryToClassPath(hbaseSiteXmlDirectory);
-
-		// Avoid starting it again.
-		alreadyRegisteredTestCluster = true;
-	}
-
 	public static Configuration getConf() {
 		return conf;
 	}
 
-	private static void createHBaseSiteXml(File hbaseSiteXmlDirectory, String zookeeperQuorum) {
-		hbaseSiteXmlFile = new File(hbaseSiteXmlDirectory, "hbase-site.xml");
-		// Create the hbase-site.xml file for this run.
-		try {
-			String hbaseSiteXml = "<?xml version=\"1.0\"?>\n" +
-				"<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>\n" +
-				"<configuration>\n" +
-				"  <property>\n" +
-				"    <name>hbase.zookeeper.quorum</name>\n" +
-				"    <value>" + zookeeperQuorum + "</value>\n" +
-				"  </property>\n" +
-				"</configuration>";
-			OutputStream fos = new FileOutputStream(hbaseSiteXmlFile);
-			fos.write(hbaseSiteXml.getBytes(StandardCharsets.UTF_8));
-			fos.close();
-		} catch (IOException e) {
-			fail("Unable to create " + hbaseSiteXmlFile);
-		}
-	}
-
-	private static void addDirectoryToClassPath(File directory) {
-		try {
-			// Get the classloader actually used by HBaseConfiguration
-			ClassLoader classLoader = HBaseConfiguration.create().getClassLoader();
-			if (!(classLoader instanceof URLClassLoader)) {
-				fail("We should get a URLClassLoader");
-			}
-
-			// Make the addURL method accessible
-			Method method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
-			method.setAccessible(true);
-
-			// Add the directory where we put the hbase-site.xml to the classpath
-			method.invoke(classLoader, directory.toURI().toURL());
-		} catch (MalformedURLException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
-			fail("Unable to add " + directory + " to classpath because of this exception: " + e.getMessage());
-		}
-	}
-
 	@AfterClass
 	public static void tearDown() throws Exception {
 		if (conf == null) {
@@ -249,8 +170,6 @@ public abstract class HBaseTestingClusterAutoStarter extends AbstractTestBase {
 		}
 		LOG.info("HBase minicluster: Shutting down");
 		deleteTables();
-		hbaseSiteXmlFile.delete();
-		hbaseSiteXmlDirectory.delete();
 		TEST_UTIL.shutdownMiniCluster();
 		LOG.info("HBase minicluster: Down");
 	}