Posted to commits@flink.apache.org by ja...@apache.org on 2019/10/10 05:56:44 UTC

[flink] branch release-1.9 updated: [FLINK-13361][docs] Add documentation for JDBC connector for Table API & SQL

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new f09ff5e  [FLINK-13361][docs] Add documentation for JDBC connector for Table API & SQL
f09ff5e is described below

commit f09ff5eb6d1e01ea77e87c6b8ba9d5752d492444
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Thu Oct 10 12:19:33 2019 +0800

    [FLINK-13361][docs] Add documentation for JDBC connector for Table API & SQL
    
    This closes #9802
---
 docs/dev/table/connect.md | 137 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 137 insertions(+)

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 860e7e2..f4f73e0 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -50,6 +50,7 @@ The following tables list all available connectors and formats. Their mutual com
 | Apache Kafka      | 0.11                | `flink-connector-kafka-0.11` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}.jar) |
 | Apache Kafka      | 0.11+ (`universal`) | `flink-connector-kafka`      | [Download](http://central.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka{{site.scala_version_suffix}}/{{site.version}}/flink-sql-connector-kafka{{site.scala_version_suffix}}-{{site.version}}.jar) |
 | HBase             | 1.4.3               | `flink-hbase`                | [Download](http://central.maven.org/maven2/org/apache/flink/flink-hbase{{site.scala_version_suffix}}/{{site.version}}/flink-hbase{{site.scala_version_suffix}}-{{site.version}}.jar) |
+| JDBC              |                     | `flink-jdbc`                 | [Download](http://central.maven.org/maven2/org/apache/flink/flink-jdbc{{site.scala_version_suffix}}/{{site.version}}/flink-jdbc{{site.scala_version_suffix}}-{{site.version}}.jar) |
 
 ### Formats
 
@@ -1160,6 +1161,142 @@ CREATE TABLE MyUserTable (
 
 {% top %}
 
+### JDBC Connector
+
+<span class="label label-primary">Source: Batch</span>
+<span class="label label-primary">Sink: Batch</span>
+<span class="label label-primary">Sink: Streaming Append Mode</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+<span class="label label-primary">Temporal Join: Sync Mode</span>
+
+The JDBC connector allows for reading from and writing into a database via a JDBC driver.
+
+The connector can operate in [upsert mode](#update-modes) for exchanging UPSERT/DELETE messages with the external system using a [key defined by the query](./streaming/dynamic_tables.html#table-to-stream-conversion).
+
+For append-only queries, the connector can also operate in [append mode](#update-modes) for exchanging only INSERT messages with the external system.
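+
+As an illustration of the two modes, here is a minimal sketch with hypothetical names: `AppendSink` and `UpsertSink` are JDBC tables registered as described below, and `t` is a source table with fields `a`, `b`, and `c`.
+
+{% highlight sql %}
+-- Append mode: an insert-only query; every result row is emitted as an INSERT.
+INSERT INTO AppendSink
+SELECT a, b, c FROM t WHERE c > 0
+
+-- Upsert mode: a grouped query produces updates; the grouping key `a` becomes
+-- the upsert key for the UPSERT/DELETE messages sent to the database.
+INSERT INTO UpsertSink
+SELECT a, COUNT(b), SUM(c) FROM t GROUP BY a
+{% endhighlight %}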
+
+To use the JDBC connector, an actual JDBC driver must be chosen. The currently supported drivers are:
+
+**Supported Drivers:**
+
+| Name        |      Group Id      |      Artifact Id     |      JAR         |
+| :-----------| :------------------| :--------------------| :----------------|
+| MySQL       |        mysql       | mysql-connector-java | [Download](http://central.maven.org/maven2/mysql/mysql-connector-java/) |
+| PostgreSQL  |   org.postgresql   |      postgresql      | [Download](https://jdbc.postgresql.org/download.html) |
+| Derby       |  org.apache.derby  |        derby         | [Download](http://db.apache.org/derby/derby_downloads.html) |
+
+<br/>
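+
+The URL format and driver class depend on the chosen driver. For example, a hypothetical PostgreSQL table would use the following pair in its DDL `WITH` clause (see below; host, port, and database name are placeholders). If `connector.driver` is omitted, it is derived from the URL.
+
+{% highlight sql %}
+'connector.url' = 'jdbc:postgresql://localhost:5432/flink-test',
+'connector.driver' = 'org.postgresql.Driver',
+{% endhighlight %}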
+
+The connector can be defined as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="YAML" markdown="1">
+{% highlight yaml %}
+connector:
+  type: jdbc
+  url: "jdbc:mysql://localhost:3306/flink-test"     # required: JDBC DB url
+  table: "jdbc_table_name"        # required: jdbc table name
+  driver: "com.mysql.jdbc.Driver" # optional: the class name of the JDBC driver to use to connect to this URL.
+                                  # If not set, it will automatically be derived from the URL.
+
+  username: "name"                # optional: jdbc user name and password
+  password: "password"
+  
+  read: # scan options, optional, used when reading from table
+    partition: # These options must all be specified if any of them is specified. In addition, partition.num must be specified. They
+               # describe how to partition the table when reading in parallel from multiple tasks. partition.column must be a numeric,
+               # date, or timestamp column from the table in question. Notice that lower-bound and upper-bound are only used to decide
+               # the partition stride, not to filter the rows in the table, so all rows of the table will be partitioned and returned.
+               # These options apply only to reading. A worked example follows the connector definition below.
+      column: "column_name" # optional, name of the column used for partitioning the input.
+      num: 50               # optional, the number of partitions.
+      lower-bound: 500      # optional, the smallest value of the first partition.
+      upper-bound: 1000     # optional, the largest value of the last partition.
+    fetch-size: 100         # optional, gives the reader a hint as to the number of rows that should be fetched
+                            # from the database per round trip when reading. If the value specified is zero, the
+                            # hint is ignored. The default value is zero.
+  
+  lookup: # lookup options, optional, used in temporal join
+    cache:
+      max-rows: 5000 # optional, the maximum number of rows in the lookup cache; when this number is exceeded,
+                     # the oldest rows are evicted. "cache.max-rows" and "cache.ttl" must both be specified if
+                     # either of them is specified. The cache is not enabled by default.
+      ttl: "10s"     # optional, the maximum time to live for each row in the lookup cache; rows older than this
+                     # are expired. "cache.max-rows" and "cache.ttl" must both be specified if either of them is
+                     # specified. The cache is not enabled by default.
+    max-retries: 3   # optional, the maximum number of retries if a lookup query to the database fails
+  
+  write: # sink options, optional, used when writing into a table
+    flush:
+      max-rows: 5000 # optional, the maximum number of buffered records (including append, upsert, and delete records)
+                     # before a flush is triggered. The default value is "5000".
+      interval: "2s" # optional, the flush interval; buffered records are flushed asynchronously at this interval.
+                     # The default value is "0s", which means no asynchronous flush thread will be scheduled.
+    max-retries: 3   # optional, the maximum number of retries if writing records to the database fails.
+{% endhighlight %}
+</div>
+
+<div data-lang="DDL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  ...
+) WITH (
+  'connector.type' = 'jdbc', -- required: specify that the connector type is jdbc
+  
+  'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- required: JDBC DB url
+  
+  'connector.table' = 'jdbc_table_name',  -- required: jdbc table name
+  
+  'connector.driver' = 'com.mysql.jdbc.Driver', -- optional: the class name of the JDBC driver to use to connect to this URL. 
+                                                -- If not set, it will automatically be derived from the URL.
+
+  'connector.username' = 'name', -- optional: jdbc user name and password
+  'connector.password' = 'password',
+  
+  -- scan options, optional, used when reading from table
+
+  -- These options must all be specified if any of them is specified. In addition, partition.num must be specified. They
+  -- describe how to partition the table when reading in parallel from multiple tasks. partition.column must be a numeric,
+  -- date, or timestamp column from the table in question. Notice that lower-bound and upper-bound are only used to decide
+  -- the partition stride, not to filter the rows in the table, so all rows of the table will be partitioned and returned.
+  -- These options apply only to reading. A worked example follows the connector definition below.
+  'connector.read.partition.column' = 'column_name', -- optional, name of the column used for partitioning the input.
+  'connector.read.partition.num' = '50', -- optional, the number of partitions.
+  'connector.read.partition.lower-bound' = '500', -- optional, the smallest value of the first partition.
+  'connector.read.partition.upper-bound' = '1000', -- optional, the largest value of the last partition.
+  
+  'connector.read.fetch-size' = '100', -- optional, gives the reader a hint as to the number of rows that should be fetched
+                                       -- from the database per round trip when reading. If the value specified is zero, the
+                                       -- hint is ignored. The default value is zero.
+
+  -- lookup options, optional, used in temporal join
+  'connector.lookup.cache.max-rows' = '5000', -- optional, the maximum number of rows in the lookup cache; when this number is
+                                              -- exceeded, the oldest rows are evicted. "cache.max-rows" and "cache.ttl" must
+                                              -- both be specified if either of them is specified. The cache is not enabled by default.
+  'connector.lookup.cache.ttl' = '10s', -- optional, the maximum time to live for each row in the lookup cache; rows older than
+                                        -- this are expired. "cache.max-rows" and "cache.ttl" must both be specified if either
+                                        -- of them is specified. The cache is not enabled by default.
+  'connector.lookup.max-retries' = '3', -- optional, the maximum number of retries if a lookup query to the database fails
+
+  -- sink options, optional, used when writing into table
+  'connector.write.flush.max-rows' = '5000', -- optional, the maximum number of buffered records (including append, upsert,
+                                             -- and delete records) before a flush is triggered. The default value is "5000".
+  'connector.write.flush.interval' = '2s', -- optional, the flush interval; buffered records are flushed asynchronously at this
+                                           -- interval. The default value is "0s", which means no asynchronous flush thread will be scheduled.
+  'connector.write.max-retries' = '3' -- optional, the maximum number of retries if writing records to the database fails
+)
+{% endhighlight %}
+</div>
+</div>
+
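+As referenced in the comments above, here is a minimal sketch of a partitioned scan, using a hypothetical database table `orders` with a numeric column `id`. The bounds only determine the stride of each partition, roughly `(upper-bound - lower-bound) / num` values per partition; they do not filter rows.
+
+{% highlight sql %}
+CREATE TABLE Orders (
+  ...
+) WITH (
+  'connector.type' = 'jdbc',
+  'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
+  'connector.table' = 'orders',
+
+  -- 4 parallel reads over column "id"; each partition covers a stride of
+  -- roughly (1000 - 1) / 4, i.e. about 250 ids
+  'connector.read.partition.column' = 'id',
+  'connector.read.partition.num' = '4',
+  'connector.read.partition.lower-bound' = '1',
+  'connector.read.partition.upper-bound' = '1000'
+)
+{% endhighlight %}
+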
+**Upsert sink:** Flink automatically extracts valid keys from a query. For example, a query `SELECT a, b, c FROM t GROUP BY a, b` defines a composite key of the fields `a` and `b`. If a JDBC table is used as an upsert sink, please make sure the key of the query matches one of the unique key sets or the primary key of the underlying database table. This guarantees that the output result is as expected.
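+
+For the example query above, a matching MySQL table could be defined as follows (a hypothetical table created in MySQL, not in Flink):
+
+{% highlight sql %}
+CREATE TABLE jdbc_table_name (
+  a VARCHAR(64),
+  b VARCHAR(64),
+  c BIGINT,
+  -- the unique key matches the key (a, b) derived from the Flink query,
+  -- so UPSERT/DELETE messages update the correct row
+  UNIQUE KEY uk_a_b (a, b)
+)
+{% endhighlight %}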
+
+**Temporal Join:** The JDBC connector can be used as a lookup source in a temporal join. Currently, only the sync lookup mode is supported. The lookup cache options (`connector.lookup.cache.max-rows` and `connector.lookup.cache.ttl`) must both be specified if either of them is specified. The lookup cache improves the performance of temporal joins against the JDBC connector by querying the cache first instead of sending every request to the remote database, but the returned value might not be the latest if it comes from the cache. Tuning `connector.lookup.cache.ttl` to a smaller value yields fresher data at the cost of more requests to the database, so this is a trade-off between throughput and freshness.
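+
+A minimal sketch of such a temporal join, assuming the Blink planner, a streaming table `Orders` with a processing-time attribute `proctime`, and a JDBC table `Rates` registered as described above:
+
+{% highlight sql %}
+SELECT
+  o.order_id,
+  o.amount * r.rate AS converted_amount
+FROM Orders AS o
+JOIN Rates FOR SYSTEM_TIME AS OF o.proctime AS r
+ON o.currency = r.currency
+{% endhighlight %}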
+
+**Writing:** By default, `connector.write.flush.interval` is `0s` and `connector.write.flush.max-rows` is `5000`, which means that for low-traffic queries the buffered output rows may not be flushed to the database for a long time. It is therefore recommended to set a flush interval.
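+
+For example, the following sketch of sink options (illustrative values) flushes at least once per second even under low traffic:
+
+{% highlight sql %}
+'connector.write.flush.max-rows' = '5000',
+'connector.write.flush.interval' = '1s'
+{% endhighlight %}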
+
+{% top %}
+
 Table Formats
 -------------