You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/20 02:12:51 UTC

[flink] 02/03: [FLINK-17797][connector/hbase] Align the behavior between the new and legacy HBase table source

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 57e4748614bdbe06769b147bc264e4a400784379
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon May 18 17:48:08 2020 +0800

    [FLINK-17797][connector/hbase] Align the behavior between the new and legacy HBase table source
    
    This closes #12221
---
 flink-connectors/flink-connector-hbase/pom.xml     |   8 ++
 .../connector/hbase/HBaseDynamicTableFactory.java  |  43 ++++++-
 .../hbase/source/HBaseDynamicTableSource.java      |  21 +++-
 .../flink/connector/hbase/util/HBaseSerde.java     |  24 ++--
 .../connector/hbase/util/HBaseTableSchema.java     |  12 --
 .../flink/connector/hbase/HBaseTablePlanTest.java  | 127 +++++++++++++++++++++
 .../flink/connector/hbase/HBaseTablePlanTest.xml   |  36 ++++++
 7 files changed, 242 insertions(+), 29 deletions(-)

diff --git a/flink-connectors/flink-connector-hbase/pom.xml b/flink-connectors/flink-connector-hbase/pom.xml
index d3115b5..b2e0b4a 100644
--- a/flink-connectors/flink-connector-hbase/pom.xml
+++ b/flink-connectors/flink-connector-hbase/pom.xml
@@ -207,6 +207,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-clients_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
index ba85577..64b381f 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
@@ -103,15 +103,15 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
 	public DynamicTableSource createDynamicTableSource(Context context) {
 		TableFactoryHelper helper = createTableFactoryHelper(this, context);
 		helper.validate();
+		TableSchema tableSchema = context.getCatalogTable().getSchema();
+		validatePrimaryKey(tableSchema);
+
 		String hTableName = helper.getOptions().get(TABLE_NAME);
 		// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
 		Configuration hbaseClientConf = HBaseConfiguration.create();
 		hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, helper.getOptions().get(ZOOKEEPER_QUORUM));
 		hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, helper.getOptions().get(ZOOKEEPER_ZNODE_PARENT));
-
 		String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL);
-
-		TableSchema tableSchema = context.getCatalogTable().getSchema();
 		HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
 
 		return new HBaseDynamicTableSource(
@@ -125,6 +125,9 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
 	public DynamicTableSink createDynamicTableSink(Context context) {
 		TableFactoryHelper helper = createTableFactoryHelper(this, context);
 		helper.validate();
+		TableSchema tableSchema = context.getCatalogTable().getSchema();
+		validatePrimaryKey(tableSchema);
+
 		HBaseOptions.Builder hbaseOptionsBuilder = HBaseOptions.builder();
 		hbaseOptionsBuilder.setTableName(helper.getOptions().get(TABLE_NAME));
 		hbaseOptionsBuilder.setZkQuorum(helper.getOptions().get(ZOOKEEPER_QUORUM));
@@ -136,10 +139,7 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
 			.ifPresent(v -> writeBuilder.setBufferFlushIntervalMillis(v.toMillis()));
 		helper.getOptions().getOptional(SINK_BUFFER_FLUSH_MAX_ROWS)
 			.ifPresent(writeBuilder::setBufferFlushMaxRows);
-
 		String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL);
-
-		TableSchema tableSchema = context.getCatalogTable().getSchema();
 		HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
 
 		return new HBaseDynamicTableSink(
@@ -172,4 +172,35 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
 		set.add(SINK_BUFFER_FLUSH_INTERVAL);
 		return set;
 	}
+
+	// ------------------------------------------------------------------------------------------
+
+	/**
+	 * Checks that the HBase table has a row key defined. A row key is defined as an atomic type,
+	 * and column families and qualifiers are defined as ROW type. There shouldn't be multiple
+	 * atomic type columns in the schema. The PRIMARY KEY constraint is optional; if it exists,
+	 * it must be defined on the single row key field.
+	 */
+	private static void validatePrimaryKey(TableSchema schema) {
+		HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(schema);
+		if (!hbaseSchema.getRowKeyName().isPresent()) {
+			throw new IllegalArgumentException(
+				"HBase table requires to define a row key field. " +
+					"A row key field is defined as an atomic type, " +
+					"column families and qualifiers are defined as ROW type.");
+		}
+		schema.getPrimaryKey().ifPresent(k -> {
+			if (k.getColumns().size() > 1) {
+				throw new IllegalArgumentException(
+					"HBase table doesn't support a primary Key on multiple columns. " +
+						"The primary key of HBase table must be defined on row key field.");
+			}
+			if (!hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) {
+				throw new IllegalArgumentException(
+					"Primary key of HBase table must be defined on the row key field. " +
+						"A row key field is defined as an atomic type, " +
+						"column families and qualifiers are defined as ROW type.");
+			}
+		});
+	}
 }
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
index e59a12e..dcc5a5b 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
@@ -21,12 +21,15 @@ package org.apache.flink.connector.hbase.source;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.utils.TableSchemaUtils;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -36,11 +39,11 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * HBase table source implementation.
  */
 @Internal
