Posted to commits@flink.apache.org by sj...@apache.org on 2021/02/22 21:35:44 UTC

[flink] branch master updated: [FLINK-21426][docs] adds English details to jdbc sink connector

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e3f053e  [FLINK-21426][docs] adds English details to jdbc sink connector
e3f053e is described below

commit e3f053e0ba05557e3a253e022259890d948557b5
Author: sv3ndk <sv...@gmail.com>
AuthorDate: Sat Feb 20 21:52:34 2021 +0100

    [FLINK-21426][docs] adds English details to jdbc sink connector
    
    This closes #14975
---
 docs/content/docs/connectors/datastream/jdbc.md | 129 +++++++++++++++++++-----
 docs/layouts/shortcodes/javadoc.html            |   4 +-
 2 files changed, 106 insertions(+), 27 deletions(-)

diff --git a/docs/content/docs/connectors/datastream/jdbc.md b/docs/content/docs/connectors/datastream/jdbc.md
index afcdd14..f4916af 100644
--- a/docs/content/docs/connectors/datastream/jdbc.md
+++ b/docs/content/docs/connectors/datastream/jdbc.md
@@ -28,46 +28,125 @@ under the License.
 
 This connector provides a sink that writes data to a JDBC database.
 
-To use it, add the following dependency to your project (along with your JDBC-driver):
+To use it, add the following dependency to your project (along with your JDBC driver):
 
 {{< artifact flink-connector-jdbc withScalaVersion >}}
 
 Note that the streaming connectors are currently __NOT__ part of the binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/datastream/project-configuration" >}}).
 
-Created JDBC sink provides at-least-once guarantee.
-Effectively exactly-once can be achieved using upsert statements or idempotent updates.
 
-Example usage:
+## `JdbcSink.sink`
+
+The JDBC sink provides an at-least-once guarantee.
+Effectively, exactly-once can be achieved by crafting upsert SQL statements or idempotent SQL updates.
+Configuration is as follows (see also the {{< javadoc file="/api/java/org/apache/flink/connector/jdbc/JdbcSink.html" name="JdbcSink javadoc" >}}):
 
 ```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-env
-        .fromElements(...)
-        .addSink(JdbcSink.sink(
-                "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
-                (ps, t) -> {
-                    ps.setInt(1, t.id);
-                    ps.setString(2, t.title);
-                    ps.setString(3, t.author);
-                    ps.setDouble(4, t.price);
-                    ps.setInt(5, t.qty);
-                },
-                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
-                        .withUrl(getDbMetadata().getUrl())
-                        .withDriverName(getDbMetadata().getDriverClass())
-                        .build()));
-env.execute();
+JdbcSink.sink(
+        sqlDmlStatement,                  // mandatory
+        jdbcStatementBuilder,             // mandatory
+        jdbcExecutionOptions,             // optional
+        jdbcConnectionOptions             // mandatory
+);
+```
+
+### SQL DML statement and JDBC statement builder
+
+The sink builds one [JDBC prepared statement](https://docs.oracle.com/en/java/javase/11/docs/api/java.sql/java/sql/PreparedStatement.html) from a user-provided SQL string, e.g.:
+
+```sql
+INSERT INTO some_table (field1, field2) VALUES (?, ?)
+```
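+
+If retried inserts must not produce duplicate rows (the "effectively exactly-once" pattern mentioned above), an idempotent variant of the statement can be used instead. A hypothetical PostgreSQL-style upsert, with an illustrative table and key column, might look like:
+
+```sql
+INSERT INTO some_table (id, field1) VALUES (?, ?)
+ON CONFLICT (id) DO UPDATE SET field1 = EXCLUDED.field1
+```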
+
+It then repeatedly calls a user-provided function to update that prepared statement with each value of the stream, e.g.:
+
+```java
+(preparedStatement, someRecord) -> { ... update here the preparedStatement with values from someRecord ... }
 ```
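+
+For the two-parameter statement above, such a function could be sketched as follows (the record type and its fields are illustrative, not part of the connector API):
+
+```java
+(preparedStatement, someRecord) -> {
+    preparedStatement.setString(1, someRecord.field1);  // fills the first '?'
+    preparedStatement.setInt(2, someRecord.field2);     // fills the second '?'
+}
+```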
 
-Please refer to the [API documentation]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/jdbc/JdbcSink.html) for more details.
+### JDBC execution options
+
+The SQL DML statements are executed in batches, which can optionally be configured with a `JdbcExecutionOptions` instance (see also the {{< javadoc name="JdbcExecutionOptions javadoc" file="/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.html" >}}):
+
+```java
+JdbcExecutionOptions.builder()
+        .withBatchIntervalMs(200)             // optional: default = 0, meaning no time-based execution is done
+        .withBatchSize(1000)                  // optional: default = 5000 values
+        .withMaxRetries(5)                    // optional: default = 3
+        .build()
+```
+
+A JDBC batch is executed as soon as one of the following conditions is true:
+
+* the configured batch interval time is elapsed
+* the maximum batch size is reached 
+* a Flink checkpoint has started
+
+### JDBC connection parameters
+
+The connection to the database is configured with a `JdbcConnectionOptions` instance.
+Please see the {{< javadoc name="JdbcConnectionOptions javadoc" file="/api/java/org/apache/flink/connector/jdbc/JdbcConnectionOptions.html" >}} for details.
+
+### Full example
+
+```java
+public class JdbcSinkExample {
+
+    static class Book {
+        public Book(Long id, String title, String authors, Integer year) {
+            this.id = id;
+            this.title = title;
+            this.authors = authors;
+            this.year = year;
+        }
+        final Long id;
+        final String title;
+        final String authors;
+        final Integer year;
+    }
+
+    public static void main(String[] args) throws Exception {
+        var env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        env.fromElements(
+                new Book(101L, "Stream Processing with Apache Flink", "Fabian Hueske, Vasiliki Kalavri", 2019),
+                new Book(102L, "Streaming Systems", "Tyler Akidau, Slava Chernyak, Reuven Lax", 2018),
+                new Book(103L, "Designing Data-Intensive Applications", "Martin Kleppmann", 2017),
+                new Book(104L, "Kafka: The Definitive Guide", "Gwen Shapira, Neha Narkhede, Todd Palino", 2017)
+        ).addSink(
+                JdbcSink.sink(
+                        "insert into books (id, title, authors, year) values (?, ?, ?, ?)",
+                        (statement, book) -> {
+                            statement.setLong(1, book.id);
+                            statement.setString(2, book.title);
+                            statement.setString(3, book.authors);
+                            statement.setInt(4, book.year);
+                        },
+                        JdbcExecutionOptions.builder()
+                                .withBatchSize(1000)
+                                .withBatchIntervalMs(200)
+                                .withMaxRetries(5)
+                                .build(),
+                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+                                .withUrl("jdbc:postgresql://dbhost:5432/postgresdb")
+                                .withDriverName("org.postgresql.Driver")
+                                .withUsername("someUser")
+                                .withPassword("somePassword")
+                                .build()
+                ));
+
+        env.execute();
+    }
+}
+```
 
-## Exactly-once
+## `JdbcSink.exactlyOnceSink`
 
 Since 1.13, the Flink JDBC sink supports exactly-once mode. The implementation relies on the JDBC driver's support for the XA [standard](https://pubs.opengroup.org/onlinepubs/009680699/toc.pdf).
 
 To use it, create a sink using the `exactlyOnceSink()` method as above and additionally provide:
-- [exactly-once options]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.html)
-- [execution options]({{ site.javadocs_baseurl }}/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.html)
+- {{< javadoc name="exactly-once options" file="/api/java/org/apache/flink/connector/jdbc/JdbcExactlyOnceOptions.html" >}}
+- {{< javadoc name="execution options" file="/api/java/org/apache/flink/connector/jdbc/JdbcExecutionOptions.html" >}}
 - [XA DataSource](https://docs.oracle.com/javase/8/docs/api/javax/sql/XADataSource.html) Supplier
 
 ```java
diff --git a/docs/layouts/shortcodes/javadoc.html b/docs/layouts/shortcodes/javadoc.html
index c76d684..3e64fcf 100644
--- a/docs/layouts/shortcodes/javadoc.html
+++ b/docs/layouts/shortcodes/javadoc.html
@@ -21,6 +21,6 @@ under the License.
     Parameters:
         - name: The rendered link name (required)
 */}}
-<a href="{{ .Site.Params.JavaDoc }}">
+<a href="{{ .Site.Params.JavaDoc }}/{{ .Get "file" }}">
     {{ .Get "name" }}
-</a>
\ No newline at end of file
+</a>