You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2020/05/06 12:13:05 UTC

[flink] branch master updated: [FLINK-17255][python] Support HBase connector descriptor in PyFlink. (#11955)

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a8cee7b  [FLINK-17255][python] Support HBase connector descriptor in PyFlink. (#11955)
a8cee7b is described below

commit a8cee7b8362ff0c44102fa976f0bd3e17ece22dd
Author: shuiqiangchen <44...@users.noreply.github.com>
AuthorDate: Wed May 6 20:12:32 2020 +0800

    [FLINK-17255][python] Support HBase connector descriptor in PyFlink. (#11955)
---
 docs/dev/table/connect.md                          | 21 +++++
 flink-python/bin/pyflink-gateway-server.sh         |  1 +
 flink-python/pyflink/table/descriptors.py          | 96 +++++++++++++++++++++
 .../pyflink/table/tests/test_descriptor.py         | 99 +++++++++++++++++++++-
 4 files changed, 216 insertions(+), 1 deletion(-)

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 6d3d7d9..8ffa911 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -1199,6 +1199,27 @@ CREATE TABLE MyUserTable (
 )
 {% endhighlight %}
 </div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+.connect(
+    HBase()
+    .version('1.4.3')                      # required: currently only support '1.4.3'
+    .table_name('hbase_table_name')        # required: HBase table name
+    .zookeeper_quorum('localhost:2181')    # required: HBase Zookeeper quorum configuration
+    .zookeeper_node_parent('/test')        # optional: the root dir in Zookeeper for the HBase cluster.
+                                           # The default value is '/hbase'
+    .write_buffer_flush_max_size('10mb')   # optional: writing option, determines the maximum size in memory of
+                                           # buffered rows to flush per round trip. This can help performance when
+                                           # writing to the HBase database. The default value is '2mb'
+    .write_buffer_flush_max_rows(1000)     # optional: writing option, determines how many rows to insert per round trip.
+                                           # This can help performance when writing to the HBase database. No default
+                                           # value, i.e. flushing does not depend on the number of buffered rows.
+    .write_buffer_flush_interval('2s')     # optional: writing option, sets a flush interval for flushing buffered
+                                           # requests if the interval passes, in milliseconds. Default value is '0s',
+                                           # which means no asynchronous flush thread will be scheduled.
+)
+{% endhighlight %}
+</div>
 <div data-lang="YAML" markdown="1">
 {% highlight yaml %}
 connector:
diff --git a/flink-python/bin/pyflink-gateway-server.sh b/flink-python/bin/pyflink-gateway-server.sh
index b6268a0..d5bcd92 100644
--- a/flink-python/bin/pyflink-gateway-server.sh
+++ b/flink-python/bin/pyflink-gateway-server.sh
@@ -80,6 +80,7 @@ if [[ -n "$FLINK_TESTING" ]]; then
   FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-connectors/flink-connector-kafka-base/target/flink*.jar"
   FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-ml-parent/flink-ml-api/target/flink-ml-api*.jar"
   FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-ml-parent/flink-ml-lib/target/flink-ml-lib*.jar"
+  FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-connectors/flink-hbase/target/flink*.jar"
 
   # disable the wildcard expansion for the moment.
   set -f
diff --git a/flink-python/pyflink/table/descriptors.py b/flink-python/pyflink/table/descriptors.py
index d4eb8b3..ffd4fe4 100644
--- a/flink-python/pyflink/table/descriptors.py
+++ b/flink-python/pyflink/table/descriptors.py
@@ -29,6 +29,7 @@ __all__ = [
     'FileSystem',
     'Kafka',
     'Elasticsearch',
+    'HBase',
     'Csv',
     'Avro',
     'Json',
@@ -1196,6 +1197,101 @@ class Elasticsearch(ConnectorDescriptor):
         return self
 
 
+class HBase(ConnectorDescriptor):
+    """
+    Connector descriptor for Apache HBase.
+    """
+
+    def __init__(self):
+        gateway = get_gateway()
+        self._j_hbase = gateway.jvm.HBase()
+        super(HBase, self).__init__(self._j_hbase)
+
+    def version(self, version):
+        """
+        Set the Apache HBase version to be used, Required.
+
+        :param version: HBase version. E.g., "1.4.3".
+        :return: This object.
+        """
+        if not isinstance(version, str):
+            version = str(version)
+        self._j_hbase = self._j_hbase.version(version)
+        return self
+
+    def table_name(self, table_name):
+        """
+        Set the HBase table name, Required.
+
+        :param table_name: Name of HBase table. E.g., "testNamespace:testTable", "testDefaultTable"
+        :return: This object.
+        """
+        self._j_hbase = self._j_hbase.tableName(table_name)
+        return self
+
+    def zookeeper_quorum(self, zookeeper_quorum):
+        """
+        Set the zookeeper quorum address to connect the HBase cluster, Required.
+
+        :param zookeeper_quorum: zookeeper quorum address to connect the HBase cluster. E.g.,
+                                 "localhost:2181,localhost:2182,localhost:2183"
+        :return: This object.
+        """
+        self._j_hbase = self._j_hbase.zookeeperQuorum(zookeeper_quorum)
+        return self
+
+    def zookeeper_node_parent(self, zookeeper_node_parent):
+        """
+        Set the zookeeper node parent path of HBase cluster. Default to use "/hbase", Optional.
+
+        :param zookeeper_node_parent: zookeeper node path of hbase cluster. E.g,
+                                      "/hbase/example-root-znode".
+        :return: This object
+        """
+        self._j_hbase = self._j_hbase.zookeeperNodeParent(zookeeper_node_parent)
+        return self
+
+    def write_buffer_flush_max_size(self, max_size):
+        """
+        Set threshold when to flush buffered request based on the memory byte size of rows currently
+        added.
+
+        :param max_size: the maximum size.
+        :return: This object.
+        """
+        if not isinstance(max_size, str):
+            max_size = str(max_size)
+        self._j_hbase = self._j_hbase.writeBufferFlushMaxSize(max_size)
+        return self
+
+    def write_buffer_flush_max_rows(self, write_buffer_flush_max_rows):
+        """
+        Set threshold when to flush buffered request based on the number of rows currently added.
+        Defaults to not set, i.e. won't flush based on the number of buffered rows, Optional.
+
+        :param write_buffer_flush_max_rows: number of added rows when begin the request flushing.
+        :return: This object.
+        """
+        self._j_hbase = self._j_hbase.writeBufferFlushMaxRows(write_buffer_flush_max_rows)
+        return self
+
+    def write_buffer_flush_interval(self, interval):
+        """
+        Set an interval for flushing buffered requests if the interval passes, in
+        milliseconds.
+        Defaults to not set, i.e. won't flush based on flush interval, Optional.
+
+        :param interval: flush interval. The string should be in format
+                         "{length value}{time unit label}" E.g, "123ms", "1 s", if no time unit
+                         label is specified, it will be considered as milliseconds.
+        :return: This object.
+        """
+        if not isinstance(interval, str):
+            interval = str(interval)
+        self._j_hbase = self._j_hbase.writeBufferFlushInterval(interval)
+        return self
+
+
 class CustomConnectorDescriptor(ConnectorDescriptor):
     """
     Describes a custom connector to an other system.
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py b/flink-python/pyflink/table/tests/test_descriptor.py
index 6894e10..06ed9dd 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -19,7 +19,7 @@ import os
 
 from pyflink.table.descriptors import (FileSystem, OldCsv, Rowtime, Schema, Kafka,
                                        Elasticsearch, Csv, Avro, Json, CustomConnectorDescriptor,
-                                       CustomFormatDescriptor)
+                                       CustomFormatDescriptor, HBase)
 from pyflink.table.table_schema import TableSchema
 from pyflink.table.types import DataTypes
 from pyflink.testing.test_case_utils import (PyFlinkTestCase, PyFlinkStreamTableTestCase,
@@ -360,6 +360,103 @@ class CustomConnectorDescriptorTests(PyFlinkTestCase):
         self.assertEqual(expected, properties)
 
 
+class HBaseDescriptorTests(PyFlinkTestCase):
+
+    def test_version(self):
+        hbase = HBase().version("1.4.3")
+
+        properties = hbase.to_properties()
+        expected = {'connector.version': '1.4.3',
+                    'connector.type': 'hbase',
+                    'connector.property-version': '1'}
+        self.assertEqual(expected, properties)
+
+        hbase = HBase().version(1.1)
+        properties = hbase.to_properties()
+        expected = {'connector.version': '1.1',
+                    'connector.type': 'hbase',
+                    'connector.property-version': '1'}
+        self.assertEqual(expected, properties)
+
+    def test_table_name(self):
+        hbase = HBase().table_name('tableName1')
+
+        properties = hbase.to_properties()
+        expected = {'connector.type': 'hbase',
+                    'connector.table-name': 'tableName1',
+                    'connector.property-version': '1'}
+        self.assertEqual(expected, properties)
+
+    def test_zookeeper_quorum(self):
+        hbase = HBase().zookeeper_quorum("localhost:2181,localhost:2182")
+
+        properties = hbase.to_properties()
+        expected = {'connector.type': 'hbase',
+                    'connector.zookeeper.quorum': 'localhost:2181,localhost:2182',
+                    'connector.property-version': '1'}
+        self.assertEqual(expected, properties)
+
+    def test_zookeeper_node_parent(self):
+        hbase = HBase().zookeeper_node_parent('/hbase/example-root-znode')
+
+        properties = hbase.to_properties()
+        expected = {'connector.type': 'hbase',
+                    'connector.zookeeper.znode.parent': '/hbase/example-root-znode',
+                    'connector.property-version': '1'}
+        self.assertEqual(expected, properties)
+
+    def test_write_buffer_flush_max_size(self):
+        hbase = HBase().write_buffer_flush_max_size('1000')
+
+        properties = hbase.to_properties()
+        expected = {'connector.type': 'hbase',
+                    'connector.write.buffer-flush.max-size': '1000 bytes',
+                    'connector.property-version': '1'}
+        self.assertEqual(expected, properties)
+
+        hbase = HBase().write_buffer_flush_max_size(1000)
+        properties = hbase.to_properties()
+        self.assertEqual(expected, properties)
+
+        hbase = HBase().write_buffer_flush_max_size('10mb')
+        properties = hbase.to_properties()
+        expected = {'connector.type': 'hbase',
+                    'connector.write.buffer-flush.max-size': '10 mb',
+                    'connector.property-version': '1'}
+        self.assertEqual(expected, properties)
+
+    def test_write_buffer_flush_max_rows(self):
+        hbase = HBase().write_buffer_flush_max_rows(10)
+
+        properties = hbase.to_properties()
+        expected = {'connector.type': 'hbase',
+                    'connector.write.buffer-flush.max-rows': '10',
+                    'connector.property-version': '1'}
+        self.assertEqual(expected, properties)
+
+    def test_write_buffer_flush_interval(self):
+        hbase = HBase().write_buffer_flush_interval('123')
+
+        properties = hbase.to_properties()
+        expected = {'connector.type': 'hbase',
+                    'connector.write.buffer-flush.interval': '123',
+                    'connector.property-version': '1'}
+        self.assertEqual(expected, properties)
+
+        hbase = HBase().write_buffer_flush_interval(123)
+
+        properties = hbase.to_properties()
+        self.assertEqual(expected, properties)
+
+        hbase = HBase().write_buffer_flush_interval('123ms')
+
+        properties = hbase.to_properties()
+        expected = {'connector.type': 'hbase',
+                    'connector.write.buffer-flush.interval': '123ms',
+                    'connector.property-version': '1'}
+        self.assertEqual(expected, properties)
+
+
 class OldCsvDescriptorTests(PyFlinkTestCase):
 
     def test_field_delimiter(self):