-public class HBaseDynamicTableSource implements ScanTableSource, LookupTableSource {
+public class HBaseDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
 
 	private final Configuration conf;
 	private final String tableName;
-	private final HBaseTableSchema hbaseSchema;
+	private HBaseTableSchema hbaseSchema;
 	private final String nullStringLiteral;
 
 	public HBaseDynamicTableSource(
@@ -78,6 +81,20 @@ public class HBaseDynamicTableSource implements ScanTableSource, LookupTableSour
 	}
 
 	@Override
+	public boolean supportsNestedProjection() {
+		// planner doesn't support nested projection push down yet.
+		return false;
+	}
+
+	@Override
+	public void applyProjection(int[][] projectedFields) {
+		TableSchema projectSchema = TableSchemaUtils.projectSchema(
+			hbaseSchema.convertsToTableSchema(),
+			projectedFields);
+		this.hbaseSchema = HBaseTableSchema.fromTableSchema(projectSchema);
+	}
+
+	@Override
 	public ChangelogMode getChangelogMode() {
 		return ChangelogMode.insertOnly();
 	}
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index ed4a11f..e5a377f 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -67,8 +67,8 @@ public class HBaseSerde {
 	private GenericRowData reusedRow;
 	private GenericRowData[] reusedFamilyRows;
 
-	private final FieldEncoder keyEncoder;
-	private final FieldDecoder keyDecoder;
+	private final @Nullable FieldEncoder keyEncoder;
+	private final @Nullable FieldDecoder keyDecoder;
 	private final FieldEncoder[][] qualifierEncoders;
 	private final FieldDecoder[][] qualifierDecoders;
 
@@ -78,18 +78,21 @@ public class HBaseSerde {
 		LogicalType rowkeyType = hbaseSchema.getRowKeyDataType().map(DataType::getLogicalType).orElse(null);
 
 		// field length need take row key into account if it exists.
-		checkArgument(rowkeyIndex != -1 && rowkeyType != null, "row key is not set.");
-		this.fieldLength = families.length + 1;
+		if (rowkeyIndex != -1 && rowkeyType != null) {
+			this.fieldLength = families.length + 1;
+			this.keyEncoder = createFieldEncoder(rowkeyType);
+			this.keyDecoder = createFieldDecoder(rowkeyType);
+		} else {
+			this.fieldLength = families.length;
+			this.keyEncoder = null;
+			this.keyDecoder = null;
+		}
 		this.nullStringBytes = nullStringLiteral.getBytes(StandardCharsets.UTF_8);
 
 		// prepare output rows
 		this.reusedRow = new GenericRowData(fieldLength);
 		this.reusedFamilyRows = new GenericRowData[families.length];
 
-		// row key should never be null
-		this.keyEncoder = createFieldEncoder(rowkeyType);
-		this.keyDecoder = createFieldDecoder(rowkeyType);
-
 		this.qualifiers = new byte[families.length][][];
 		this.qualifierEncoders = new FieldEncoder[families.length][];
 		this.qualifierDecoders = new FieldDecoder[families.length][];
@@ -115,6 +118,7 @@ public class HBaseSerde {
 	 * @return The appropriate instance of Put for this use case.
 	 */
 	public @Nullable Put createPutMutation(RowData row) {
+		checkArgument(keyEncoder != null, "row key is not set.");
 		byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
 		if (rowkey.length == 0) {
 			// drop dirty records, rowkey shouldn't be zero length
@@ -146,6 +150,7 @@ public class HBaseSerde {
 	 * @return The appropriate instance of Delete for this use case.
 	 */
 	public @Nullable Delete createDeleteMutation(RowData row) {
+		checkArgument(keyEncoder != null, "row key is not set.");
 		byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
 		if (rowkey.length == 0) {
 			// drop dirty records, rowkey shouldn't be zero length
@@ -189,9 +194,10 @@ public class HBaseSerde {
 	 * Converts HBase {@link Result} into {@link RowData}.
 	 */
 	public RowData convertToRow(Result result) {
-		Object rowkey = keyDecoder.decode(result.getRow());
 		for (int i = 0; i < fieldLength; i++) {
 			if (rowkeyIndex == i) {
+				assert keyDecoder != null;
+				Object rowkey = keyDecoder.decode(result.getRow());
 				reusedRow.setField(rowkeyIndex, rowkey);
 			} else {
 				int f = (rowkeyIndex != -1 && i > rowkeyIndex) ? i - 1 : i;
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
index 41108f4..116b1ae 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
@@ -361,18 +361,6 @@ public class HBaseTableSchema implements Serializable {
 					"Unsupported field type '" + fieldType + "' for HBase.");
 			}
 		}
-		schema.getPrimaryKey().ifPresent(k -> {
-			if (k.getColumns().size() > 1 ||
-					!hbaseSchema.getRowKeyName().isPresent() ||
-					!hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) {
-				throw new IllegalArgumentException(
-					"Primary Key of HBase table should only be defined on the row key field.");
-			}
-		});
-		if (!hbaseSchema.getRowKeyName().isPresent()) {
-			throw new IllegalArgumentException(
-				"HBase table requires to define a row key field. A row key field must be an atomic type.");
-		}
 		return hbaseSchema;
 	}
 
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseTablePlanTest.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseTablePlanTest.java
new file mode 100644
index 0000000..053cf99
--- /dev/null
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseTablePlanTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Test;
+
+import static org.apache.flink.util.CoreMatchers.containsCause;
+
+/**
+ * Plan tests for the HBase connector, for example, testing projection push down.
+ */
+public class HBaseTablePlanTest extends TableTestBase {
+
+	private final StreamTableTestUtil util = streamTestUtil(new TableConfig());
+
+	@Test
+	public void testMultipleRowKey() {
+		util.tableEnv().executeSql(
+			"CREATE TABLE hTable (" +
+				" family1 ROW<col1 INT>," +
+				" family2 ROW<col1 STRING, col2 BIGINT>," +
+				" rowkey INT," +
+				" rowkey2 STRING " +
+				") WITH (" +
+				" 'connector' = 'hbase-1.4'," +
+				" 'table-name' = 'my_table'," +
+				" 'zookeeper.quorum' = 'localhost:2021'" +
+				")");
+		thrown().expect(containsCause(new IllegalArgumentException("Row key can't be set multiple times.")));
+		util.verifyPlan("SELECT * FROM hTable");
+	}
+
+	@Test
+	public void testNoneRowKey() {
+		util.tableEnv().executeSql(
+			"CREATE TABLE hTable (" +
+				" family1 ROW<col1 INT>," +
+				" family2 ROW<col1 STRING, col2 BIGINT>" +
+				") WITH (" +
+				" 'connector' = 'hbase-1.4'," +
+				" 'table-name' = 'my_table'," +
+				" 'zookeeper.quorum' = 'localhost:2021'" +
+				")");
+		thrown().expect(containsCause(new IllegalArgumentException(
+			"HBase table requires to define a row key field. " +
+				"A row key field is defined as an atomic type, " +
+				"column families and qualifiers are defined as ROW type.")));
+		util.verifyPlan("SELECT * FROM hTable");
+	}
+
+	@Test
+	public void testInvalidPrimaryKey() {
+		util.tableEnv().executeSql(
+			"CREATE TABLE hTable (" +
+				" family1 ROW<col1 INT>," +
+				" family2 ROW<col1 STRING, col2 BIGINT>," +
+				" rowkey STRING, " +
+				" PRIMARY KEY (family1) NOT ENFORCED " +
+				") WITH (" +
+				" 'connector' = 'hbase-1.4'," +
+				" 'table-name' = 'my_table'," +
+				" 'zookeeper.quorum' = 'localhost:2021'" +
+				")");
+		thrown().expect(containsCause(new IllegalArgumentException(
+			"Primary key of HBase table must be defined on the row key field. " +
+				"A row key field is defined as an atomic type, " +
+				"column families and qualifiers are defined as ROW type.")));
+		util.verifyPlan("SELECT * FROM hTable");
+	}
+
+	@Test
+	public void testUnsupportedDataType() {
+		util.tableEnv().executeSql(
+			"CREATE TABLE hTable (" +
+				" family1 ROW<col1 INT>," +
+				" family2 ROW<col1 STRING, col2 BIGINT>," +
+				" col1 ARRAY<STRING>, " +
+				" rowkey STRING, " +
+				" PRIMARY KEY (rowkey) NOT ENFORCED " +
+				") WITH (" +
+				" 'connector' = 'hbase-1.4'," +
+				" 'table-name' = 'my_table'," +
+				" 'zookeeper.quorum' = 'localhost:2021'" +
+				")");
+		thrown().expect(containsCause(new IllegalArgumentException(
+			"Unsupported field type 'ARRAY<STRING>' for HBase.")));
+		util.verifyPlan("SELECT * FROM hTable");
+	}
+
+	@Test
+	public void testProjectionPushDown() {
+		util.tableEnv().executeSql(
+			"CREATE TABLE hTable (" +
+				" family1 ROW<col1 INT>," +
+				" family2 ROW<col1 STRING, col2 BIGINT>," +
+				" family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>," +
+				" rowkey INT," +
+				" PRIMARY KEY (rowkey) NOT ENFORCED" +
+				") WITH (" +
+				" 'connector' = 'hbase-1.4'," +
+				" 'table-name' = 'my_table'," +
+				" 'zookeeper.quorum' = 'localhost:2021'" +
+				")");
+		util.verifyPlan("SELECT h.family3, h.family2.col2 FROM hTable AS h");
+	}
+
+}
diff --git a/flink-connectors/flink-connector-hbase/src/test/resources/org/apache/flink/connector/hbase/HBaseTablePlanTest.xml b/flink-connectors/flink-connector-hbase/src/test/resources/org/apache/flink/connector/hbase/HBaseTablePlanTest.xml
new file mode 100644
index 0000000..8391b1b
--- /dev/null
+++ b/flink-connectors/flink-connector-hbase/src/test/resources/org/apache/flink/connector/hbase/HBaseTablePlanTest.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testProjectionPushDown">
+    <Resource name="sql">
+      <![CDATA[SELECT h.family3, h.family2.col2 FROM hTable AS h]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(family3=[$2], col2=[$1.col2])
++- LogicalTableScan(table=[[default_catalog, default_database, hTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[family3, family2.col2 AS col2])
++- TableSourceScan(table=[[default_catalog, default_database, hTable, project=[family3, family2]]], fields=[family3, family2])
+]]>
+    </Resource>
+  </TestCase>
+</Root>