You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2020/05/06 12:13:05 UTC
[flink] branch master updated: [FLINK-17255][python] Support HBase
connector descriptor in PyFlink. (#11955)
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a8cee7b [FLINK-17255][python] Support HBase connector descriptor in PyFlink. (#11955)
a8cee7b is described below
commit a8cee7b8362ff0c44102fa976f0bd3e17ece22dd
Author: shuiqiangchen <44...@users.noreply.github.com>
AuthorDate: Wed May 6 20:12:32 2020 +0800
[FLINK-17255][python] Support HBase connector descriptor in PyFlink. (#11955)
---
docs/dev/table/connect.md | 21 +++++
flink-python/bin/pyflink-gateway-server.sh | 1 +
flink-python/pyflink/table/descriptors.py | 96 +++++++++++++++++++++
.../pyflink/table/tests/test_descriptor.py | 99 +++++++++++++++++++++-
4 files changed, 216 insertions(+), 1 deletion(-)
diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 6d3d7d9..8ffa911 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -1199,6 +1199,27 @@ CREATE TABLE MyUserTable (
)
{% endhighlight %}
</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(
+ HBase()
+ .version('1.4.3') # required: currently only support '1.4.3'
+ .table_name('hbase_table_name') # required: HBase table name
+ .zookeeper_quorum('localhost:2181') # required: HBase Zookeeper quorum configuration
+ .zookeeper_node_parent('/test') # optional: the root dir in Zookeeper for the HBase cluster.
+ # The default value is '/hbase'
+ .write_buffer_flush_max_size('10mb') # optional: writing option, sets the maximum size in memory of buffered
+ # rows to insert per round trip. This can improve performance when writing to an
+ # HBase database. The default value is '2mb'
+ .write_buffer_flush_max_rows(1000) # optional: writing option, determines how many rows to insert per round trip.
+ # This can improve performance when writing to an HBase database. No default value,
+ # i.e. by default flushing does not depend on the number of buffered rows.
+ .write_buffer_flush_interval('2s') # optional: writing option, sets an interval after which buffered requests
+ # are flushed, in milliseconds. Default value is '0s', which means
+ # no asynchronous flush thread will be scheduled.
+)
+{% endhighlight %}
+</div>
<div data-lang="YAML" markdown="1">
{% highlight yaml %}
connector:
diff --git a/flink-python/bin/pyflink-gateway-server.sh b/flink-python/bin/pyflink-gateway-server.sh
index b6268a0..d5bcd92 100644
--- a/flink-python/bin/pyflink-gateway-server.sh
+++ b/flink-python/bin/pyflink-gateway-server.sh
@@ -80,6 +80,7 @@ if [[ -n "$FLINK_TESTING" ]]; then
FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-connectors/flink-connector-kafka-base/target/flink*.jar"
FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-ml-parent/flink-ml-api/target/flink-ml-api*.jar"
FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-ml-parent/flink-ml-lib/target/flink-ml-lib*.jar"
+ FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-connectors/flink-hbase/target/flink*.jar"
# disable the wildcard expansion for the moment.
set -f
diff --git a/flink-python/pyflink/table/descriptors.py b/flink-python/pyflink/table/descriptors.py
index d4eb8b3..ffd4fe4 100644
--- a/flink-python/pyflink/table/descriptors.py
+++ b/flink-python/pyflink/table/descriptors.py
@@ -29,6 +29,7 @@ __all__ = [
'FileSystem',
'Kafka',
'Elasticsearch',
+ 'HBase',
'Csv',
'Avro',
'Json',
@@ -1196,6 +1197,101 @@ class Elasticsearch(ConnectorDescriptor):
return self
+class HBase(ConnectorDescriptor):
+ """
+ Connector descriptor for Apache HBase.
+ """
+
+ def __init__(self):
+ gateway = get_gateway()
+ self._j_hbase = gateway.jvm.HBase()
+ super(HBase, self).__init__(self._j_hbase)
+
+ def version(self, version):
+ """
+ Set the Apache HBase version to be used, Required.
+
+ :param version: HBase version. E.g., "1.4.3".
+ :return: This object.
+ """
+ if not isinstance(version, str):
+ version = str(version)
+ self._j_hbase = self._j_hbase.version(version)
+ return self
+
+ def table_name(self, table_name):
+ """
+ Set the HBase table name, Required.
+
+ :param table_name: Name of HBase table. E.g., "testNamespace:testTable", "testDefaultTable"
+ :return: This object.
+ """
+ self._j_hbase = self._j_hbase.tableName(table_name)
+ return self
+
+ def zookeeper_quorum(self, zookeeper_quorum):
+ """
+ Set the zookeeper quorum address used to connect to the HBase cluster, Required.
+
+ :param zookeeper_quorum: zookeeper quorum address to connect the HBase cluster. E.g.,
+ "localhost:2181,localhost:2182,localhost:2183"
+ :return: This object.
+ """
+ self._j_hbase = self._j_hbase.zookeeperQuorum(zookeeper_quorum)
+ return self
+
+ def zookeeper_node_parent(self, zookeeper_node_parent):
+ """
+ Set the zookeeper node parent path of HBase cluster. Default to use "/hbase", Optional.
+
+ :param zookeeper_node_parent: zookeeper node path of the HBase cluster. E.g.,
+ "/hbase/example-root-znode".
+ :return: This object
+ """
+ self._j_hbase = self._j_hbase.zookeeperNodeParent(zookeeper_node_parent)
+ return self
+
+ def write_buffer_flush_max_size(self, max_size):
+ """
+ Set the threshold at which to flush buffered requests, based on the memory byte size of rows
+ currently added.
+
+ :param max_size: the maximum size.
+ :return: This object.
+ """
+ if not isinstance(max_size, str):
+ max_size = str(max_size)
+ self._j_hbase = self._j_hbase.writeBufferFlushMaxSize(max_size)
+ return self
+
+ def write_buffer_flush_max_rows(self, write_buffer_flush_max_rows):
+ """
+ Set the threshold at which to flush buffered requests, based on the number of rows currently
+ added. Defaults to not set, i.e. won't flush based on the number of buffered rows, Optional.
+
+ :param write_buffer_flush_max_rows: number of added rows when begin the request flushing.
+ :return: This object.
+ """
+ self._j_hbase = self._j_hbase.writeBufferFlushMaxRows(write_buffer_flush_max_rows)
+ return self
+
+ def write_buffer_flush_interval(self, interval):
+ """
+ Set an interval after which buffered requests are flushed, in
+ milliseconds.
+ Defaults to not set, i.e. won't flush based on a flush interval, Optional.
+
+ :param interval: flush interval. The string should be in the format
+ "{length value}{time unit label}". E.g., "123ms", "1 s"; if no time unit
+ label is specified, it will be interpreted as milliseconds.
+ :return: This object.
+ """
+ if not isinstance(interval, str):
+ interval = str(interval)
+ self._j_hbase = self._j_hbase.writeBufferFlushInterval(interval)
+ return self
+
+
class CustomConnectorDescriptor(ConnectorDescriptor):
"""
Describes a custom connector to an other system.
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
index 6894e10..06ed9dd 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -19,7 +19,7 @@ import os
from pyflink.table.descriptors import (FileSystem, OldCsv, Rowtime, Schema, Kafka,
Elasticsearch, Csv, Avro, Json, CustomConnectorDescriptor,
- CustomFormatDescriptor)
+ CustomFormatDescriptor, HBase)
from pyflink.table.table_schema import TableSchema
from pyflink.table.types import DataTypes
from pyflink.testing.test_case_utils import (PyFlinkTestCase, PyFlinkStreamTableTestCase,
@@ -360,6 +360,103 @@ class CustomConnectorDescriptorTests(PyFlinkTestCase):
self.assertEqual(expected, properties)
+class HBaseDescriptorTests(PyFlinkTestCase):
+
+ def test_version(self):
+ hbase = HBase().version("1.4.3")
+
+ properties = hbase.to_properties()
+ expected = {'connector.version': '1.4.3',
+ 'connector.type': 'hbase',
+ 'connector.property-version': '1'}
+ self.assertEqual(expected, properties)
+
+ hbase = HBase().version(1.1)
+ properties = hbase.to_properties()
+ expected = {'connector.version': '1.1',
+ 'connector.type': 'hbase',
+ 'connector.property-version': '1'}
+ self.assertEqual(expected, properties)
+
+ def test_table_name(self):
+ hbase = HBase().table_name('tableName1')
+
+ properties = hbase.to_properties()
+ expected = {'connector.type': 'hbase',
+ 'connector.table-name': 'tableName1',
+ 'connector.property-version': '1'}
+ self.assertEqual(expected, properties)
+
+ def test_zookeeper_quorum(self):
+ hbase = HBase().zookeeper_quorum("localhost:2181,localhost:2182")
+
+ properties = hbase.to_properties()
+ expected = {'connector.type': 'hbase',
+ 'connector.zookeeper.quorum': 'localhost:2181,localhost:2182',
+ 'connector.property-version': '1'}
+ self.assertEqual(expected, properties)
+
+ def test_zookeeper_node_parent(self):
+ hbase = HBase().zookeeper_node_parent('/hbase/example-root-znode')
+
+ properties = hbase.to_properties()
+ expected = {'connector.type': 'hbase',
+ 'connector.zookeeper.znode.parent': '/hbase/example-root-znode',
+ 'connector.property-version': '1'}
+ self.assertEqual(expected, properties)
+
+ def test_write_buffer_flush_max_size(self):
+ hbase = HBase().write_buffer_flush_max_size('1000')
+
+ properties = hbase.to_properties()
+ expected = {'connector.type': 'hbase',
+ 'connector.write.buffer-flush.max-size': '1000 bytes',
+ 'connector.property-version': '1'}
+ self.assertEqual(expected, properties)
+
+ hbase = HBase().write_buffer_flush_max_size(1000)
+ properties = hbase.to_properties()
+ self.assertEqual(expected, properties)
+
+ hbase = HBase().write_buffer_flush_max_size('10mb')
+ properties = hbase.to_properties()
+ expected = {'connector.type': 'hbase',
+ 'connector.write.buffer-flush.max-size': '10 mb',
+ 'connector.property-version': '1'}
+ self.assertEqual(expected, properties)
+
+ def test_write_buffer_flush_max_rows(self):
+ hbase = HBase().write_buffer_flush_max_rows(10)
+
+ properties = hbase.to_properties()
+ expected = {'connector.type': 'hbase',
+ 'connector.write.buffer-flush.max-rows': '10',
+ 'connector.property-version': '1'}
+ self.assertEqual(expected, properties)
+
+ def test_write_buffer_flush_interval(self):
+ hbase = HBase().write_buffer_flush_interval('123')
+
+ properties = hbase.to_properties()
+ expected = {'connector.type': 'hbase',
+ 'connector.write.buffer-flush.interval': '123',
+ 'connector.property-version': '1'}
+ self.assertEqual(expected, properties)
+
+ hbase = HBase().write_buffer_flush_interval(123)
+
+ properties = hbase.to_properties()
+ self.assertEqual(expected, properties)
+
+ hbase = HBase().write_buffer_flush_interval('123ms')
+
+ properties = hbase.to_properties()
+ expected = {'connector.type': 'hbase',
+ 'connector.write.buffer-flush.interval': '123ms',
+ 'connector.property-version': '1'}
+ self.assertEqual(expected, properties)
+
+
class OldCsvDescriptorTests(PyFlinkTestCase):
def test_field_delimiter(self):