Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/03 10:15:18 UTC

[flink] branch release-1.11 updated (639a892 -> 5c1579f)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 639a892  [hotfix][python] Add the version for the APIs introduced in 1.11.0
     new fe7734d  [hotfix][hbase] Rename HBase connector option 'zookeeper.znode-parent' to 'zookeeper.znode.parent'
     new 2c977bb  [FLINK-17995][docs][table] Redesign Table & SQL Connectors pages
     new 5c1579f  [FLINK-17830][docs][hbase] Add documentation for the new HBase SQL connector

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/table/connect.md                          |   4 +-
 docs/dev/table/connect.zh.md                       |   4 +-
 docs/dev/table/connectors/hbase.md                 | 291 +++++++++++++++++++++
 docs/dev/table/connectors/hbase.zh.md              | 291 +++++++++++++++++++++
 docs/dev/table/connectors/index.md                 | 268 +++++++++++++++++++
 docs/dev/table/connectors/index.zh.md              | 268 +++++++++++++++++++
 .../connector/hbase/HBaseDynamicTableFactory.java  |   2 +-
 .../connector/hbase/HBaseConnectorITCase.java      |   4 +-
 .../hbase/HBaseDynamicTableFactoryTest.java        |   2 +-
 9 files changed, 1124 insertions(+), 10 deletions(-)
 create mode 100644 docs/dev/table/connectors/hbase.md
 create mode 100644 docs/dev/table/connectors/hbase.zh.md
 create mode 100644 docs/dev/table/connectors/index.md
 create mode 100644 docs/dev/table/connectors/index.zh.md


[flink] 02/03: [FLINK-17995][docs][table] Redesign Table & SQL Connectors pages

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2c977bb19a967ea5abe38959e5d10c678b20b966
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu May 28 22:10:23 2020 +0800

    [FLINK-17995][docs][table] Redesign Table & SQL Connectors pages
    
    This closes #12386
---
 docs/dev/table/connect.md             |   4 +-
 docs/dev/table/connect.zh.md          |   4 +-
 docs/dev/table/connectors/index.md    | 268 ++++++++++++++++++++++++++++++++++
 docs/dev/table/connectors/index.zh.md | 268 ++++++++++++++++++++++++++++++++++
 4 files changed, 538 insertions(+), 6 deletions(-)

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index ac2646c..ec721f4 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -1,7 +1,5 @@
 ---
-title: "Table API Connectors"
-nav-parent_id: connectors-root
-nav-pos: 2
+title: "Table API Legacy Connectors"
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connect.zh.md b/docs/dev/table/connect.zh.md
index a6ac36b..7621690 100644
--- a/docs/dev/table/connect.zh.md
+++ b/docs/dev/table/connect.zh.md
@@ -1,7 +1,5 @@
 ---
-title: "Table API Connectors"
-nav-parent_id: connectors-root
-nav-pos: 2
+title: "Table API Legacy Connectors"
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/dev/table/connectors/index.md b/docs/dev/table/connectors/index.md
new file mode 100644
index 0000000..c2a08b6
--- /dev/null
+++ b/docs/dev/table/connectors/index.md
@@ -0,0 +1,268 @@
+---
+title: "Table & SQL Connectors"
+nav-id: sql-connectors
+nav-parent_id: connectors-root
+nav-pos: 2
+nav-show_overview: true
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+Flink's Table API & SQL programs can be connected to other external systems for reading and writing both batch and streaming tables. A table source provides access to data which is stored in external systems (such as a database, key-value store, message queue, or file system). A table sink emits a table to an external storage system. Depending on the type of source and sink, they support different formats such as CSV, Avro, Parquet, or ORC.
+
+This page describes how to register table sources and table sinks in Flink using the natively supported connectors. After a source or sink has been registered, it can be accessed by Table API & SQL statements.
+
+<span class="label label-info">NOTE</span> If you want to implement your own *custom* table source or sink, have a look at the [user-defined sources & sinks page](sourceSinks.html).
+
+<span class="label label-danger">Attention</span> Flink Table & SQL introduces a new set of connector options since 1.11.0, if you are using the legacy connector options, please refer to the [legacy documentation]({{ site.baseurl }}/dev/table/connect.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+Supported Connectors
+------------
+
+Flink natively supports various connectors. The following table lists all available connectors.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Name</th>
+        <th class="text-center">Version</th>
+        <th class="text-center">Source</th>
+        <th class="text-center">Sink</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>Filesystem</td>
+      <td></td>
+      <td>Bounded and Unbounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>Elasticsearch</td>
+      <td>6.x & 7.x</td>
+      <td>Not supported</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>Apache Kafka</td>
+      <td>0.10+</td>
+      <td>Unbounded Scan</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>JDBC</td>
+      <td></td>
+      <td>Bounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td><a href="{{ site.baseurl }}/dev/table/connectors/hbase.html">Apache HBase</a></td>
+      <td>1.4.x</td>
+      <td>Bounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    </tbody>
+</table>
+
+{% top %}
+
+How to use connectors
+--------
+
+Flink supports using the SQL CREATE TABLE statement to register a table. One can define the table name, the table schema, and the table options for connecting to an external system.
+
+The following code shows a full example of how to connect to Kafka for reading JSON records.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  -- declare the schema of the table
+  `user` BIGINT,
+  message STRING,
+  ts TIMESTAMP,
+  proctime AS PROCTIME(), -- use computed column to define proctime attribute
+  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- use WATERMARK statement to define rowtime attribute
+) WITH (
+  -- declare the external system to connect to
+  'connector' = 'kafka',
+  'topic' = 'topic_name',
+  'scan.startup.mode' = 'earliest-offset',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'format' = 'json'   -- declare a format for this system
+)
+{% endhighlight %}
+</div>
+</div>
+
+In this way the desired connection properties are converted into string-based key-value pairs. So-called [table factories](sourceSinks.html#define-a-tablefactory) create configured table sources, table sinks, and corresponding formats from the key-value pairs. All table factories that can be found via Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken into account when searching for exactly one matching table factory.
+
+If no factory can be found or multiple factories match for the given properties, an exception will be thrown with additional information about considered factories and supported properties.
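+
+Once registered this way, the table can be used in any subsequent Table API & SQL statement. As a minimal illustration (only a sketch, reusing the `MyUserTable` defined above):
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+-- query the Kafka-backed table registered above
+SELECT `user`, message, ts
+FROM MyUserTable
+{% endhighlight %}
+</div>
+</div>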
+
+{% top %}
+
+Schema Mapping
+------------
+
+The body clause of a SQL `CREATE TABLE` statement defines the names and types of columns, constraints and watermarks. Flink doesn't hold the data, thus the schema definition only declares how to map types from an external system to Flink’s representation. The mapping may not be by name; it depends on the implementation of formats and connectors. For example, a MySQL database table is mapped by field names (not case sensitive), and a CSV filesystem is mapped by field order (field names can be arbitrary).
+
+The following example shows a simple schema without time attributes and one-to-one field mapping of input/output to table columns.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+  MyField1 INT,
+  MyField2 STRING,
+  MyField3 BOOLEAN
+) WITH (
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
+### Primary Key
+
+Primary key constraints declare that a column or a set of columns of a table is unique and does not contain nulls. A primary key uniquely identifies a row in a table.
+
+The primary key of a source table is metadata information used for optimization. The primary key of a sink table is usually used by the sink implementation for upserting.
+
+The SQL standard specifies that a constraint can either be ENFORCED or NOT ENFORCED. This controls whether the constraint checks are performed on the incoming/outgoing data. As Flink does not own the data, the only mode it supports is the NOT ENFORCED mode. It is up to the user to ensure that the query enforces key integrity.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+  MyField1 INT,
+  MyField2 STRING,
+  MyField3 BOOLEAN,
+  PRIMARY KEY (MyField1, MyField2) NOT ENFORCED  -- defines a primary key on columns
+) WITH (
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
+### Time Attributes
+
+Time attributes are essential when working with unbounded streaming tables. Therefore both proctime and rowtime attributes can be defined as part of the schema.
+
+For more information about time handling in Flink and especially event-time, we recommend the general [event-time section](streaming/time_attributes.html).
+
+#### Proctime Attributes
+
+In order to declare a proctime attribute in the schema, you can use [Computed Column syntax]({{ site.baseurl }}/dev/table/sql/create.html#create-table) to declare a computed column which is generated from the `PROCTIME()` built-in function.
+The computed column is a virtual column which is not stored in the physical data.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+  MyField1 INT,
+  MyField2 STRING,
+  MyField3 BOOLEAN,
+  MyField4 AS PROCTIME() -- declares a proctime attribute
+) WITH (
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
+#### Rowtime Attributes
+
+In order to control the event-time behavior for tables, Flink provides predefined timestamp extractors and watermark strategies.
+
+Please refer to [CREATE TABLE statements](sql/create.html#create-table) for more information about defining time attributes in DDL.
+
+The following timestamp extractors are supported:
+
+<div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+-- use the existing TIMESTAMP(3) field in schema as the rowtime attribute
+CREATE TABLE MyTable (
+  ts_field TIMESTAMP(3),
+  WATERMARK FOR ts_field AS ...
+) WITH (
+  ...
+)
+
+-- use system functions or UDFs or expressions to extract the expected TIMESTAMP(3) rowtime field
+CREATE TABLE MyTable (
+  log_ts STRING,
+  ts_field AS TO_TIMESTAMP(log_ts),
+  WATERMARK FOR ts_field AS ...
+) WITH (
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
+The following watermark strategies are supported:
+
+<div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+-- Sets a watermark strategy for strictly ascending rowtime attributes. Emits a watermark of the
+-- maximum observed timestamp so far. Rows that have a timestamp bigger than the max timestamp
+-- are not late.
+CREATE TABLE MyTable (
+  ts_field TIMESTAMP(3),
+  WATERMARK FOR ts_field AS ts_field
+) WITH (
+  ...
+)
+
+-- Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
+-- observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
+-- are not late.
+CREATE TABLE MyTable (
+  ts_field TIMESTAMP(3),
+  WATERMARK FOR ts_field AS ts_field - INTERVAL '0.001' SECOND
+) WITH (
+  ...
+)
+
+-- Sets a watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
+-- Emits watermarks which are the maximum observed timestamp minus the specified delay, e.g. 2 seconds.
+CREATE TABLE MyTable (
+  ts_field TIMESTAMP(3),
+  WATERMARK FOR ts_field AS ts_field - INTERVAL '2' SECOND
+) WITH (
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
+Make sure to always declare both timestamps and watermarks. Watermarks are required for triggering time-based operations.
+
+### SQL Types
+
+Please see the [Data Types](types.html) page about how to declare a type in SQL.
+
+{% top %}
\ No newline at end of file
diff --git a/docs/dev/table/connectors/index.zh.md b/docs/dev/table/connectors/index.zh.md
new file mode 100644
index 0000000..c2a08b6
--- /dev/null
+++ b/docs/dev/table/connectors/index.zh.md
@@ -0,0 +1,268 @@
+---
+title: "Table & SQL Connectors"
+nav-id: sql-connectors
+nav-parent_id: connectors-root
+nav-pos: 2
+nav-show_overview: true
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+Flink's Table API & SQL programs can be connected to other external systems for reading and writing both batch and streaming tables. A table source provides access to data which is stored in external systems (such as a database, key-value store, message queue, or file system). A table sink emits a table to an external storage system. Depending on the type of source and sink, they support different formats such as CSV, Avro, Parquet, or ORC.
+
+This page describes how to register table sources and table sinks in Flink using the natively supported connectors. After a source or sink has been registered, it can be accessed by Table API & SQL statements.
+
+<span class="label label-info">NOTE</span> If you want to implement your own *custom* table source or sink, have a look at the [user-defined sources & sinks page](sourceSinks.html).
+
+<span class="label label-danger">Attention</span> Flink Table & SQL introduces a new set of connector options since 1.11.0, if you are using the legacy connector options, please refer to the [legacy documentation]({{ site.baseurl }}/dev/table/connect.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+Supported Connectors
+------------
+
+Flink natively supports various connectors. The following table lists all available connectors.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Name</th>
+        <th class="text-center">Version</th>
+        <th class="text-center">Source</th>
+        <th class="text-center">Sink</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>Filesystem</td>
+      <td></td>
+      <td>Bounded and Unbounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>Elasticsearch</td>
+      <td>6.x & 7.x</td>
+      <td>Not supported</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>Apache Kafka</td>
+      <td>0.10+</td>
+      <td>Unbounded Scan</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>JDBC</td>
+      <td></td>
+      <td>Bounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td><a href="{{ site.baseurl }}/dev/table/connectors/hbase.html">Apache HBase</a></td>
+      <td>1.4.x</td>
+      <td>Bounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    </tbody>
+</table>
+
+{% top %}
+
+How to use connectors
+--------
+
+Flink supports using the SQL CREATE TABLE statement to register a table. One can define the table name, the table schema, and the table options for connecting to an external system.
+
+The following code shows a full example of how to connect to Kafka for reading JSON records.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  -- declare the schema of the table
+  `user` BIGINT,
+  message STRING,
+  ts TIMESTAMP,
+  proctime AS PROCTIME(), -- use computed column to define proctime attribute
+  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- use WATERMARK statement to define rowtime attribute
+) WITH (
+  -- declare the external system to connect to
+  'connector' = 'kafka',
+  'topic' = 'topic_name',
+  'scan.startup.mode' = 'earliest-offset',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'format' = 'json'   -- declare a format for this system
+)
+{% endhighlight %}
+</div>
+</div>
+
+In this way the desired connection properties are converted into string-based key-value pairs. So-called [table factories](sourceSinks.html#define-a-tablefactory) create configured table sources, table sinks, and corresponding formats from the key-value pairs. All table factories that can be found via Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken into account when searching for exactly one matching table factory.
+
+If no factory can be found or multiple factories match for the given properties, an exception will be thrown with additional information about considered factories and supported properties.
+
+{% top %}
+
+Schema Mapping
+------------
+
+The body clause of a SQL `CREATE TABLE` statement defines the names and types of columns, constraints and watermarks. Flink doesn't hold the data, thus the schema definition only declares how to map types from an external system to Flink’s representation. The mapping may not be by name; it depends on the implementation of formats and connectors. For example, a MySQL database table is mapped by field names (not case sensitive), and a CSV filesystem is mapped by field order (field names can be arbitrary).
+
+The following example shows a simple schema without time attributes and one-to-one field mapping of input/output to table columns.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+  MyField1 INT,
+  MyField2 STRING,
+  MyField3 BOOLEAN
+) WITH (
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
+### Primary Key
+
+Primary key constraints declare that a column or a set of columns of a table is unique and does not contain nulls. A primary key uniquely identifies a row in a table.
+
+The primary key of a source table is metadata information used for optimization. The primary key of a sink table is usually used by the sink implementation for upserting.
+
+The SQL standard specifies that a constraint can either be ENFORCED or NOT ENFORCED. This controls whether the constraint checks are performed on the incoming/outgoing data. As Flink does not own the data, the only mode it supports is the NOT ENFORCED mode. It is up to the user to ensure that the query enforces key integrity.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+  MyField1 INT,
+  MyField2 STRING,
+  MyField3 BOOLEAN,
+  PRIMARY KEY (MyField1, MyField2) NOT ENFORCED  -- defines a primary key on columns
+) WITH (
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
+### Time Attributes
+
+Time attributes are essential when working with unbounded streaming tables. Therefore both proctime and rowtime attributes can be defined as part of the schema.
+
+For more information about time handling in Flink and especially event-time, we recommend the general [event-time section](streaming/time_attributes.html).
+
+#### Proctime Attributes
+
+In order to declare a proctime attribute in the schema, you can use [Computed Column syntax]({{ site.baseurl }}/dev/table/sql/create.html#create-table) to declare a computed column which is generated from the `PROCTIME()` built-in function.
+The computed column is a virtual column which is not stored in the physical data.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+  MyField1 INT,
+  MyField2 STRING,
+  MyField3 BOOLEAN,
+  MyField4 AS PROCTIME() -- declares a proctime attribute
+) WITH (
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
+#### Rowtime Attributes
+
+In order to control the event-time behavior for tables, Flink provides predefined timestamp extractors and watermark strategies.
+
+Please refer to [CREATE TABLE statements](sql/create.html#create-table) for more information about defining time attributes in DDL.
+
+The following timestamp extractors are supported:
+
+<div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+-- use the existing TIMESTAMP(3) field in schema as the rowtime attribute
+CREATE TABLE MyTable (
+  ts_field TIMESTAMP(3),
+  WATERMARK FOR ts_field AS ...
+) WITH (
+  ...
+)
+
+-- use system functions or UDFs or expressions to extract the expected TIMESTAMP(3) rowtime field
+CREATE TABLE MyTable (
+  log_ts STRING,
+  ts_field AS TO_TIMESTAMP(log_ts),
+  WATERMARK FOR ts_field AS ...
+) WITH (
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
+The following watermark strategies are supported:
+
+<div class="codetabs" markdown="1">
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+-- Sets a watermark strategy for strictly ascending rowtime attributes. Emits a watermark of the
+-- maximum observed timestamp so far. Rows that have a timestamp bigger than the max timestamp
+-- are not late.
+CREATE TABLE MyTable (
+  ts_field TIMESTAMP(3),
+  WATERMARK FOR ts_field AS ts_field
+) WITH (
+  ...
+)
+
+-- Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
+-- observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
+-- are not late.
+CREATE TABLE MyTable (
+  ts_field TIMESTAMP(3),
+  WATERMARK FOR ts_field AS ts_field - INTERVAL '0.001' SECOND
+) WITH (
+  ...
+)
+
+-- Sets a watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
+-- Emits watermarks which are the maximum observed timestamp minus the specified delay, e.g. 2 seconds.
+CREATE TABLE MyTable (
+  ts_field TIMESTAMP(3),
+  WATERMARK FOR ts_field AS ts_field - INTERVAL '2' SECOND
+) WITH (
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
+Make sure to always declare both timestamps and watermarks. Watermarks are required for triggering time-based operations.
+
+### SQL Types
+
+Please see the [Data Types](types.html) page about how to declare a type in SQL.
+
+{% top %}
\ No newline at end of file


[flink] 03/03: [FLINK-17830][docs][hbase] Add documentation for the new HBase SQL connector

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5c1579fc0a801c8ec92212bc8c037581267ce281
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu May 28 22:10:53 2020 +0800

    [FLINK-17830][docs][hbase] Add documentation for the new HBase SQL connector
    
    This closes #12386
---
 docs/dev/table/connectors/hbase.md    | 291 ++++++++++++++++++++++++++++++++++
 docs/dev/table/connectors/hbase.zh.md | 291 ++++++++++++++++++++++++++++++++++
 2 files changed, 582 insertions(+)

diff --git a/docs/dev/table/connectors/hbase.md b/docs/dev/table/connectors/hbase.md
new file mode 100644
index 0000000..810703f
--- /dev/null
+++ b/docs/dev/table/connectors/hbase.md
@@ -0,0 +1,291 @@
+---
+title: "HBase SQL Connector"
+nav-title: HBase
+nav-parent_id: sql-connectors
+nav-pos: 9
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Bounded</span>
+<span class="label label-primary">Lookup Source: Sync Mode</span>
+<span class="label label-primary">Sink: Batch</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The HBase connector allows for reading from and writing to an HBase cluster. This document describes how to set up the HBase connector to run SQL queries against HBase.
+
+The connector can operate in upsert mode for exchanging changelog messages with the external system, using a primary key defined in the DDL. The primary key can only be defined on the HBase rowkey field. If the PRIMARY KEY clause is not declared, the HBase connector takes the rowkey as the primary key by default.
+
+<span class="label label-danger">Attention</span> HBase as a Lookup Source does not use any cache, data is always queried directly through the HBase client.
+
+Dependencies
+------------
+
+In order to set up the HBase connector, the following table provides dependency information both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.
+
+{% if site.is_stable %}
+
+| HBase Version       | Maven dependency                                          | SQL Client JAR         |
+| :------------------ | :-------------------------------------------------------- | :----------------------|
+| 1.4.x               | `flink-connector-hbase{{site.scala_version_suffix}}`     | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-hbase{{site.scala_version_suffix}}/{{site.version}}/flink-connector-hbase{{site.scala_version_suffix}}-{{site.version}}.jar) |
+
+{% else %}
+
+The dependency table is only available for stable releases.
+
+{% endif %}
+
+How to create an HBase table
+----------------
+
+All the column families in an HBase table must be declared as ROW type: the field name maps to the column family name, and the nested field names map to the column qualifier names. There is no need to declare all the families and qualifiers in the schema; users can declare only what’s used in the query. Apart from the ROW type fields, the single atomic type field (e.g. STRING, BIGINT) will be recognized as the HBase rowkey. The rowkey field can have an arbitrary name, but should be quoted using backticks if it is a reserved keyword.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE hTable (
+ rowkey INT,
+ family1 ROW<q1 INT>,
+ family2 ROW<q2 STRING, q3 BIGINT>,
+ family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
+ PRIMARY KEY (rowkey) NOT ENFORCED
+) WITH (
+ 'connector' = 'hbase-1.4',
+ 'table-name' = 'mytable',
+ 'zookeeper.quorum' = 'localhost:2121'
+)
+{% endhighlight %}
+</div>
+</div>
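+
+For illustration, data could be written into the table defined above along the following lines (only a sketch; the source table `T` with columns `rowkey, q1, ..., q6` is an assumption, not part of this example):
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+-- assemble each column family with the ROW(...) constructor before writing;
+-- 'T' is an assumed source table with columns (rowkey, q1, q2, q3, q4, q5, q6)
+INSERT INTO hTable
+SELECT rowkey, ROW(q1), ROW(q2, q3), ROW(q4, q5, q6)
+FROM T
+{% endhighlight %}
+</div>
+</div>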
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 50%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use; for HBase, this should be 'hbase-1.4'.</td>
+    </tr>
+    <tr>
+      <td><h5>table-name</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name of the HBase table to connect to.</td>
+    </tr>
+    <tr>
+      <td><h5>zookeeper.quorum</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The HBase Zookeeper quorum.</td>
+    </tr>
+    <tr>
+      <td><h5>zookeeper.znode.parent</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">/hbase</td>
+      <td>String</td>
+      <td>The root dir in Zookeeper for the HBase cluster.</td>
+    </tr>
+    <tr>
+      <td><h5>null-string-literal</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">null</td>
+      <td>String</td>
+      <td>Representation for null values for string fields. The HBase source and sink encode/decode empty bytes as null values for all types except the string type.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.buffer-flush.max-size</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">2mb</td>
+      <td>MemorySize</td>
+      <td>Writing option, maximum size in memory of buffered rows for each writing request.
+      This can improve performance for writing data to the HBase database, but may increase latency.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.buffer-flush.max-rows</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Writing option, maximum number of rows to buffer for each writing request.
+      This can improve performance for writing data to the HBase database, but may increase latency.
+      No default value, which means that by default flushing does not depend on the number of buffered rows.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.buffer-flush.interval</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>Writing option, the interval to flush buffered rows.
+      No default value, which means no asynchronous flush thread will be scheduled. Examples: '1s', '5 s'.
+      </td>
+    </tr>
+    </tbody>
+</table>
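+
+As an illustration, the sink buffering options above could be added to the earlier DDL as follows (the chosen values are examples, not recommendations):
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE hTable (
+ rowkey INT,
+ family1 ROW<q1 INT>,
+ PRIMARY KEY (rowkey) NOT ENFORCED
+) WITH (
+ 'connector' = 'hbase-1.4',
+ 'table-name' = 'mytable',
+ 'zookeeper.quorum' = 'localhost:2121',
+ -- illustrative values for the sink buffering options
+ 'sink.buffer-flush.max-size' = '10mb',
+ 'sink.buffer-flush.max-rows' = '1000',
+ 'sink.buffer-flush.interval' = '2s'
+)
+{% endhighlight %}
+</div>
+</div>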
+
+
+
+Data Type Mapping
+----------------
+
+HBase stores all data as byte arrays. The data needs to be serialized and deserialized during read and write operations.
+
+When serializing and de-serializing, the Flink HBase connector uses the utility class `org.apache.hadoop.hbase.util.Bytes` provided by HBase (Hadoop) to convert Flink data types to and from byte arrays.
+
+The Flink HBase connector encodes `null` values to empty bytes, and decodes empty bytes to `null` values for all data types except the string type. For the string type, the null literal is determined by the `null-string-literal` option.
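+
+For example, a table that decodes the literal "n/a" as a SQL NULL for string columns could be declared as follows (the literal value is only an illustrative choice):
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE hTable (
+ rowkey INT,
+ family2 ROW<q2 STRING, q3 BIGINT>,
+ PRIMARY KEY (rowkey) NOT ENFORCED
+) WITH (
+ 'connector' = 'hbase-1.4',
+ 'table-name' = 'mytable',
+ 'zookeeper.quorum' = 'localhost:2121',
+ -- "n/a" cells in string columns are read back as NULL (illustrative value)
+ 'null-string-literal' = 'n/a'
+)
+{% endhighlight %}
+</div>
+</div>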
+
+The data type mappings are as follows:
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Flink Data Type</th>
+        <th class="text-center">HBase conversion</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>CHAR / VARCHAR / STRING</td>
+      <td>
+{% highlight java %}
+byte[] toBytes(String s)
+String toString(byte[] b)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>BOOLEAN</td>
+      <td>
+{% highlight java %}
+byte[] toBytes(boolean b)
+boolean toBoolean(byte[] b)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>BINARY / VARBINARY</td>
+      <td>Returns <code>byte[]</code> as is.</td>
+    </tr>
+    <tr>
+      <td>DECIMAL</td>
+      <td>
+{% highlight java %}
+byte[] toBytes(BigDecimal v)
+BigDecimal toBigDecimal(byte[] b)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>TINYINT</td>
+      <td>
+{% highlight java %}
+new byte[] { val }
+bytes[0] // returns first and only byte from bytes
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>SMALLINT</td>
+      <td>
+{% highlight java %}
+byte[] toBytes(short val)
+short toShort(byte[] bytes)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>INT</td>
+      <td>
+{% highlight java %}
+byte[] toBytes(int val)
+int toInt(byte[] bytes)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>BIGINT</td>
+      <td>
+{% highlight java %}
+byte[] toBytes(long val)
+long toLong(byte[] bytes)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>FLOAT</td>
+      <td>
+{% highlight java %}
+byte[] toBytes(float val)
+float toFloat(byte[] bytes)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>DOUBLE</td>
+      <td>
+{% highlight java %}
+byte[] toBytes(double val)
+double toDouble(byte[] bytes)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>DATE</td>
+      <td>Stores the number of days since epoch as int value.</td>
+    </tr>
+    <tr>
+      <td>TIME</td>
+      <td>Stores the number of milliseconds of the day as int value.</td>
+    </tr>
+    <tr>
+      <td>TIMESTAMP</td>
+      <td>Stores the milliseconds since epoch as long value.</td>
+    </tr>
+    <tr>
+      <td>ARRAY</td>
+      <td>Not supported</td>
+    </tr>
+    <tr>
+      <td>MAP / MULTISET</td>
+      <td>Not supported</td>
+    </tr>
+    <tr>
+      <td>ROW</td>
+      <td>Not supported</td>
+    </tr>
+    </tbody>
+</table>
\ No newline at end of file
diff --git a/docs/dev/table/connectors/hbase.zh.md b/docs/dev/table/connectors/hbase.zh.md
new file mode 100644
index 0000000..810703f
--- /dev/null
+++ b/docs/dev/table/connectors/hbase.zh.md
@@ -0,0 +1,291 @@
+---
+title: "HBase SQL Connector"
+nav-title: HBase
+nav-parent_id: sql-connectors
+nav-pos: 9
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Bounded</span>
+<span class="label label-primary">Lookup Source: Sync Mode</span>
+<span class="label label-primary">Sink: Batch</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The HBase connector allows for reading from and writing to an HBase cluster. This document describes how to set up the HBase connector to run SQL queries against HBase.
+
+The connector can operate in upsert mode for exchanging changelog messages with the external system, using a primary key defined in the DDL. The primary key can only be defined on the HBase rowkey field. If the PRIMARY KEY clause is not declared, the HBase connector takes the rowkey as the primary key by default.
+
+<span class="label label-danger">Attention</span> HBase as a Lookup Source does not use any cache, data is always queried directly through the HBase client.
+
+Dependencies
+------------
+
+In order to set up the HBase connector, the following table provides dependency information both for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with SQL JAR bundles.
+
+{% if site.is_stable %}
+
+| HBase Version       | Maven dependency                                          | SQL Client JAR         |
+| :------------------ | :-------------------------------------------------------- | :----------------------|
+| 1.4.x               | `flink-connector-hbase{{site.scala_version_suffix}}`     | [Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-hbase{{site.scala_version_suffix}}/{{site.version}}/flink-connector-hbase{{site.scala_version_suffix}}-{{site.version}}.jar) |
+
+{% else %}
+
+The dependency table is only available for stable releases.
+
+{% endif %}
+
+How to create an HBase table
+----------------
+
+All the column families in an HBase table must be declared as ROW type: the field name maps to the column family name, and the nested field names map to the column qualifier names. There is no need to declare all the families and qualifiers in the schema; users can declare only what’s used in the query. Apart from the ROW type fields, the single atomic type field (e.g. STRING, BIGINT) will be recognized as the HBase rowkey. The rowkey field can have an arbitrary name, but should be quoted using backticks if it is a reserved keyword.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE hTable (
+ rowkey INT,
+ family1 ROW<q1 INT>,
+ family2 ROW<q2 STRING, q3 BIGINT>,
+ family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
+ PRIMARY KEY (rowkey) NOT ENFORCED
+) WITH (
+ 'connector' = 'hbase-1.4',
+ 'table-name' = 'mytable',
+ 'zookeeper.quorum' = 'localhost:2121'
+)
+{% endhighlight %}
+</div>
+</div>
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 50%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use; for HBase, this should be 'hbase-1.4'.</td>
+    </tr>
+    <tr>
+      <td><h5>table-name</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name of the HBase table to connect to.</td>
+    </tr>
+    <tr>
+      <td><h5>zookeeper.quorum</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The HBase Zookeeper quorum.</td>
+    </tr>
+    <tr>
+      <td><h5>zookeeper.znode.parent</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">/hbase</td>
+      <td>String</td>
+      <td>The root dir in Zookeeper for the HBase cluster.</td>
+    </tr>
+    <tr>
+      <td><h5>null-string-literal</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">null</td>
+      <td>String</td>
+      <td>Representation for null values for string fields. The HBase source and sink encode/decode empty bytes as null values for all types except the string type.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.buffer-flush.max-size</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">2mb</td>
+      <td>MemorySize</td>
+      <td>Writing option, maximum size in memory of buffered rows for each writing request.
+      This can improve performance for writing data to the HBase database, but may increase latency.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.buffer-flush.max-rows</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Writing option, maximum number of rows to buffer for each writing request.
+      This can improve performance for writing data to the HBase database, but may increase latency.
+      No default value, which means that by default flushing does not depend on the number of buffered rows.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.buffer-flush.interval</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Duration</td>
+      <td>Writing option, the interval to flush buffered rows.
+      No default value, which means no asynchronous flush thread will be scheduled. Examples: '1s', '5 s'.
+      </td>
+    </tr>
+    </tbody>
+</table>
+
+
+
+Data Type Mapping
+----------------
+
+HBase stores all data as byte arrays. The data needs to be serialized and deserialized during read and write operations.
+
+When serializing and de-serializing, the Flink HBase connector uses the utility class `org.apache.hadoop.hbase.util.Bytes` provided by HBase (Hadoop) to convert Flink data types to and from byte arrays.
+
+The Flink HBase connector encodes `null` values to empty bytes, and decodes empty bytes to `null` values for all data types except the string type. For the string type, the null literal is determined by the `null-string-literal` option.
+
+The data type mappings are as follows:
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Flink Data Type</th>
+        <th class="text-center">HBase conversion</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>CHAR / VARCHAR / STRING</td>
+      <td>
+{% highlight java %}
+byte[] toBytes(String s)
+String toString(byte[] b)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>BOOLEAN</td>
+      <td>
+{% highlight java %}
+byte[] toBytes(boolean b)
+boolean toBoolean(byte[] b)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>BINARY / VARBINARY</td>
+      <td>Returns <code>byte[]</code> as is.</td>
+    </tr>
+    <tr>
+      <td>DECIMAL</td>
+      <td>
+{% highlight java %}
+byte[] toBytes(BigDecimal v)
+BigDecimal toBigDecimal(byte[] b)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>TINYINT</td>
+      <td>
+{% highlight java %}
+new byte[] { val }
+bytes[0] // returns first and only byte from bytes
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>SMALLINT</td>
+      <td>
+{% highlight java %}
+byte[] toBytes(short val)
+short toShort(byte[] bytes)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>INT</td>
+      <td>
+{% highlight java %}
+byte[] toBytes(int val)
+int toInt(byte[] bytes)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>BIGINT</td>
+      <td>
+{% highlight java %}
+byte[] toBytes(long val)
+long toLong(byte[] bytes)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>FLOAT</td>
+      <td>
+{% highlight java %}
+byte[] toBytes(float val)
+float toFloat(byte[] bytes)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>DOUBLE</td>
+      <td>
+{% highlight java %}
+byte[] toBytes(double val)
+double toDouble(byte[] bytes)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td>DATE</td>
+      <td>Stores the number of days since epoch as int value.</td>
+    </tr>
+    <tr>
+      <td>TIME</td>
+      <td>Stores the number of milliseconds of the day as int value.</td>
+    </tr>
+    <tr>
+      <td>TIMESTAMP</td>
+      <td>Stores the milliseconds since epoch as long value.</td>
+    </tr>
+    <tr>
+      <td>ARRAY</td>
+      <td>Not supported</td>
+    </tr>
+    <tr>
+      <td>MAP / MULTISET</td>
+      <td>Not supported</td>
+    </tr>
+    <tr>
+      <td>ROW</td>
+      <td>Not supported</td>
+    </tr>
+    </tbody>
+</table>
\ No newline at end of file


[flink] 01/03: [hotfix][hbase] Rename HBase connector option 'zookeeper.znode-parent' to 'zookeeper.znode.parent'

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fe7734d890b318a644068474c295c361a2d79ce4
Author: Jark Wu <ja...@apache.org>
AuthorDate: Thu May 28 19:02:37 2020 +0800

    [hotfix][hbase] Rename HBase connector option 'zookeeper.znode-parent' to 'zookeeper.znode.parent'
    
    The 'zookeeper.znode.parent' configuration key is the one used in HBase; in order to stay close to HBase users, it is better to use the same key 'zookeeper.znode.parent'.
---
 .../org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java    | 2 +-
 .../java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java   | 4 ++--
 .../apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java    | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
index 64b381f..b0e6dee 100644
--- a/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-hbase/src/main/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactory.java
@@ -63,7 +63,7 @@ public class HBaseDynamicTableFactory implements DynamicTableSourceFactory, Dyna
 		.withDescription("Required. It defines HBase Zookeeper quorum.");
 
 	private static final ConfigOption<String> ZOOKEEPER_ZNODE_PARENT = ConfigOptions
-		.key("zookeeper.znode-parent")
+		.key("zookeeper.znode.parent")
 		.stringType()
 		.defaultValue("/hbase")
 		.withDescription("Optional. The root dir in Zookeeper for HBase cluster, default value is '/hbase'");
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
index 7724873..3777dec 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseConnectorITCase.java
@@ -389,7 +389,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 					" 'connector' = 'hbase-1.4'," +
 					" 'table-name' = '" + TEST_TABLE_1 + "'," +
 					" 'zookeeper.quorum' = '" + getZookeeperQuorum() + "'," +
-					" 'zookeeper.znode-parent' = '/hbase'" +
+					" 'zookeeper.znode.parent' = '/hbase'" +
 					")");
 		}
 
@@ -489,7 +489,7 @@ public class HBaseConnectorITCase extends HBaseTestBase {
 				"    'connector' = 'hbase-1.4',\n" +
 				"    'table-name' = 'testTable3',\n" +
 				"    'zookeeper.quorum' = '" + quorum + "',\n" +
-				"    'zookeeper.znode-parent' = '/hbase' " +
+				"    'zookeeper.znode.parent' = '/hbase' " +
 				")";
 		}
 		tEnv.executeSql(ddl);
diff --git a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
index 17fe3a6..32e104a 100644
--- a/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/HBaseDynamicTableFactoryTest.java
@@ -224,7 +224,7 @@ public class HBaseDynamicTableFactoryTest {
 		options.put("connector", "hbase-1.4");
 		options.put("table-name", "testHBastTable");
 		options.put("zookeeper.quorum", "localhost:2181");
-		options.put("zookeeper.znode-parent", "/flink");
+		options.put("zookeeper.znode.parent", "/flink");
 		options.put("sink.buffer-flush.max-size", "10mb");
 		options.put("sink.buffer-flush.max-rows", "1000");
 		options.put("sink.buffer-flush.interval", "10s");