You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/20 02:12:51 UTC
[flink] 02/03: [FLINK-17797][connector/hbase] Align the behavior between the new and legacy HBase table source
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 57e4748614bdbe06769b147bc264e4a400784379
Author: Jark Wu <ja...@apache.org>
AuthorDate: Mon May 18 17:48:08 2020 +0800
[FLINK-17797][connector/hbase] Align the behavior between the new and legacy HBase table source
This closes #12221
---
flink-connectors/flink-connector-hbase/pom.xml | 8 ++
.../connector/hbase/HBaseDynamicTableFactory.java | 43 ++++++-
.../hbase/source/HBaseDynamicTableSource.java | 21 +++-
.../flink/connector/hbase/util/HBaseSerde.java | 24 ++--
.../connector/hbase/util/HBaseTableSchema.java | 12 --
.../flink/connector/hbase/HBaseTablePlanTest.java | 127 +++++++++++++++++++++
.../flink/connector/hbase/HBaseTablePlanTest.xml | 36 ++++++
7 files changed, 242 insertions(+), 29 deletions(-)
diff --git a/flink-connectors/flink-connector-hbase/pom.xml b/flink-connectors/flink-connector-hbase/pom.xml
index d3115b5..b2e0b4a 100644
--- a/flink-connectors/flink-connector-hbase/pom.xml
+++ b/flink-connectors/flink-connector-hbase/pom.xml
@@ -207,6 +207,14 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
index ba85577..64b381f 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
@@ -103,15 +103,15 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
public DynamicTableSource createDynamicTableSource(Context context) {
TableFactoryHelper helper = createTableFactoryHelper(this, context);
helper.validate();
+ TableSchema tableSchema = context.getCatalogTable().getSchema();
+ validatePrimaryKey(tableSchema);
+
String hTableName = helper.getOptions().get(TABLE_NAME);
// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
Configuration hbaseClientConf = HBaseConfiguration.create();
hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, helper.getOptions().get(ZOOKEEPER_QUORUM));
hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, helper.getOptions().get(ZOOKEEPER_ZNODE_PARENT));
-
String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL);
-
- TableSchema tableSchema = context.getCatalogTable().getSchema();
HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
return new HBaseDynamicTableSource(
@@ -125,6 +125,9 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
public DynamicTableSink createDynamicTableSink(Context context) {
TableFactoryHelper helper = createTableFactoryHelper(this, context);
helper.validate();
+ TableSchema tableSchema = context.getCatalogTable().getSchema();
+ validatePrimaryKey(tableSchema);
+
HBaseOptions.Builder hbaseOptionsBuilder = HBaseOptions.builder();
hbaseOptionsBuilder.setTableName(helper.getOptions().get(TABLE_NAME));
hbaseOptionsBuilder.setZkQuorum(helper.getOptions().get(ZOOKEEPER_QUORUM));
@@ -136,10 +139,7 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
.ifPresent(v -> writeBuilder.setBufferFlushIntervalMillis(v.toMillis()));
helper.getOptions().getOptional(SINK_BUFFER_FLUSH_MAX_ROWS)
.ifPresent(writeBuilder::setBufferFlushMaxRows);
-
String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL);
-
- TableSchema tableSchema = context.getCatalogTable().getSchema();
HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(tableSchema);
return new HBaseDynamicTableSink(
@@ -172,4 +172,35 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
set.add(SINK_BUFFER_FLUSH_INTERVAL);
return set;
}
+
+ // ------------------------------------------------------------------------------------------
+
+ /**
+ * Validates that the HBase table schema has a row key: the single atomic-type field, while column
+ * families (with their qualifiers) are ROW-type fields. Multiple atomic-type fields are rejected
+ * inside {@link HBaseTableSchema#fromTableSchema}. The PRIMARY KEY constraint is optional; if it
+ * exists, it must be declared on that row key field, otherwise an IllegalArgumentException is thrown.
+ */
+ private static void validatePrimaryKey(TableSchema schema) {
+ HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(schema); // may itself throw for invalid field types
+ if (!hbaseSchema.getRowKeyName().isPresent()) {
+ throw new IllegalArgumentException(
+ "HBase table requires to define a row key field. " +
+ "A row key field is defined as an atomic type, " +
+ "column families and qualifiers are defined as ROW type.");
+ }
+ schema.getPrimaryKey().ifPresent(k -> { // PRIMARY KEY is optional; only checked when declared
+ if (k.getColumns().size() > 1) { // a composite key cannot map onto the single row key field
+ throw new IllegalArgumentException(
+ "HBase table doesn't support a primary Key on multiple columns. " +
+ "The primary key of HBase table must be defined on row key field.");
+ }
+ if (!hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) { // get() is safe: presence checked above
+ throw new IllegalArgumentException(
+ "Primary key of HBase table must be defined on the row key field. " +
+ "A row key field is defined as an atomic type, " +
+ "column families and qualifiers are defined as ROW type.");
+ }
+ });
+ }
}
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
index e59a12e..dcc5a5b 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/source/HBaseDynamicTableSource.java
@@ -21,12 +21,15 @@ package org.apache.flink.connector.hbase.source;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.hadoop.conf.Configuration;
@@ -36,11 +39,11 @@ import static org.apache.flink.util.Preconditions.checkArgument;
* HBase table source implementation.
*/
@Internal
-public class HBaseDynamicTableSource implements ScanTableSource, LookupTableSource {
+public class HBaseDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
private final Configuration conf;
private final String tableName;
- private final HBaseTableSchema hbaseSchema;
+ private HBaseTableSchema hbaseSchema;
private final String nullStringLiteral;
public HBaseDynamicTableSource(
@@ -78,6 +81,20 @@ public class HBaseDynamicTableSource implements ScanTableSource, LookupTableSour
}
@Override
+ public boolean supportsNestedProjection() {
+ // Nested projection push down isn't supported by the planner yet, so only top-level fields are pushed.
+ return false;
+ }
+
+ @Override
+ public void applyProjection(int[][] projectedFields) { // narrows hbaseSchema to the projected top-level fields
+ TableSchema projectSchema = TableSchemaUtils.projectSchema(
+ hbaseSchema.convertsToTableSchema(),
+ projectedFields);
+ this.hbaseSchema = HBaseTableSchema.fromTableSchema(projectSchema); // row key may be projected away; fromTableSchema no longer requires it
+ }
+
+ @Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
index ed4a11f..e5a377f 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
@@ -67,8 +67,8 @@ public class HBaseSerde {
private GenericRowData reusedRow;
private GenericRowData[] reusedFamilyRows;
- private final FieldEncoder keyEncoder;
- private final FieldDecoder keyDecoder;
+ private final @Nullable FieldEncoder keyEncoder;
+ private final @Nullable FieldDecoder keyDecoder;
private final FieldEncoder[][] qualifierEncoders;
private final FieldDecoder[][] qualifierDecoders;
@@ -78,18 +78,21 @@ public class HBaseSerde {
LogicalType rowkeyType = hbaseSchema.getRowKeyDataType().map(DataType::getLogicalType).orElse(null);
// field length need take row key into account if it exists.
- checkArgument(rowkeyIndex != -1 && rowkeyType != null, "row key is not set.");
- this.fieldLength = families.length + 1;
+ if (rowkeyIndex != -1 && rowkeyType != null) {
+ this.fieldLength = families.length + 1;
+ this.keyEncoder = createFieldEncoder(rowkeyType);
+ this.keyDecoder = createFieldDecoder(rowkeyType);
+ } else {
+ this.fieldLength = families.length;
+ this.keyEncoder = null;
+ this.keyDecoder = null;
+ }
this.nullStringBytes = nullStringLiteral.getBytes(StandardCharsets.UTF_8);
// prepare output rows
this.reusedRow = new GenericRowData(fieldLength);
this.reusedFamilyRows = new GenericRowData[families.length];
- // row key should never be null
- this.keyEncoder = createFieldEncoder(rowkeyType);
- this.keyDecoder = createFieldDecoder(rowkeyType);
-
this.qualifiers = new byte[families.length][][];
this.qualifierEncoders = new FieldEncoder[families.length][];
this.qualifierDecoders = new FieldDecoder[families.length][];
@@ -115,6 +118,7 @@ public class HBaseSerde {
* @return The appropriate instance of Put for this use case.
*/
public @Nullable Put createPutMutation(RowData row) {
+ checkArgument(keyEncoder != null, "row key is not set.");
byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
if (rowkey.length == 0) {
// drop dirty records, rowkey shouldn't be zero length
@@ -146,6 +150,7 @@ public class HBaseSerde {
* @return The appropriate instance of Delete for this use case.
*/
public @Nullable Delete createDeleteMutation(RowData row) {
+ checkArgument(keyEncoder != null, "row key is not set.");
byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
if (rowkey.length == 0) {
// drop dirty records, rowkey shouldn't be zero length
@@ -189,9 +194,10 @@ public class HBaseSerde {
* Converts HBase {@link Result} into {@link RowData}.
*/
public RowData convertToRow(Result result) {
- Object rowkey = keyDecoder.decode(result.getRow());
for (int i = 0; i < fieldLength; i++) {
if (rowkeyIndex == i) {
+ assert keyDecoder != null;
+ Object rowkey = keyDecoder.decode(result.getRow());
reusedRow.setField(rowkeyIndex, rowkey);
} else {
int f = (rowkeyIndex != -1 && i > rowkeyIndex) ? i - 1 : i;
diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
index 41108f4..116b1ae 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
@@ -361,18 +361,6 @@ public class HBaseTableSchema implements Serializable {
"Unsupported field type '" + fieldType + "' for HBase.");
}
}
- schema.getPrimaryKey().ifPresent(k -> {
- if (k.getColumns().size() > 1 ||
- !hbaseSchema.getRowKeyName().isPresent() ||
- !hbaseSchema.getRowKeyName().get().equals(k.getColumns().get(0))) {
- throw new IllegalArgumentException(
- "Primary Key of HBase table should only be defined on the row key field.");
- }
- });
- if (!hbaseSchema.getRowKeyName().isPresent()) {
- throw new IllegalArgumentException(
- "HBase table requires to define a row key field. A row key field must be an atomic type.");
- }
return hbaseSchema;
}
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseTablePlanTest.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseTablePlanTest.java
new file mode 100644
index 0000000..053cf99
--- /dev/null
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseTablePlanTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Test;
+
+import static org.apache.flink.util.CoreMatchers.containsCause;
+
+/**
+ * Plan tests for the HBase connector, covering DDL schema validation (row key, primary key, field types) and projection push down.
+ */
+public class HBaseTablePlanTest extends TableTestBase {
+
+ private final StreamTableTestUtil util = streamTestUtil(new TableConfig());
+
+ @Test
+ public void testMultipleRowKey() { // two atomic fields (rowkey, rowkey2) make the row key ambiguous
+ util.tableEnv().executeSql(
+ "CREATE TABLE hTable (" +
+ " family1 ROW<col1 INT>," +
+ " family2 ROW<col1 STRING, col2 BIGINT>," +
+ " rowkey INT," +
+ " rowkey2 STRING " +
+ ") WITH (" +
+ " 'connector' = 'hbase-1.4'," +
+ " 'table-name' = 'my_table'," +
+ " 'zookeeper.quorum' = 'localhost:2021'" +
+ ")");
+ thrown().expect(containsCause(new IllegalArgumentException("Row key can't be set multiple times.")));
+ util.verifyPlan("SELECT * FROM hTable");
+ }
+
+ @Test
+ public void testNoneRowKey() { // only ROW-type fields are declared, so there is no row key
+ util.tableEnv().executeSql(
+ "CREATE TABLE hTable (" +
+ " family1 ROW<col1 INT>," +
+ " family2 ROW<col1 STRING, col2 BIGINT>" +
+ ") WITH (" +
+ " 'connector' = 'hbase-1.4'," +
+ " 'table-name' = 'my_table'," +
+ " 'zookeeper.quorum' = 'localhost:2021'" +
+ ")");
+ thrown().expect(containsCause(new IllegalArgumentException(
+ "HBase table requires to define a row key field. " +
+ "A row key field is defined as an atomic type, " +
+ "column families and qualifiers are defined as ROW type.")));
+ util.verifyPlan("SELECT * FROM hTable");
+ }
+
+ @Test
+ public void testInvalidPrimaryKey() { // PRIMARY KEY is declared on a column family instead of the row key
+ util.tableEnv().executeSql(
+ "CREATE TABLE hTable (" +
+ " family1 ROW<col1 INT>," +
+ " family2 ROW<col1 STRING, col2 BIGINT>," +
+ " rowkey STRING, " +
+ " PRIMARY KEY (family1) NOT ENFORCED " +
+ ") WITH (" +
+ " 'connector' = 'hbase-1.4'," +
+ " 'table-name' = 'my_table'," +
+ " 'zookeeper.quorum' = 'localhost:2021'" +
+ ")");
+ thrown().expect(containsCause(new IllegalArgumentException(
+ "Primary key of HBase table must be defined on the row key field. " +
+ "A row key field is defined as an atomic type, " +
+ "column families and qualifiers are defined as ROW type.")));
+ util.verifyPlan("SELECT * FROM hTable");
+ }
+
+ @Test
+ public void testUnsupportedDataType() { // ARRAY columns are rejected as an unsupported HBase field type
+ util.tableEnv().executeSql(
+ "CREATE TABLE hTable (" +
+ " family1 ROW<col1 INT>," +
+ " family2 ROW<col1 STRING, col2 BIGINT>," +
+ " col1 ARRAY<STRING>, " +
+ " rowkey STRING, " +
+ " PRIMARY KEY (rowkey) NOT ENFORCED " +
+ ") WITH (" +
+ " 'connector' = 'hbase-1.4'," +
+ " 'table-name' = 'my_table'," +
+ " 'zookeeper.quorum' = 'localhost:2021'" +
+ ")");
+ thrown().expect(containsCause(new IllegalArgumentException(
+ "Unsupported field type 'ARRAY<STRING>' for HBase.")));
+ util.verifyPlan("SELECT * FROM hTable");
+ }
+
+ @Test
+ public void testProjectionPushDown() { // expected plans live in HBaseTablePlanTest.xml
+ util.tableEnv().executeSql(
+ "CREATE TABLE hTable (" +
+ " family1 ROW<col1 INT>," +
+ " family2 ROW<col1 STRING, col2 BIGINT>," +
+ " family3 ROW<col1 DOUBLE, col2 BOOLEAN, col3 STRING>," +
+ " rowkey INT," +
+ " PRIMARY KEY (rowkey) NOT ENFORCED" +
+ ") WITH (" +
+ " 'connector' = 'hbase-1.4'," +
+ " 'table-name' = 'my_table'," +
+ " 'zookeeper.quorum' = 'localhost:2021'" +
+ ")");
+ util.verifyPlan("SELECT h.family3, h.family2.col2 FROM hTable AS h");
+ }
+
+}
diff --git a/flink-connectors/flink-connector-hbase/src/test/resources/org/apache/flink/connector/hbase/HBaseTablePlanTest.xml b/flink-connectors/flink-connector-hbase/src/test/resources/org/apache/flink/connector/hbase/HBaseTablePlanTest.xml
new file mode 100644
index 0000000..8391b1b
--- /dev/null
+++ b/flink-connectors/flink-connector-hbase/src/test/resources/org/apache/flink/connector/hbase/HBaseTablePlanTest.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+ <TestCase name="testProjectionPushDown">
+ <Resource name="sql">
+ <![CDATA[SELECT h.family3, h.family2.col2 FROM hTable AS h]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(family3=[$2], col2=[$1.col2])
++- LogicalTableScan(table=[[default_catalog, default_database, hTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[family3, family2.col2 AS col2])
++- TableSourceScan(table=[[default_catalog, default_database, hTable, project=[family3, family2]]], fields=[family3, family2])
+]]>
+ </Resource>
+ </TestCase>
+</Root>