You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/08 04:32:54 UTC

[incubator-seatunnel] branch dev updated: [Feature][flink-connector-jdbc] add pre sql and post sql #1789 (#1791)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 56f68afc [Feature][flink-connector-jdbc] add pre sql and post sql #1789 (#1791)
56f68afc is described below

commit 56f68afc2ed5be48ee770cccb8cd4a70f10584a6
Author: dijie <nj...@gmail.com>
AuthorDate: Sun May 8 12:32:50 2022 +0800

    [Feature][flink-connector-jdbc] add pre sql and post sql #1789 (#1791)
    
    * [Feature][flink-connector-jdbc] add pre sql and post sql #1789
---
 docs/en/connector/sink/Jdbc.mdx                    | 37 ++++++++----
 .../org/apache/seatunnel/flink/jdbc/Config.java    | 14 +++++
 .../apache/seatunnel/flink/jdbc/sink/JdbcSink.java | 67 ++++++++++++++++++++++
 3 files changed, 107 insertions(+), 11 deletions(-)

diff --git a/docs/en/connector/sink/Jdbc.mdx b/docs/en/connector/sink/Jdbc.mdx
index 607d32d0..5bb3fa4b 100644
--- a/docs/en/connector/sink/Jdbc.mdx
+++ b/docs/en/connector/sink/Jdbc.mdx
@@ -83,17 +83,20 @@ Configure when `saveMode` is specified as `update` , whether to show sql
 </TabItem>
 <TabItem value="flink">
 
-| name              | type   | required | default value |
-| ----------------- | ------ | -------- | ------------- |
-| driver            | string | yes      | -             |
-| url               | string | yes      | -             |
-| username          | string | yes      | -             |
-| password          | string | no       | -             |
-| query             | string | yes      | -             |
-| batch_size        | int    | no       | -             |
-| source_table_name | string | yes      | -             |
-| common-options    | string | no       | -             |
-| parallelism       | int    | no       | -             |
+| name                       | type    | required | default value |
+| -------------------------- | ------- | -------- | ------------- |
+| driver                     | string  | yes      | -             |
+| url                        | string  | yes      | -             |
+| username                   | string  | yes      | -             |
+| password                   | string  | no       | -             |
+| query                      | string  | yes      | -             |
+| batch_size                 | int     | no       | 5000          |
+| source_table_name          | string  | yes      | -             |
+| common-options             | string  | no       | -             |
+| parallelism                | int     | no       | -             |
+| pre_sql                    | string  | no       | -             |
+| post_sql                   | string  | no       | -             |
+| ignore_post_sql_exceptions | boolean | no       | false         |
 
 ### driver [string]
 
@@ -125,6 +128,18 @@ Number of writes per batch
 
 The parallelism of an individual operator, for JdbcSink.
 
+### pre_sql [string]
+
+SQL statement executed before the sink starts writing data. Supported for both batch and streaming jobs.
+
+### post_sql [string]
+
+SQL statement executed after the output is finished (when the sink is closed). Only supported for batch jobs; it is ignored for streaming jobs.
+
+### ignore_post_sql_exceptions [boolean]
+
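+Whether to ignore exceptions raised while executing `post_sql`. If `false` (the default), a `post_sql` failure is logged and rethrown as an error; if `true`, it is only logged.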
+
 ### common options [string]
 
 Sink plugin common parameters, please refer to [Sink Plugin](common-options.md) for details
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/Config.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/Config.java
index d3ff3700..0c600f98 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/Config.java
@@ -91,4 +91,18 @@ public final class Config {
      */
     public static final String PARTITION_LOWER_BOUND = "partition_lower_bound";
 
+    /**
+     * Jdbc pre sql for sink
+     */
+    public static final String SINK_PRE_SQL = "pre_sql";
+
+    /**
+     * Jdbc post sql for sink
+     */
+    public static final String SINK_POST_SQL = "post_sql";
+
+    /**
+     * Jdbc ignore post sql exceptions for sink
+     */
+    public static final String SINK_IGNORE_POST_SQL_EXCEPTIONS = "ignore_post_sql_exceptions";
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
index 2ac51164..786efb75 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
@@ -23,6 +23,9 @@ import static org.apache.seatunnel.flink.jdbc.Config.QUERY;
 import static org.apache.seatunnel.flink.jdbc.Config.SINK_BATCH_INTERVAL;
 import static org.apache.seatunnel.flink.jdbc.Config.SINK_BATCH_MAX_RETRIES;
 import static org.apache.seatunnel.flink.jdbc.Config.SINK_BATCH_SIZE;
+import static org.apache.seatunnel.flink.jdbc.Config.SINK_IGNORE_POST_SQL_EXCEPTIONS;
+import static org.apache.seatunnel.flink.jdbc.Config.SINK_POST_SQL;
+import static org.apache.seatunnel.flink.jdbc.Config.SINK_PRE_SQL;
 import static org.apache.seatunnel.flink.jdbc.Config.URL;
 import static org.apache.seatunnel.flink.jdbc.Config.USERNAME;
 
@@ -34,6 +37,7 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
@@ -45,11 +49,18 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Arrays;
 
 public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSink.class);
     private static final long serialVersionUID = 3677571223952518115L;
     private static final int DEFAULT_BATCH_SIZE = 5000;
     private static final int DEFAULT_MAX_RETRY_TIMES = 3;
@@ -62,6 +73,9 @@ public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
     private String username;
     private String password;
     private String query;
+    private String preSql;
+    private String postSql;
+    private boolean ignorePostSqlExceptions = false;
     private int batchSize = DEFAULT_BATCH_SIZE;
     private long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
     private int maxRetries = DEFAULT_MAX_RETRY_TIMES;
@@ -99,6 +113,15 @@ public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
         if (config.hasPath(SINK_BATCH_MAX_RETRIES)) {
             maxRetries = config.getInt(SINK_BATCH_MAX_RETRIES);
         }
+        if (config.hasPath(SINK_PRE_SQL)) {
+            preSql = config.getString(SINK_PRE_SQL);
+        }
+        if (!env.isStreaming() && config.hasPath(SINK_POST_SQL)) {
+            postSql = config.getString(SINK_POST_SQL);
+        }
+        if (config.hasPath(SINK_IGNORE_POST_SQL_EXCEPTIONS)) {
+            ignorePostSqlExceptions = config.getBoolean(SINK_IGNORE_POST_SQL_EXCEPTIONS);
+        }
     }
 
     @Override
@@ -108,6 +131,8 @@ public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
 
     @Override
     public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
+        executePreSql();
+
         Table table = env.getStreamTableEnvironment().fromDataStream(dataStream);
         TypeInformation<?>[] fieldTypes = table.getSchema().getFieldTypes();
 
@@ -136,6 +161,8 @@ public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
 
     @Override
     public void outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
+        executePreSql();
+
         Table table = env.getBatchTableEnvironment().fromDataSet(dataSet);
         TypeInformation<?>[] fieldTypes = table.getSchema().getFieldTypes();
         int[] types = Arrays.stream(fieldTypes).mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();
@@ -151,4 +178,44 @@ public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
                 .finish();
         dataSet.output(format);
     }
+
+    /**
+     * Runs the configured post_sql when this sink is closed. postSql is only
+     * populated for non-streaming jobs, so for streaming jobs this call does nothing.
+     */
+    @Override
+    public void close() throws Exception {
+        this.executePostSql();
+    }
+
+    /**
+     * Executes the configured pre_sql, if any. A blank or unset pre_sql is skipped.
+     * A SQL failure is logged and rethrown as a RuntimeException, which aborts the
+     * output setup that calls this method.
+     */
+    private void executePreSql() {
+        if (StringUtils.isBlank(preSql)) {
+            return;
+        }
+        LOGGER.info("Starting to execute pre sql: \n {}", preSql);
+        try {
+            executeSql(preSql);
+        } catch (SQLException sqlException) {
+            LOGGER.error("Execute pre sql failed, pre sql is : \n {} \n", preSql, sqlException);
+            throw new RuntimeException(sqlException);
+        }
+    }
+
+    /**
+     * Executes the configured post_sql, if any. A blank or unset post_sql is skipped.
+     * A SQL failure is always logged; it is rethrown as a RuntimeException unless
+     * ignore_post_sql_exceptions was set to true.
+     */
+    private void executePostSql() {
+        if (StringUtils.isBlank(postSql)) {
+            return;
+        }
+        LOGGER.info("Starting to execute post sql: \n {}", postSql);
+        try {
+            executeSql(postSql);
+        } catch (SQLException sqlException) {
+            LOGGER.error("Execute post sql failed, post sql is : \n {} \n", postSql, sqlException);
+            if (ignorePostSqlExceptions) {
+                return;
+            }
+            throw new RuntimeException(sqlException);
+        }
+    }
+
+    /**
+     * Runs {@code sql} through {@link Statement#execute(String)} on a fresh connection to
+     * dbUrl using the sink's username/password. Both the statement and the connection are
+     * closed afterwards.
+     * NOTE(review): whether several ';'-separated statements are accepted depends on the driver.
+     * NOTE(review): the configured driver class is not loaded here; this relies on DriverManager
+     * already having it registered. Confirm that holds in the deployed classloader.
+     */
+    private void executeSql(String sql) throws SQLException {
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, password);
+             Statement stmt = conn.createStatement()) {
+            stmt.execute(sql);
+            LOGGER.info("Executed sql successfully.");
+        }
+    }
 }