You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/08 04:32:54 UTC
[incubator-seatunnel] branch dev updated: [Feature][flink-connector-jdbc] add pre sql and post sql #1789 (#1791)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 56f68afc [Feature][flink-connector-jdbc] add pre sql and post sql #1789 (#1791)
56f68afc is described below
commit 56f68afc2ed5be48ee770cccb8cd4a70f10584a6
Author: dijie <nj...@gmail.com>
AuthorDate: Sun May 8 12:32:50 2022 +0800
[Feature][flink-connector-jdbc] add pre sql and post sql #1789 (#1791)
* [Feature][flink-connector-jdbc] add pre sql and post sql #1789
---
docs/en/connector/sink/Jdbc.mdx | 37 ++++++++----
.../org/apache/seatunnel/flink/jdbc/Config.java | 14 +++++
.../apache/seatunnel/flink/jdbc/sink/JdbcSink.java | 67 ++++++++++++++++++++++
3 files changed, 107 insertions(+), 11 deletions(-)
diff --git a/docs/en/connector/sink/Jdbc.mdx b/docs/en/connector/sink/Jdbc.mdx
index 607d32d0..5bb3fa4b 100644
--- a/docs/en/connector/sink/Jdbc.mdx
+++ b/docs/en/connector/sink/Jdbc.mdx
@@ -83,17 +83,20 @@ Configure when `saveMode` is specified as `update` , whether to show sql
</TabItem>
<TabItem value="flink">
-| name | type | required | default value |
-| ----------------- | ------ | -------- | ------------- |
-| driver | string | yes | - |
-| url | string | yes | - |
-| username | string | yes | - |
-| password | string | no | - |
-| query | string | yes | - |
-| batch_size | int | no | - |
-| source_table_name | string | yes | - |
-| common-options | string | no | - |
-| parallelism | int | no | - |
+| name | type | required | default value |
+| -------------------------- | ------- | -------- | ------------- |
+| driver | string | yes | - |
+| url | string | yes | - |
+| username | string | yes | - |
+| password | string | no | - |
+| query | string | yes | - |
+| batch_size | int | no | - |
+| source_table_name | string | yes | - |
+| common-options | string | no | - |
+| parallelism | int | no | - |
+| pre_sql | string | no | - |
+| post_sql | string | no | - |
+| ignore_post_sql_exceptions | boolean | no | - |
### driver [string]
@@ -125,6 +128,18 @@ Number of writes per batch
The parallelism of an individual operator, for JdbcSink.
+### pre_sql [string]
+
+SQL statement that is executed once before the sink writes its output.
+
+### post_sql [string]
+
+SQL statement that is executed after the output has been written. It is only supported for batch jobs.
+
+### ignore_post_sql_exceptions [boolean]
+
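+Whether to ignore exceptions thrown while executing `post_sql`. When `true`, the failure is only logged; otherwise (the default, `false`) the exception is rethrown.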
+
### common options [string]
Sink plugin common parameters, please refer to [Sink Plugin](common-options.md) for details
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/Config.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/Config.java
index d3ff3700..0c600f98 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/Config.java
@@ -91,4 +91,18 @@ public final class Config {
*/
public static final String PARTITION_LOWER_BOUND = "partition_lower_bound";
+ /**
+ * Jdbc pre sql for sink
+ */
+ public static final String SINK_PRE_SQL = "pre_sql";
+
+ /**
+ * Jdbc post sql for sink
+ */
+ public static final String SINK_POST_SQL = "post_sql";
+
+ /**
+ * Jdbc ignore post sql exceptions for sink
+ */
+ public static final String SINK_IGNORE_POST_SQL_EXCEPTIONS = "ignore_post_sql_exceptions";
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
index 2ac51164..786efb75 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
@@ -23,6 +23,9 @@ import static org.apache.seatunnel.flink.jdbc.Config.QUERY;
import static org.apache.seatunnel.flink.jdbc.Config.SINK_BATCH_INTERVAL;
import static org.apache.seatunnel.flink.jdbc.Config.SINK_BATCH_MAX_RETRIES;
import static org.apache.seatunnel.flink.jdbc.Config.SINK_BATCH_SIZE;
+import static org.apache.seatunnel.flink.jdbc.Config.SINK_IGNORE_POST_SQL_EXCEPTIONS;
+import static org.apache.seatunnel.flink.jdbc.Config.SINK_POST_SQL;
+import static org.apache.seatunnel.flink.jdbc.Config.SINK_PRE_SQL;
import static org.apache.seatunnel.flink.jdbc.Config.URL;
import static org.apache.seatunnel.flink.jdbc.Config.USERNAME;
@@ -34,6 +37,7 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
@@ -45,11 +49,18 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.Arrays;
public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
+ private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSink.class);
private static final long serialVersionUID = 3677571223952518115L;
private static final int DEFAULT_BATCH_SIZE = 5000;
private static final int DEFAULT_MAX_RETRY_TIMES = 3;
@@ -62,6 +73,9 @@ public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
private String username;
private String password;
private String query;
+ private String preSql;
+ private String postSql;
+ private boolean ignorePostSqlExceptions = false;
private int batchSize = DEFAULT_BATCH_SIZE;
private long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
private int maxRetries = DEFAULT_MAX_RETRY_TIMES;
@@ -99,6 +113,15 @@ public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
if (config.hasPath(SINK_BATCH_MAX_RETRIES)) {
maxRetries = config.getInt(SINK_BATCH_MAX_RETRIES);
}
+ if (config.hasPath(SINK_PRE_SQL)) {
+ preSql = config.getString(SINK_PRE_SQL);
+ }
+ if (!env.isStreaming() && config.hasPath(SINK_POST_SQL)) {
+ postSql = config.getString(SINK_POST_SQL);
+ }
+ if (config.hasPath(SINK_IGNORE_POST_SQL_EXCEPTIONS)) {
+ ignorePostSqlExceptions = config.getBoolean(SINK_IGNORE_POST_SQL_EXCEPTIONS);
+ }
}
@Override
@@ -108,6 +131,8 @@ public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
@Override
public void outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
+ executePreSql();
+
Table table = env.getStreamTableEnvironment().fromDataStream(dataStream);
TypeInformation<?>[] fieldTypes = table.getSchema().getFieldTypes();
@@ -136,6 +161,8 @@ public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
@Override
public void outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
+ executePreSql();
+
Table table = env.getBatchTableEnvironment().fromDataSet(dataSet);
TypeInformation<?>[] fieldTypes = table.getSchema().getFieldTypes();
int[] types = Arrays.stream(fieldTypes).mapToInt(JdbcTypeUtil::typeInformationToSqlType).toArray();
@@ -151,4 +178,44 @@ public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
.finish();
dataSet.output(format);
}
+
+ @Override
+ public void close() throws Exception {
+ executePostSql();
+ }
+
+ private void executePreSql() {
+ if (StringUtils.isNotBlank(preSql)) {
+ LOGGER.info("Starting to execute pre sql: \n {}", preSql);
+ try {
+ executeSql(preSql);
+ } catch (SQLException e) {
+ LOGGER.error("Execute pre sql failed, pre sql is : \n {} \n", preSql, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void executePostSql() {
+ if (StringUtils.isNotBlank(postSql)) {
+ LOGGER.info("Starting to execute post sql: \n {}", postSql);
+ try {
+ executeSql(postSql);
+ } catch (SQLException e) {
+ LOGGER.error("Execute post sql failed, post sql is : \n {} \n", postSql, e);
+ if (!ignorePostSqlExceptions) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ private void executeSql(String sql) throws SQLException {
+ try (Connection connection = DriverManager.getConnection(dbUrl, username, password);
+ Statement statement = connection.createStatement()) {
+
+ statement.execute(sql);
+ LOGGER.info("Executed sql successfully.");
+ }
+ }
}