Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/11 14:54:42 UTC
[flink] branch master updated: [FLINK-10245][hbase] Add an upsert table sink factory for HBase
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 01031ad [FLINK-10245][hbase] Add an upsert table sink factory for HBase
01031ad is described below
commit 01031ad2daad4d02006c527b122856118dd21434
Author: Shimin Yang <ya...@youzan.com>
AuthorDate: Tue Aug 28 18:25:33 2018 +0800
[FLINK-10245][hbase] Add an upsert table sink factory for HBase
This commit adds full support for using HBase with the Table & SQL API.
It includes:
- HBase upsert table sink (for append-only and updating queries)
- HBase table factory
- HBase table descriptors & validators
- Unit tests
This closes #9075
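
For orientation, a minimal sketch of how the new factory can be exercised from connector properties, mirroring the HBaseSinkITCase added below (the schema object, table name, quorum, and flush values are illustrative placeholders, not part of this commit):

    // Sketch only: the property keys are defined in HBaseValidator;
    // "orders" and "localhost:2181" are placeholder values.
    Map<String, String> props = new HashMap<>();
    props.put("connector.type", "hbase");
    props.put("connector.version", "1.4.3");
    props.put("connector.property-version", "1");
    props.put("connector.table-name", "orders");
    props.put("connector.zookeeper.quorum", "localhost:2181");
    props.put("connector.zookeeper.znode.parent", "/hbase");
    props.put("connector.write.buffer-flush.max-size", "2mb");
    props.put("connector.write.buffer-flush.max-rows", "1000");
    props.put("connector.write.buffer-flush.interval", "2s");

    DescriptorProperties descriptorProperties = new DescriptorProperties(true);
    // hbaseSchema: an assumed HBaseTableSchema with a row key set
    descriptorProperties.putTableSchema(SCHEMA, hbaseSchema.convertsToTableSchema());
    descriptorProperties.putProperties(props);
    TableSink tableSink = TableFactoryService
        .find(HBaseTableFactory.class, descriptorProperties.asMap())
        .createTableSink(descriptorProperties.asMap());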
---
.../flink/addons/hbase/HBaseLookupFunction.java | 6 +-
.../apache/flink/addons/hbase/HBaseOptions.java | 135 +++++++++++
.../flink/addons/hbase/HBaseRowInputFormat.java | 6 +-
.../flink/addons/hbase/HBaseTableFactory.java | 60 ++++-
.../flink/addons/hbase/HBaseTableSchema.java | 6 +-
.../addons/hbase/HBaseUpsertSinkFunction.java | 254 +++++++++++++++++++++
.../flink/addons/hbase/HBaseUpsertTableSink.java | 133 +++++++++++
.../flink/addons/hbase/HBaseWriteOptions.java | 151 ++++++++++++
...seReadHelper.java => HBaseReadWriteHelper.java} | 67 +++++-
.../flink/table/descriptors/HBaseValidator.java | 20 +-
.../addons/hbase/HBaseLookupFunctionITCase.java | 8 +-
.../apache/flink/addons/hbase/HBaseSinkITCase.java | 186 +++++++++++++++
.../flink/addons/hbase/HBaseTableFactoryTest.java | 72 ++++--
.../hbase/HBaseTestingClusterAutostarter.java | 7 +
14 files changed, 1068 insertions(+), 43 deletions(-)
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseLookupFunction.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseLookupFunction.java
index 938d834..4176efc 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseLookupFunction.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseLookupFunction.java
@@ -19,7 +19,7 @@
package org.apache.flink.addons.hbase;
import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
-import org.apache.flink.addons.hbase.util.HBaseReadHelper;
+import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.FunctionContext;
@@ -53,7 +53,7 @@ public class HBaseLookupFunction extends TableFunction<Row> {
private final byte[] serializedConfig;
private final HBaseTableSchema hbaseTableSchema;
- private transient HBaseReadHelper readHelper;
+ private transient HBaseReadWriteHelper readHelper;
private transient Connection hConnection;
private transient HTable table;
@@ -115,7 +115,7 @@ public class HBaseLookupFunction extends TableFunction<Row> {
LOG.error("Exception while creating connection to HBase.", ioe);
throw new RuntimeException("Cannot create connection to HBase.", ioe);
}
- this.readHelper = new HBaseReadHelper(hbaseTableSchema);
+ this.readHelper = new HBaseReadWriteHelper(hbaseTableSchema);
LOG.info("end open.");
}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseOptions.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseOptions.java
new file mode 100644
index 0000000..1317e96
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseOptions.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Common Options for HBase.
+ */
+public class HBaseOptions {
+
+ private final String tableName;
+ private final String zkQuorum;
+ @Nullable private final String zkNodeParent;
+
+ private HBaseOptions(String tableName, String zkQuorum, @Nullable String zkNodeParent) {
+ this.tableName = tableName;
+ this.zkQuorum = zkQuorum;
+ this.zkNodeParent = zkNodeParent;
+ }
+
+ String getTableName() {
+ return tableName;
+ }
+
+ String getZkQuorum() {
+ return zkQuorum;
+ }
+
+ Optional<String> getZkNodeParent() {
+ return Optional.ofNullable(zkNodeParent);
+ }
+
+ @Override
+ public String toString() {
+ return "HBaseOptions{" +
+ "tableName='" + tableName + '\'' +
+ ", zkQuorum='" + zkQuorum + '\'' +
+ ", zkNodeParent='" + zkNodeParent + '\'' +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HBaseOptions that = (HBaseOptions) o;
+ return Objects.equals(tableName, that.tableName) &&
+ Objects.equals(zkQuorum, that.zkQuorum) &&
+ Objects.equals(zkNodeParent, that.zkNodeParent);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tableName, zkQuorum, zkNodeParent);
+ }
+
+ /**
+ * Creates a builder of {@link HBaseOptions}.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link HBaseOptions}.
+ */
+ public static class Builder {
+
+ private String tableName;
+ private String zkQuorum;
+ private String zkNodeParent;
+
+ /**
+ * Required. Sets the HBase table name.
+ */
+ public Builder setTableName(String tableName) {
+ checkNotNull(tableName);
+ this.tableName = tableName;
+ return this;
+ }
+
+ /**
+ * Required. Sets the HBase ZooKeeper quorum configuration.
+ */
+ public Builder setZkQuorum(String zkQuorum) {
+ checkNotNull(zkQuorum);
+ this.zkQuorum = zkQuorum;
+ return this;
+ }
+
+ /**
+ * Optional. Sets the root znode path in ZooKeeper for the HBase cluster. Default is "/hbase".
+ */
+ public Builder setZkNodeParent(String zkNodeParent) {
+ checkNotNull(zkNodeParent);
+ this.zkNodeParent = zkNodeParent;
+ return this;
+ }
+
+ /**
+ * Creates an instance of {@link HBaseOptions}.
+ */
+ public HBaseOptions build() {
+ checkNotNull(zkQuorum, "Zookeeper quorum is not set.");
+ checkNotNull(tableName, "TableName is not set.");
+ return new HBaseOptions(tableName, zkQuorum, zkNodeParent);
+ }
+ }
+}
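
A quick usage note: the builder is normally driven by HBaseTableFactory, but it can also be used directly. A minimal sketch with placeholder values (the same pattern appears in HBaseTableFactoryTest below):

    HBaseOptions hbaseOptions = HBaseOptions.builder()
        .setTableName("testTable")       // required; placeholder name
        .setZkQuorum("localhost:2181")   // required; placeholder quorum
        .setZkNodeParent("/hbase")       // optional; defaults to "/hbase"
        .build();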
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
index fd2a116..1d4c0ec 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseRowInputFormat.java
@@ -18,7 +18,7 @@
package org.apache.flink.addons.hbase;
-import org.apache.flink.addons.hbase.util.HBaseReadHelper;
+import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -52,7 +52,7 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
private final HBaseTableSchema schema;
private transient org.apache.hadoop.conf.Configuration conf;
- private transient HBaseReadHelper readHelper;
+ private transient HBaseReadWriteHelper readHelper;
public HBaseRowInputFormat(org.apache.hadoop.conf.Configuration conf, String tableName, HBaseTableSchema schema) {
this.tableName = tableName;
@@ -64,7 +64,7 @@ public class HBaseRowInputFormat extends AbstractTableInputFormat<Row> implement
public void configure(Configuration parameters) {
LOG.info("Initializing HBase configuration.");
// prepare hbase read helper
- this.readHelper = new HBaseReadHelper(schema);
+ this.readHelper = new HBaseReadWriteHelper(schema);
connectToTable();
if (table != null) {
scan = getScanner();
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableFactory.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableFactory.java
index 922b26f..26fb973 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableFactory.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableFactory.java
@@ -19,11 +19,14 @@
package org.apache.flink.addons.hbase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.HBaseValidator;
+import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
@@ -39,10 +42,14 @@ import java.util.Map;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_TABLE_NAME;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_ZK_QUORUM;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME;
import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_NODE_PARENT;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
@@ -50,21 +57,56 @@ import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
/**
* Factory for creating configured instances of {@link HBaseTableSource} or sink.
*/
-public class HBaseTableFactory implements StreamTableSourceFactory<Row> {
+public class HBaseTableFactory implements StreamTableSourceFactory<Row>, StreamTableSinkFactory<Tuple2<Boolean, Row>> {
@Override
public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
// create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
Configuration hbaseClientConf = HBaseConfiguration.create();
- String hbaseZk = properties.get(CONNECTOR_HBASE_ZK_QUORUM);
+ String hbaseZk = descriptorProperties.getString(CONNECTOR_ZK_QUORUM);
hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseZk);
- String hTableName = descriptorProperties.getString(CONNECTOR_HBASE_TABLE_NAME);
+ descriptorProperties
+ .getOptionalString(CONNECTOR_ZK_NODE_PARENT)
+ .ifPresent(v -> hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
+
+ String hTableName = descriptorProperties.getString(CONNECTOR_TABLE_NAME);
TableSchema tableSchema = descriptorProperties.getTableSchema(SCHEMA);
HBaseTableSchema hbaseSchema = validateTableSchema(tableSchema);
return new HBaseTableSource(hbaseClientConf, hTableName, hbaseSchema, null);
}
+ @Override
+ public StreamTableSink<Tuple2<Boolean, Row>> createStreamTableSink(Map<String, String> properties) {
+ final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
+ HBaseOptions.Builder hbaseOptionsBuilder = HBaseOptions.builder();
+ hbaseOptionsBuilder.setZkQuorum(descriptorProperties.getString(CONNECTOR_ZK_QUORUM));
+ hbaseOptionsBuilder.setTableName(descriptorProperties.getString(CONNECTOR_TABLE_NAME));
+ descriptorProperties
+ .getOptionalString(CONNECTOR_ZK_NODE_PARENT)
+ .ifPresent(hbaseOptionsBuilder::setZkNodeParent);
+
+ TableSchema tableSchema = descriptorProperties.getTableSchema(SCHEMA);
+ HBaseTableSchema hbaseSchema = validateTableSchema(tableSchema);
+
+ HBaseWriteOptions.Builder writeBuilder = HBaseWriteOptions.builder();
+ descriptorProperties
+ .getOptionalInt(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS)
+ .ifPresent(writeBuilder::setBufferFlushMaxRows);
+ descriptorProperties
+ .getOptionalMemorySize(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE)
+ .ifPresent(v -> writeBuilder.setBufferFlushMaxSizeInBytes(v.getBytes()));
+ descriptorProperties
+ .getOptionalDuration(CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL)
+ .ifPresent(v -> writeBuilder.setBufferFlushIntervalMillis(v.toMillis()));
+
+ return new HBaseUpsertTableSink(
+ hbaseSchema,
+ hbaseOptionsBuilder.build(),
+ writeBuilder.build()
+ );
+ }
+
private HBaseTableSchema validateTableSchema(TableSchema schema) {
HBaseTableSchema hbaseSchema = new HBaseTableSchema();
String[] fieldNames = schema.getFieldNames();
@@ -106,8 +148,12 @@ public class HBaseTableFactory implements StreamTableSourceFactory<Row> {
public List<String> supportedProperties() {
List<String> properties = new ArrayList<>();
- properties.add(CONNECTOR_HBASE_TABLE_NAME);
- properties.add(CONNECTOR_HBASE_ZK_QUORUM);
+ properties.add(CONNECTOR_TABLE_NAME);
+ properties.add(CONNECTOR_ZK_QUORUM);
+ properties.add(CONNECTOR_ZK_NODE_PARENT);
+ properties.add(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE);
+ properties.add(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS);
+ properties.add(CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL);
// schema
properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
index af642d1..0dd12de 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
@@ -54,7 +54,7 @@ public class HBaseTableSchema implements Serializable {
* @param qualifier the qualifier name
* @param clazz the data type of the qualifier
*/
- void addColumn(String family, String qualifier, Class<?> clazz) {
+ public void addColumn(String family, String qualifier, Class<?> clazz) {
Preconditions.checkNotNull(family, "family name");
Preconditions.checkNotNull(qualifier, "qualifier name");
Preconditions.checkNotNull(clazz, "class type");
@@ -78,7 +78,7 @@ public class HBaseTableSchema implements Serializable {
* @param rowKeyName the row key field name
* @param clazz the data type of the row key
*/
- void setRowKey(String rowKeyName, Class<?> clazz) {
+ public void setRowKey(String rowKeyName, Class<?> clazz) {
Preconditions.checkNotNull(rowKeyName, "row key field name");
Preconditions.checkNotNull(clazz, "row key class type");
if (!HBaseTypeUtils.isSupportedType(clazz)) {
@@ -97,7 +97,7 @@ public class HBaseTableSchema implements Serializable {
*
* @param charset the charset for value strings and HBase identifiers.
*/
- void setCharset(String charset) {
+ public void setCharset(String charset) {
this.charset = charset;
}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertSinkFunction.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertSinkFunction.java
new file mode 100644
index 0000000..0c6bfae
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertSinkFunction.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.addons.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.addons.hbase.util.HBaseReadWriteHelper;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The upsert sink for HBase.
+ *
+ * <p>This class leverages {@link BufferedMutator} to buffer multiple
+ * {@link org.apache.hadoop.hbase.client.Mutation Mutations} before sending the requests to the cluster.
+ * The buffering strategy can be configured by {@code bufferFlushMaxSizeInBytes},
+ * {@code bufferFlushMaxMutations} and {@code bufferFlushIntervalMillis}.</p>
+ */
+public class HBaseUpsertSinkFunction
+ extends RichSinkFunction<Tuple2<Boolean, Row>>
+ implements CheckpointedFunction, BufferedMutator.ExceptionListener {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(HBaseUpsertSinkFunction.class);
+
+ private final String hTableName;
+ private final HBaseTableSchema schema;
+ private final byte[] serializedConfig;
+
+ private final long bufferFlushMaxSizeInBytes;
+ private final long bufferFlushMaxMutations;
+ private final long bufferFlushIntervalMillis;
+
+ private transient HBaseReadWriteHelper helper;
+
+ private transient Connection connection;
+ private transient BufferedMutator mutator;
+
+ private transient ScheduledExecutorService executor;
+ private transient ScheduledFuture scheduledFuture;
+ private transient AtomicLong numPendingRequests;
+
+ private transient volatile boolean closed = false;
+
+ /**
+ * This is set from inside the {@link BufferedMutator.ExceptionListener} if a {@link Throwable}
+ * was thrown.
+ *
+ * <p>Errors will be checked and rethrown before processing each input element, and when the sink is closed.
+ */
+ private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
+
+ public HBaseUpsertSinkFunction(
+ String hTableName,
+ HBaseTableSchema schema,
+ org.apache.hadoop.conf.Configuration conf,
+ long bufferFlushMaxSizeInBytes,
+ long bufferFlushMaxMutations,
+ long bufferFlushIntervalMillis) {
+ checkArgument(schema.getRowKeyName().isPresent(), "HBaseUpsertSinkFunction requires rowkey is set.");
+ this.hTableName = hTableName;
+ this.schema = schema;
+ // Configuration is not serializable
+ this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
+ this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
+ this.bufferFlushMaxMutations = bufferFlushMaxMutations;
+ this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ LOG.info("start open ...");
+ org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
+ try {
+ this.helper = new HBaseReadWriteHelper(schema);
+ this.numPendingRequests = new AtomicLong(0);
+
+ if (null == connection) {
+ this.connection = ConnectionFactory.createConnection(config);
+ }
+ // create a parameter instance, set the table name and custom listener reference.
+ BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(hTableName))
+ .listener(this)
+ .writeBufferSize(bufferFlushMaxSizeInBytes);
+ this.mutator = connection.getBufferedMutator(params);
+
+ if (bufferFlushIntervalMillis > 0) {
+ this.executor = Executors.newScheduledThreadPool(
+ 1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
+ this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
+ if (closed) {
+ return;
+ }
+ try {
+ flush();
+ } catch (Exception e) {
+ // fail the sink and skip the rest of the items
+ // if the failure handler decides to throw an exception
+ failureThrowable.compareAndSet(null, e);
+ }
+ }, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS);
+ }
+ } catch (TableNotFoundException tnfe) {
+ LOG.error("The table " + hTableName + " not found ", tnfe);
+ throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
+ } catch (IOException ioe) {
+ LOG.error("Exception while creating connection to HBase.", ioe);
+ throw new RuntimeException("Cannot create connection to HBase.", ioe);
+ }
+ LOG.info("end open.");
+ }
+
+ private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException {
+ // create default configuration from current runtime env (`hbase-site.xml` in classpath) first,
+ // and overwrite configuration using serialized configuration from client-side env (`hbase-site.xml` in classpath).
+ // user params from client-side have the highest priority
+ org.apache.hadoop.conf.Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(serializedConfig, HBaseConfiguration.create());
+
+ // do validation: check key option(s) in final runtime configuration
+ if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
+ LOG.error("Can not connect to HBase without {} configuration", HConstants.ZOOKEEPER_QUORUM);
+ throw new IOException("Check HBase configuration failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+ }
+
+ return runtimeConfig;
+ }
+
+ private void checkErrorAndRethrow() {
+ Throwable cause = failureThrowable.get();
+ if (cause != null) {
+ throw new RuntimeException("An error occurred in HBaseSink.", cause);
+ }
+ }
+
+ @Override
+ public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
+ checkErrorAndRethrow();
+
+ if (value.f0) {
+ Put put = helper.createPutMutation(value.f1);
+ mutator.mutate(put);
+ } else {
+ Delete delete = helper.createDeleteMutation(value.f1);
+ mutator.mutate(delete);
+ }
+
+ // flush when the number of buffered mutations is greater than the configured max.
+ if (bufferFlushMaxMutations > 0 && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
+ flush();
+ }
+ }
+
+ private void flush() throws IOException {
+ // BufferedMutator is thread-safe
+ mutator.flush();
+ numPendingRequests.set(0);
+ checkErrorAndRethrow();
+ }
+
+ @Override
+ public void close() throws Exception {
+ closed = true;
+
+ if (mutator != null) {
+ try {
+ mutator.close();
+ } catch (IOException e) {
+ LOG.warn("Exception occurs while closing HBase BufferedMutator.", e);
+ }
+ this.mutator = null;
+ }
+
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (IOException e) {
+ LOG.warn("Exception occurs while closing HBase Connection.", e);
+ }
+ this.connection = null;
+ }
+
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(false);
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ while (numPendingRequests.get() != 0) {
+ flush();
+ }
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {
+ // nothing to do.
+ }
+
+ @Override
+ public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) throws RetriesExhaustedWithDetailsException {
+ // fail the sink and skip the rest of the items
+ // if the failure handler decides to throw an exception
+ failureThrowable.compareAndSet(null, exception);
+ }
+}
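
For reference, a minimal sketch of wiring this sink function into a DataStream job by hand (HBaseUpsertTableSink below does this wiring in practice); the schema, input stream, and flush thresholds are assumed placeholders:

    org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
    conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:2181"); // placeholder quorum
    HBaseUpsertSinkFunction sinkFunction = new HBaseUpsertSinkFunction(
        "testTable",         // placeholder HBase table name
        hbaseSchema,         // an assumed HBaseTableSchema with a row key set
        conf,
        2 * 1024 * 1024,     // flush once 2 MB of mutations are buffered
        1000,                // ...or once 1000 mutations are buffered
        2000);               // ...or every 2 seconds
    // upsertStream is an assumed DataStream<Tuple2<Boolean, Row>>;
    // f0 == true emits a Put (upsert), f0 == false emits a Delete.
    upsertStream.addSink(sinkFunction);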
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertTableSink.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertTableSink.java
new file mode 100644
index 0000000..1dc7f97
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseUpsertTableSink.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.sinks.UpsertStreamTableSink;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An {@link UpsertStreamTableSink} for HBase.
+ */
+public class HBaseUpsertTableSink implements UpsertStreamTableSink<Row> {
+
+ private final HBaseTableSchema hbaseTableSchema;
+ private final TableSchema tableSchema;
+ private final HBaseOptions hbaseOptions;
+ private final HBaseWriteOptions writeOptions;
+
+ public HBaseUpsertTableSink(
+ HBaseTableSchema hbaseTableSchema,
+ HBaseOptions hbaseOptions,
+ HBaseWriteOptions writeOptions) {
+ checkArgument(hbaseTableSchema.getRowKeyName().isPresent(), "HBaseUpsertTableSink requires rowkey is set.");
+ this.hbaseTableSchema = hbaseTableSchema;
+ this.tableSchema = hbaseTableSchema.convertsToTableSchema();
+ this.hbaseOptions = hbaseOptions;
+ this.writeOptions = writeOptions;
+ }
+
+ @Override
+ public void setKeyFields(String[] keys) {
+ // HBase always upserts on the rowkey, so query keys are ignored.
+ // Ideally, we would verify that the query key is the same as the rowkey.
+ // However, query key extraction doesn't work well in some scenarios
+ // (e.g. concatenating key fields loses key information), so we currently skip key validation.
+ }
+
+ @Override
+ public void setIsAppendOnly(Boolean isAppendOnly) {
+ // HBase always upserts on the rowkey; this works even in append-only mode.
+ }
+
+ @Override
+ public TypeInformation<Row> getRecordType() {
+ return tableSchema.toRowType();
+ }
+
+ @Override
+ public TableSchema getTableSchema() {
+ return tableSchema;
+ }
+
+ @Override
+ public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+ Configuration hbaseClientConf = HBaseConfiguration.create();
+ hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, hbaseOptions.getZkQuorum());
+ hbaseOptions.getZkNodeParent().ifPresent(v -> hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
+ HBaseUpsertSinkFunction sinkFunction = new HBaseUpsertSinkFunction(
+ hbaseOptions.getTableName(),
+ hbaseTableSchema,
+ hbaseClientConf,
+ writeOptions.getBufferFlushMaxSizeInBytes(),
+ writeOptions.getBufferFlushMaxRows(),
+ writeOptions.getBufferFlushIntervalMillis());
+ return dataStream
+ .addSink(sinkFunction)
+ .setParallelism(dataStream.getParallelism())
+ .name(TableConnectorUtils.generateRuntimeName(this.getClass(), tableSchema.getFieldNames()));
+ }
+
+ @Override
+ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
+ consumeDataStream(dataStream);
+ }
+
+ @Override
+ public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
+ throw new ValidationException("Reconfiguration with different fields is not allowed. " +
+ "Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
+ "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
+ }
+
+ return new HBaseUpsertTableSink(hbaseTableSchema, hbaseOptions, writeOptions);
+ }
+
+ @VisibleForTesting
+ HBaseTableSchema getHBaseTableSchema() {
+ return hbaseTableSchema;
+ }
+
+ @VisibleForTesting
+ HBaseOptions getHBaseOptions() {
+ return hbaseOptions;
+ }
+
+ @VisibleForTesting
+ HBaseWriteOptions getWriteOptions() {
+ return writeOptions;
+ }
+}
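
A short sketch of registering the sink and writing to it from SQL, as the HBaseSinkITCase below does (table and field names are the test's own):

    // tableSink was created via HBaseTableFactory#createStreamTableSink
    tEnv.registerTableSink("hbase", tableSink);
    // nest each column family into a ROW(...) and keep the rowkey flat
    tEnv.sqlUpdate(
        "INSERT INTO hbase SELECT ROW(f1c1), ROW(f2c1, f2c2), rowkey, ROW(f3c1, f3c2, f3c3) FROM src");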
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseWriteOptions.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseWriteOptions.java
new file mode 100644
index 0000000..17f5d1d
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseWriteOptions.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.hadoop.hbase.client.ConnectionConfiguration;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Options for HBase writing.
+ */
+public class HBaseWriteOptions implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final long bufferFlushMaxSizeInBytes;
+ private final long bufferFlushMaxRows;
+ private final long bufferFlushIntervalMillis;
+
+ private HBaseWriteOptions(
+ long bufferFlushMaxSizeInBytes,
+ long bufferFlushMaxMutations,
+ long bufferFlushIntervalMillis) {
+ this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
+ this.bufferFlushMaxRows = bufferFlushMaxMutations;
+ this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
+ }
+
+ long getBufferFlushMaxSizeInBytes() {
+ return bufferFlushMaxSizeInBytes;
+ }
+
+ long getBufferFlushMaxRows() {
+ return bufferFlushMaxRows;
+ }
+
+ long getBufferFlushIntervalMillis() {
+ return bufferFlushIntervalMillis;
+ }
+
+ @Override
+ public String toString() {
+ return "HBaseWriteOptions{" +
+ "bufferFlushMaxSizeInBytes=" + bufferFlushMaxSizeInBytes +
+ ", bufferFlushMaxRows=" + bufferFlushMaxRows +
+ ", bufferFlushIntervalMillis=" + bufferFlushIntervalMillis +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ HBaseWriteOptions that = (HBaseWriteOptions) o;
+ return bufferFlushMaxSizeInBytes == that.bufferFlushMaxSizeInBytes &&
+ bufferFlushMaxRows == that.bufferFlushMaxRows &&
+ bufferFlushIntervalMillis == that.bufferFlushIntervalMillis;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(bufferFlushMaxSizeInBytes, bufferFlushMaxRows, bufferFlushIntervalMillis);
+ }
+
+ /**
+ * Creates a builder for {@link HBaseWriteOptions}.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link HBaseWriteOptions}.
+ */
+ public static class Builder {
+
+ // the default is 2mb, as defined by HBase
+ private long bufferFlushMaxSizeInBytes = ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
+ private long bufferFlushMaxRows = -1;
+ private long bufferFlushIntervalMillis = -1;
+
+ /**
+ * Optional. Sets when to flush a buffered request based on the memory size of the currently buffered rows.
+ * Defaults to <code>2mb</code>.
+ */
+ public Builder setBufferFlushMaxSizeInBytes(long bufferFlushMaxSizeInBytes) {
+ checkArgument(
+ bufferFlushMaxSizeInBytes > 0,
+ "Max byte size of buffered rows must be larger than 0.");
+ this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
+ return this;
+ }
+
+ /**
+ * Optional. Sets when to flush a buffered request based on the number of currently buffered rows.
+ * Defaults to not set, i.e. the sink won't flush based on the number of buffered rows.
+ */
+ public Builder setBufferFlushMaxRows(long bufferFlushMaxRows) {
+ checkArgument(
+ bufferFlushMaxRows > 0,
+ "Max number of buffered rows must be larger than 0.");
+ this.bufferFlushMaxRows = bufferFlushMaxRows;
+ return this;
+ }
+
+ /**
+ * Optional. Sets the interval, in milliseconds, at which buffered requests are flushed.
+ * Defaults to not set, i.e. the sink won't flush based on a time interval.
+ */
+ public Builder setBufferFlushIntervalMillis(long bufferFlushIntervalMillis) {
+ checkArgument(
+ bufferFlushIntervalMillis > 0,
+ "Interval (in milliseconds) between each flush must be larger than 0.");
+ this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
+ return this;
+ }
+
+ /**
+ * Creates a new instance of {@link HBaseWriteOptions}.
+ */
+ public HBaseWriteOptions build() {
+ return new HBaseWriteOptions(
+ bufferFlushMaxSizeInBytes,
+ bufferFlushMaxRows,
+ bufferFlushIntervalMillis);
+ }
+ }
+}
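
As a usage note, the three flush settings combine as "whichever condition fires first". A minimal builder sketch using the same values as HBaseTableFactoryTest below:

    HBaseWriteOptions writeOptions = HBaseWriteOptions.builder()
        .setBufferFlushMaxSizeInBytes(10 * 1024 * 1024) // flush at 10 MB buffered
        .setBufferFlushMaxRows(1000)                    // ...or at 1000 buffered rows
        .setBufferFlushIntervalMillis(10 * 1000)        // ...or every 10 seconds
        .build();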
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadHelper.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadWriteHelper.java
similarity index 69%
rename from flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadHelper.java
rename to flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadWriteHelper.java
index 1f8fac2..e4211cd 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadHelper.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/util/HBaseReadWriteHelper.java
@@ -22,18 +22,20 @@ import org.apache.flink.addons.hbase.HBaseTableSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import java.nio.charset.Charset;
/**
- * A read helper for HBase. The helper can used to create a {@link Scan} and {@link Get}
- * for scanning or lookuping a HBase table, and supports converting the HBase {@link Result}
- * to Flink {@link Row}.
+ * A read and write helper for HBase. The helper can be used to create a {@link Scan} and {@link Get}
+ * for scanning or looking up an HBase table, to create a {@link Put} and {@link Delete} for writing
+ * to an HBase table, and to convert an HBase {@link Result} to a Flink {@link Row}.
*/
-public class HBaseReadHelper {
+public class HBaseReadWriteHelper {
// family keys
private final byte[][] families;
@@ -55,7 +57,7 @@ public class HBaseReadHelper {
// nested family rows
private Row[] familyRows;
- public HBaseReadHelper(HBaseTableSchema hbaseTableSchema) {
+ public HBaseReadWriteHelper(HBaseTableSchema hbaseTableSchema) {
this.families = hbaseTableSchema.getFamilyKeys();
this.qualifiers = new byte[this.families.length][][];
this.qualifierTypes = new int[this.families.length][];
@@ -163,4 +165,59 @@ public class HBaseReadHelper {
}
return resultRow;
}
+
+ /**
+ * Returns an instance of {@link Put} that writes the given record to the HBase table.
+ *
+ * @return The {@link Put} instance for the given row.
+ */
+ public Put createPutMutation(Row row) {
+ assert rowKeyIndex != -1;
+ byte[] rowkey = HBaseTypeUtils.serializeFromObject(row.getField(rowKeyIndex), rowKeyType, charset);
+ // upsert
+ Put put = new Put(rowkey);
+ for (int i = 0; i < fieldLength; i++) {
+ if (i != rowKeyIndex) {
+ int f = i > rowKeyIndex ? i - 1 : i;
+ // get family key
+ byte[] familyKey = families[f];
+ Row familyRow = (Row) row.getField(i);
+ for (int q = 0; q < this.qualifiers[f].length; q++) {
+ // get qualifier key
+ byte[] qualifier = qualifiers[f][q];
+ // get qualifier type index
+ int typeIdx = qualifierTypes[f][q];
+ // read value
+ byte[] value = HBaseTypeUtils.serializeFromObject(familyRow.getField(q), typeIdx, charset);
+ put.addColumn(familyKey, qualifier, value);
+ }
+ }
+ }
+ return put;
+ }
+
+ /**
+ * Returns an instance of {@link Delete} that removes the given record from the HBase table.
+ *
+ * @return The {@link Delete} instance for the given row.
+ */
+ public Delete createDeleteMutation(Row row) {
+ assert rowKeyIndex != -1;
+ byte[] rowkey = HBaseTypeUtils.serializeFromObject(row.getField(rowKeyIndex), rowKeyType, charset);
+ // delete
+ Delete delete = new Delete(rowkey);
+ for (int i = 0; i < fieldLength; i++) {
+ if (i != rowKeyIndex) {
+ int f = i > rowKeyIndex ? i - 1 : i;
+ // get family key
+ byte[] familyKey = families[f];
+ for (int q = 0; q < this.qualifiers[f].length; q++) {
+ // get qualifier key
+ byte[] qualifier = qualifiers[f][q];
+ delete.addColumn(familyKey, qualifier);
+ }
+ }
+ }
+ return delete;
+ }
}
diff --git a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/table/descriptors/HBaseValidator.java b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/table/descriptors/HBaseValidator.java
index fd844e9..e08909c 100644
--- a/flink-connectors/flink-hbase/src/main/java/org/apache/flink/table/descriptors/HBaseValidator.java
+++ b/flink-connectors/flink-hbase/src/main/java/org/apache/flink/table/descriptors/HBaseValidator.java
@@ -32,18 +32,30 @@ public class HBaseValidator extends ConnectorDescriptorValidator {
public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
public static final String CONNECTOR_VERSION_VALUE_143 = "1.4.3";
- public static final String CONNECTOR_HBASE_TABLE_NAME = "connector.table-name";
- public static final String CONNECTOR_HBASE_ZK_QUORUM = "connector.zookeeper.quorum";
+ public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
+ public static final String CONNECTOR_ZK_QUORUM = "connector.zookeeper.quorum";
+ public static final String CONNECTOR_ZK_NODE_PARENT = "connector.zookeeper.znode.parent";
+ public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = "connector.write.buffer-flush.max-size";
+ public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = "connector.write.buffer-flush.max-rows";
+ public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = "connector.write.buffer-flush.interval";
@Override
public void validate(DescriptorProperties properties) {
super.validate(properties);
properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE, false);
- properties.validateString(CONNECTOR_HBASE_TABLE_NAME, false, 1);
- properties.validateString(CONNECTOR_HBASE_ZK_QUORUM, false, 1);
+ properties.validateString(CONNECTOR_TABLE_NAME, false, 1);
+ properties.validateString(CONNECTOR_ZK_QUORUM, false, 1);
+ properties.validateString(CONNECTOR_ZK_NODE_PARENT, true, 1);
+ validateSinkProperties(properties);
validateVersion(properties);
}
+ private void validateSinkProperties(DescriptorProperties properties) {
+ properties.validateMemorySize(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE, true, 1024 * 1024); // only allow MB precision
+ properties.validateInt(CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS, true, 1);
+ properties.validateDuration(CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL, true, 1);
+ }
+
private void validateVersion(DescriptorProperties properties) {
final List<String> versions = Arrays.asList(CONNECTOR_VERSION_VALUE_143);
properties.validateEnumValues(CONNECTOR_VERSION, false, versions);
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
index a0822a3..731cf31 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseLookupFunctionITCase.java
@@ -51,10 +51,10 @@ import java.util.Map;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_TABLE_NAME;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_ZK_QUORUM;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TABLE_NAME;
import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_TYPE_VALUE_HBASE;
import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
+import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_ZK_QUORUM;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
/**
@@ -145,10 +145,10 @@ public class HBaseLookupFunctionITCase extends HBaseTestingClusterAutostarter {
properties.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_HBASE);
properties.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
properties.put(CONNECTOR_PROPERTY_VERSION, "1");
- properties.put(CONNECTOR_HBASE_TABLE_NAME, HTABLE_NAME);
+ properties.put(CONNECTOR_TABLE_NAME, HTABLE_NAME);
// get zk quorum from "hbase-site.xml" in classpath
String hbaseZk = HBaseConfiguration.create().get(HConstants.ZOOKEEPER_QUORUM);
- properties.put(CONNECTOR_HBASE_ZK_QUORUM, hbaseZk);
+ properties.put(CONNECTOR_ZK_QUORUM, hbaseZk);
// schema
String[] columnNames = {FAMILY1, ROWKEY, FAMILY2, FAMILY3};
TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{F1COL1}, Types.INT);
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java
new file mode 100644
index 0000000..ef1445d
--- /dev/null
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseSinkITCase.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/**
+ * IT case for {@link HBaseUpsertTableSink}.
+ */
+public class HBaseSinkITCase extends HBaseTestingClusterAutostarter {
+ private static final long serialVersionUID = 1L;
+
+ private static final String TEST_TABLE = "testTable";
+
+ private static final String FAMILY1 = "family1";
+ private static final String F1COL1 = "col1";
+
+ private static final String FAMILY2 = "family2";
+ private static final String F2COL1 = "col1";
+ private static final String F2COL2 = "col2";
+
+ private static final String FAMILY3 = "family3";
+ private static final String F3COL1 = "col1";
+ private static final String F3COL2 = "col2";
+ private static final String F3COL3 = "col3";
+
+ // prepare a source collection.
+ private static final List<Row> testData1 = new ArrayList<>();
+ private static final RowTypeInfo testTypeInfo1 = new RowTypeInfo(
+ new TypeInformation[]{Types.INT, Types.INT, Types.STRING, Types.LONG, Types.DOUBLE, Types.BOOLEAN, Types.STRING},
+ new String[]{"rowkey", "f1c1", "f2c1", "f2c2", "f3c1", "f3c2", "f3c3"});
+
+ static {
+ testData1.add(Row.of(1, 10, "Hello-1", 100L, 1.01, false, "Welt-1"));
+ testData1.add(Row.of(2, 20, "Hello-2", 200L, 2.02, true, "Welt-2"));
+ testData1.add(Row.of(3, 30, "Hello-3", 300L, 3.03, false, "Welt-3"));
+ testData1.add(Row.of(4, 40, null, 400L, 4.04, true, "Welt-4"));
+ testData1.add(Row.of(5, 50, "Hello-5", 500L, 5.05, false, "Welt-5"));
+ testData1.add(Row.of(6, 60, "Hello-6", 600L, 6.06, true, "Welt-6"));
+ testData1.add(Row.of(7, 70, "Hello-7", 700L, 7.07, false, "Welt-7"));
+ testData1.add(Row.of(8, 80, null, 800L, 8.08, true, "Welt-8"));
+ }
+
+ @BeforeClass
+ public static void activateHBaseCluster() throws IOException {
+ registerHBaseMiniClusterInClasspath();
+ createHBaseTable();
+ }
+
+ private static void createHBaseTable() {
+ // create a table
+ TableName tableName = TableName.valueOf(TEST_TABLE);
+ // column families
+ byte[][] families = new byte[][]{Bytes.toBytes(FAMILY1), Bytes.toBytes(FAMILY2), Bytes.toBytes(FAMILY3)};
+ // split keys
+ byte[][] splitKeys = new byte[][]{Bytes.toBytes(4)};
+ createTable(tableName, families, splitKeys);
+ }
+
+ @Test
+ public void testTableSink() throws Exception {
+ HBaseTableSchema schema = new HBaseTableSchema();
+ schema.addColumn(FAMILY1, F1COL1, Integer.class);
+ schema.addColumn(FAMILY2, F2COL1, String.class);
+ schema.addColumn(FAMILY2, F2COL2, Long.class);
+ schema.setRowKey("rk", Integer.class);
+ schema.addColumn(FAMILY3, F3COL1, Double.class);
+ schema.addColumn(FAMILY3, F3COL2, Boolean.class);
+ schema.addColumn(FAMILY3, F3COL3, String.class);
+
+ Map<String, String> tableProperties = new HashMap<>();
+ tableProperties.put("connector.type", "hbase");
+ tableProperties.put("connector.version", "1.4.3");
+ tableProperties.put("connector.property-version", "1");
+ tableProperties.put("connector.table-name", TEST_TABLE);
+ tableProperties.put("connector.zookeeper.quorum", getZookeeperQuorum());
+ tableProperties.put("connector.zookeeper.znode.parent", "/hbase");
+ DescriptorProperties descriptorProperties = new DescriptorProperties(true);
+ descriptorProperties.putTableSchema(SCHEMA, schema.convertsToTableSchema());
+ descriptorProperties.putProperties(tableProperties);
+ TableSink tableSink = TableFactoryService
+ .find(HBaseTableFactory.class, descriptorProperties.asMap())
+ .createTableSink(descriptorProperties.asMap());
+
+ StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ execEnv.setParallelism(4);
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(execEnv);
+ StreamITCase.clear();
+
+ DataStream<Row> ds = execEnv.fromCollection(testData1).returns(testTypeInfo1);
+ tEnv.registerDataStream("src", ds);
+ tEnv.registerTableSink("hbase", tableSink);
+
+ String query = "INSERT INTO hbase SELECT ROW(f1c1), ROW(f2c1, f2c2), rowkey, ROW(f3c1, f3c2, f3c3) FROM src";
+ tEnv.sqlUpdate(query);
+
+ // wait to finish
+ tEnv.execute("HBase Job");
+
+ // start a batch scan job to verify contents in HBase table
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
+ BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env, new TableConfig());
+
+ HBaseTableSource hbaseTable = new HBaseTableSource(getConf(), TEST_TABLE);
+ hbaseTable.setRowKey("rowkey", Integer.class);
+ hbaseTable.addColumn(FAMILY1, F1COL1, Integer.class);
+ hbaseTable.addColumn(FAMILY2, F2COL1, String.class);
+ hbaseTable.addColumn(FAMILY2, F2COL2, Long.class);
+ hbaseTable.addColumn(FAMILY3, F3COL1, Double.class);
+ hbaseTable.addColumn(FAMILY3, F3COL2, Boolean.class);
+ hbaseTable.addColumn(FAMILY3, F3COL3, String.class);
+ tableEnv.registerTableSource("hTable", hbaseTable);
+
+ Table result = tableEnv.sqlQuery(
+ "SELECT " +
+ " h.rowkey, " +
+ " h.family1.col1, " +
+ " h.family2.col1, " +
+ " h.family2.col2, " +
+ " h.family3.col1, " +
+ " h.family3.col2, " +
+ " h.family3.col3 " +
+ "FROM hTable AS h"
+ );
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+
+ String expected =
+ "1,10,Hello-1,100,1.01,false,Welt-1\n" +
+ "2,20,Hello-2,200,2.02,true,Welt-2\n" +
+ "3,30,Hello-3,300,3.03,false,Welt-3\n" +
+ "4,40,,400,4.04,true,Welt-4\n" +
+ "5,50,Hello-5,500,5.05,false,Welt-5\n" +
+ "6,60,Hello-6,600,6.06,true,Welt-6\n" +
+ "7,70,Hello-7,700,7.07,false,Welt-7\n" +
+ "8,80,,800,8.08,true,Welt-8\n";
+
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTableFactoryTest.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTableFactoryTest.java
index 4061c0e..eba0cee 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTableFactoryTest.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTableFactoryTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
@@ -36,12 +37,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_TABLE_NAME;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_HBASE_ZK_QUORUM;
-import static org.apache.flink.table.descriptors.HBaseValidator.CONNECTOR_VERSION_VALUE_143;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
/**
@@ -59,21 +54,25 @@ public class HBaseTableFactoryTest {
private DescriptorProperties createDescriptor(String[] columnNames, TypeInformation[] columnTypes) {
TableSchema tableSchema = new TableSchema(columnNames, columnTypes);
- Map<String, String> tableServiceLookupConf = new HashMap<>();
- tableServiceLookupConf.put(CONNECTOR_TYPE, "hbase");
- tableServiceLookupConf.put(CONNECTOR_VERSION, CONNECTOR_VERSION_VALUE_143);
- tableServiceLookupConf.put(CONNECTOR_PROPERTY_VERSION, "1");
- tableServiceLookupConf.put(CONNECTOR_HBASE_TABLE_NAME, "testHBastTable");
- tableServiceLookupConf.put(CONNECTOR_HBASE_ZK_QUORUM, "localhost:2181");
+ Map<String, String> tableProperties = new HashMap<>();
+ tableProperties.put("connector.type", "hbase");
+ tableProperties.put("connector.version", "1.4.3");
+ tableProperties.put("connector.property-version", "1");
+ tableProperties.put("connector.table-name", "testHBastTable");
+ tableProperties.put("connector.zookeeper.quorum", "localhost:2181");
+ tableProperties.put("connector.zookeeper.znode.parent", "/flink");
+ tableProperties.put("connector.write.buffer-flush.max-size", "10mb");
+ tableProperties.put("connector.write.buffer-flush.max-rows", "1000");
+ tableProperties.put("connector.write.buffer-flush.interval", "10s");
DescriptorProperties descriptorProperties = new DescriptorProperties(true);
descriptorProperties.putTableSchema(SCHEMA, tableSchema);
- descriptorProperties.putProperties(tableServiceLookupConf);
+ descriptorProperties.putProperties(tableProperties);
return descriptorProperties;
}
@Test
- public void testConstructorForNestedSchema() {
+ public void testTableSourceFactory() {
String[] columnNames = {FAMILY1, FAMILY2, ROWKEY, FAMILY3};
TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{COL1}, Types.INT);
@@ -102,4 +101,49 @@ public class HBaseTableFactoryTest {
Assert.assertArrayEquals(new TypeInformation[]{Types.INT, Types.LONG}, hbaseSchema.getQualifierTypes("f2"));
Assert.assertArrayEquals(new TypeInformation[]{Types.DOUBLE, Types.BOOLEAN, Types.STRING}, hbaseSchema.getQualifierTypes("f3"));
}
+
+ @Test
+ public void testTableSinkFactory() {
+ String[] columnNames = {ROWKEY, FAMILY1, FAMILY2, FAMILY3};
+ TypeInformation<Row> f1 = Types.ROW_NAMED(new String[]{COL1, COL2}, Types.DOUBLE, Types.INT);
+ TypeInformation<Row> f2 = Types.ROW_NAMED(new String[]{COL1, COL3}, Types.INT, Types.LONG);
+ TypeInformation<Row> f3 = Types.ROW_NAMED(new String[]{COL2, COL3}, Types.BOOLEAN, Types.STRING);
+ TypeInformation[] columnTypes = new TypeInformation[]{Types.STRING, f1, f2, f3};
+ DescriptorProperties descriptorProperties = createDescriptor(columnNames, columnTypes);
+
+ TableSink sink = TableFactoryService
+ .find(HBaseTableFactory.class, descriptorProperties.asMap())
+ .createTableSink(descriptorProperties.asMap());
+
+ Assert.assertTrue(sink instanceof HBaseUpsertTableSink);
+
+ HBaseTableSchema hbaseSchema = ((HBaseUpsertTableSink) sink).getHBaseTableSchema();
+ Assert.assertEquals(0, hbaseSchema.getRowKeyIndex());
+ Assert.assertEquals(Optional.of(Types.STRING), hbaseSchema.getRowKeyTypeInfo());
+
+ Assert.assertArrayEquals(new String[]{"f1", "f2", "f3"}, hbaseSchema.getFamilyNames());
+ Assert.assertArrayEquals(new String[]{"c1", "c2"}, hbaseSchema.getQualifierNames("f1"));
+ Assert.assertArrayEquals(new String[]{"c1", "c3"}, hbaseSchema.getQualifierNames("f2"));
+ Assert.assertArrayEquals(new String[]{"c2", "c3"}, hbaseSchema.getQualifierNames("f3"));
+
+ Assert.assertArrayEquals(new TypeInformation[]{Types.DOUBLE, Types.INT}, hbaseSchema.getQualifierTypes("f1"));
+ Assert.assertArrayEquals(new TypeInformation[]{Types.INT, Types.LONG}, hbaseSchema.getQualifierTypes("f2"));
+ Assert.assertArrayEquals(new TypeInformation[]{Types.BOOLEAN, Types.STRING}, hbaseSchema.getQualifierTypes("f3"));
+
+ HBaseOptions expectedHBaseOptions = HBaseOptions.builder()
+ .setTableName("testHBastTable")
+ .setZkQuorum("localhost:2181")
+ .setZkNodeParent("/flink")
+ .build();
+ HBaseOptions actualHBaseOptions = ((HBaseUpsertTableSink) sink).getHBaseOptions();
+ Assert.assertEquals(expectedHBaseOptions, actualHBaseOptions);
+
+ HBaseWriteOptions expectedWriteOptions = HBaseWriteOptions.builder()
+ .setBufferFlushMaxRows(1000)
+ .setBufferFlushIntervalMillis(10 * 1000)
+ .setBufferFlushMaxSizeInBytes(10 * 1024 * 1024)
+ .build();
+ HBaseWriteOptions actualWriteOptions = ((HBaseUpsertTableSink) sink).getWriteOptions();
+ Assert.assertEquals(expectedWriteOptions, actualWriteOptions);
+ }
}
diff --git a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
index 2e61a28..905f80d 100644
--- a/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
+++ b/flink-connectors/flink-hbase/src/test/java/org/apache/flink/addons/hbase/HBaseTestingClusterAutostarter.java
@@ -157,6 +157,13 @@ public class HBaseTestingClusterAutostarter extends TestLogger implements Serial
LOG.info("HBase minicluster: Running");
}
+ /**
+ * Returns the ZooKeeper quorum value containing the correct port number (which varies per run).
+ */
+ static String getZookeeperQuorum() {
+ return "localhost:" + TEST_UTIL.getZkCluster().getClientPort();
+ }
+
private static File hbaseSiteXmlDirectory;
private static File hbaseSiteXmlFile;