Posted to commits@flink.apache.org by sj...@apache.org on 2021/02/22 21:35:44 UTC
[flink] branch master updated: [FLINK-21426][docs] adds English details to jdbc sink connector
This is an automated email from the ASF dual-hosted git repository.
sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e3f053e [FLINK-21426][docs] adds English details to jdbc sink connector
e3f053e is described below
commit e3f053e0ba05557e3a253e022259890d948557b5
Author: sv3ndk <sv...@gmail.com>
AuthorDate: Sat Feb 20 21:52:34 2021 +0100
[FLINK-21426][docs] adds English details to jdbc sink connector
This closes #14975
---
docs/content/docs/connectors/datastream/jdbc.md | 129 +++++++++++++++++++-----
docs/layouts/shortcodes/javadoc.html | 4 +-
2 files changed, 106 insertions(+), 27 deletions(-)
diff --git a/docs/content/docs/connectors/datastream/jdbc.md b/docs/content/docs/connectors/datastream/jdbc.md
index afcdd14..f4916af 100644
--- a/docs/content/docs/connectors/datastream/jdbc.md
+++ b/docs/content/docs/connectors/datastream/jdbc.md
@@ -28,46 +28,125 @@ under the License.
This connector provides a sink that writes data to a JDBC database.
-To use it, add the following dependency to your project (along with your JDBC-driver):
+To use it, add the following dependency to your project (along with your JDBC driver):
{{< artifact flink-connector-jdbc withScalaVersion >}}
Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/datastream/project-configuration" >}}).
-Created JDBC sink provides at-least-once guarantee.
-Effectively exactly-once can be achieved using upsert statements or idempotent updates.
-Example usage:
+## `JdbcSink.sink`
+
+The JDBC sink provides at-least-once guarantee.
+Effectively though, exactly-once can be achieved by crafting upsert SQL statements or idempotent SQL updates.
+Configuration goes as follows (see also the {{< javadoc file="/api/java/org/apache/flink/connector/jdbc/JdbcSink.html" name="JdbcSink javadoc" >}}):
```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env
- .fromElements(...)
- .addSink(JdbcSink.sink(
- "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
- (ps, t) -> {
- ps.setInt(1, t.id);
- ps.setString(2, t.title);
- ps.setString(3, t.author);
- ps.setDouble(4, t.price);
- ps.setInt(5, t.qty);
- },
- new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
- .withUrl(getDbMetadata().getUrl())
- .withDriverName(getDbMetadata().getDriverClass())
- .build()));
-env.execute();
+JdbcSink.sink(
+ sqlDmlStatement, // mandatory
+ jdbcStatementBuilder, // mandatory
+ jdbcExecutionOptions, // optional
+ jdbcConnectionOptions // mandatory
+);
+```
+
+### SQL DML statement and JDBC statement builder
+
+The sink builds one [JDBC prepared statement](https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/java/sql/PreparedStatement.html) from a user-provided SQL string, e.g.:
+
+```sql
+INSERT INTO some_table (field1, field2) VALUES (?, ?)
+```
+
+It then repeatedly calls a user-provided function to update that prepared statement with each value of the stream, e.g.:
+
+```
+(preparedStatement, someRecord) -> { ... update the preparedStatement here with values from someRecord ... }
```
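
As an illustration, for a hypothetical record type with an `id` and a `title` field, such a builder function could look like the following sketch (the record type and field names are assumptions for illustration, not part of this commit):

```java
// Hypothetical record type, for illustration only.
class SomeRecord {
    final long id;
    final String title;
    SomeRecord(long id, String title) { this.id = id; this.title = title; }
}

// Each setter call fills one '?' placeholder of the SQL template, indexed from 1.
JdbcStatementBuilder<SomeRecord> builder = (preparedStatement, someRecord) -> {
    preparedStatement.setLong(1, someRecord.id);      // first '?'
    preparedStatement.setString(2, someRecord.title); // second '?'
};
```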
-Please refer to the [API documentation]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/jdbc/JdbcSink.html) for more details.
+### JDBC execution options
+
+The SQL DML statements are executed in batches, which can optionally be configured with the following instance (see also the {{< javadoc name="JdbcExecutionOptions javadoc" file="/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.html" >}}):
+
+```java
+JdbcExecutionOptions.builder()
+ .withBatchIntervalMs(200) // optional: default = 0, meaning no time-based execution is done
+ .withBatchSize(1000) // optional: default = 5000 values
+ .withMaxRetries(5) // optional: default = 3
+.build()
+```
+
+A JDBC batch is executed as soon as one of the following conditions is true:
+
+* the configured batch interval time has elapsed
+* the maximum batch size is reached
+* a Flink checkpoint has started
+
+### JDBC connection parameters
+
+The connection to the database is configured with a `JdbcConnectionOptions` instance.
+Please see the {{< javadoc name="JdbcConnectionOptions javadoc" file="/api/java/org/apache/flink/connector/jdbc/JdbcConnectionOptions.html" >}} for details.
+
+### Full example
+
+```java
+public class JdbcSinkExample {
+
+ static class Book {
+ public Book(Long id, String title, String authors, Integer year) {
+ this.id = id;
+ this.title = title;
+ this.authors = authors;
+ this.year = year;
+ }
+ final Long id;
+ final String title;
+ final String authors;
+ final Integer year;
+ }
+
+ public static void main(String[] args) throws Exception {
+ var env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ env.fromElements(
+ new Book(101L, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
+ new Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
+ new Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
+ new Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
+ ).addSink(
+ JdbcSink.sink(
+ "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
+ (statement, book) -> {
+ statement.setLong(1, book.id);
+ statement.setString(2, book.title);
+ statement.setString(3, book.authors);
+ statement.setInt(4, book.year);
+ },
+ JdbcExecutionOptions.builder()
+ .withBatchSize(1000)
+ .withBatchIntervalMs(200)
+ .withMaxRetries(5)
+ .build(),
+ new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+ .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
+ .withDriverName("org.postgresql.Driver")
+ .withUsername("someUser")
+ .withPassword("somePassword")
+ .build()
+ ));
+
+ env.execute();
+ }
+}
+```
-## Exactly-once
+## `JdbcSink.exactlyOnceSink`
Since 1.13, Flink JDBC sink supports exactly-once mode. The implementation relies on the JDBC driver's support for the XA [standard](https://pubs.opengroup.org/onlinepubs/009680699/toc.pdf).
To use it, create a sink using the `exactlyOnceSink()` method as above and additionally provide:
-- [exactly-once options]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.html)
-- [execution options]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.html)
+- {{< javadoc name="exactly-once options" file="/api/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.html" >}}
+- {{< javadoc name="execution options" file="/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.html" >}}
- [XA DataSource](https://docs.oracle.com/javase/8/docs/api/javax/sql/XADataSource.html) Supplier
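As an illustration, such a sink might be wired up as in the following sketch, assuming a PostgreSQL `PGXADataSource` (the data-source class, connection URL, and table are assumptions for illustration, not taken from this commit):

```java
env.fromElements(...)
    .addSink(
        JdbcSink.exactlyOnceSink(
            "insert into books (id, title) values (?, ?)",
            (statement, book) -> {
                statement.setLong(1, book.id);
                statement.setString(2, book.title);
            },
            JdbcExecutionOptions.builder()
                .withMaxRetries(0) // retries could lead to duplicates in XA mode
                .build(),
            JdbcExactlyOnceOptions.defaults(),
            () -> {
                // supplier of an XA-capable data source; PGXADataSource is assumed here
                PGXADataSource xaDataSource = new PGXADataSource();
                xaDataSource.setUrl("jdbc:postgresql://dbhost:5432/postgresdb");
                return xaDataSource;
            }));
```

Since XA transactions are committed on checkpoint, this mode relies on checkpointing being enabled on the environment.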
diff --git a/docs/layouts/shortcodes/javadoc.html b/docs/layouts/shortcodes/javadoc.html
index c76d684..3e64fcf 100644
--- a/docs/layouts/shortcodes/javadoc.html
+++ b/docs/layouts/shortcodes/javadoc.html
@@ -21,6 +21,6 @@ under the License.
Parameters:
- name: The rendered link name (required)
*/}}
-<a href="{{ .Site.Params.JavaDoc }}">
+<a href="{{ .Site.Params.JavaDoc }}/{{ .Get "file" }}">
{{ .Get "name" }}
-</a>
\ No newline at end of file
+</a>