You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/05/05 02:01:54 UTC

[GitHub] [incubator-seatunnel] ruanwenjun commented on a diff in pull request #1791: [Feature][flink-connector-jdbc] add pre sql and post sql #1789

ruanwenjun commented on code in PR #1791:
URL: https://github.com/apache/incubator-seatunnel/pull/1791#discussion_r865511012


##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/Config.java:
##########
@@ -91,4 +91,18 @@ public final class Config {
      */
     public static final String PARTITION_LOWER_BOUND = "partition_lower_bound";
 
+    /**
+     * Jdbc pre sql for sink
+     */
+    public static final String SINK_PRE_SQL = "pre_sql";
+
+    /**
+     * Jdbc post sql for sink
+     */
+    public static final String SINK_POST_SQL = "post_sql";
+
+    /**
+     * Jdbc ignore post sql exceptions for sink
+     */
+    public static final String SINK_IGNORE_POST_SQL_EXCEPTIONS = "ignore_post_sql_exceptions";

Review Comment:
   I don't think this parameter is needed, if you ignore the post sql exception, you may also need to add a parameter to ignore the pre sql exception, and indeed, you may also need to add a extra parameter to ignore some target exception, we cannot not cover all case, so the best way is not deal the exception here.



##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java:
##########
@@ -151,4 +178,56 @@ public void outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
                 .finish();
         dataSet.output(format);
     }
+
+    @Override
+    public void close() throws Exception {
+        executePostSql();
+    }
+
+    private void executePreSql() {
+        if (!StringUtils.isNotBlank(preSql)) {
+            LOGGER.info("Starting to execute pre sql: \n {}", preSql);
+            try {
+                executeSql(preSql);
+            } catch (SQLException e) {
+                LOGGER.info("Execute pre sql failed", e);

Review Comment:
   Use error level is great.



##########
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java:
##########
@@ -151,4 +178,56 @@ public void outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
                 .finish();
         dataSet.output(format);
     }
+
+    @Override
+    public void close() throws Exception {
+        executePostSql();
+    }
+
+    private void executePreSql() {
+        if (!StringUtils.isNotBlank(preSql)) {
+            LOGGER.info("Starting to execute pre sql: \n {}", preSql);
+            try {
+                executeSql(preSql);
+            } catch (SQLException e) {
+                LOGGER.info("Execute pre sql failed", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private void executePostSql() {
+        if (!StringUtils.isNotBlank(postSql)) {
+            LOGGER.info("Starting to execute post sql: \n {}", postSql);
+            try {
+                executeSql(postSql);
+            } catch (SQLException e) {
+                LOGGER.info("Execute pre sql failed", e);
+                if (!ignorePostSqlExceptions) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    private void executeSql(String sql) throws SQLException {
+        Connection connection = null;
+        Statement statement = null;
+
+        try {
+            connection = DriverManager.getConnection(dbUrl, username, password);
+            statement = connection.createStatement();
+
+            statement.execute(sql);
+            LOGGER.info("Executed sql successfully.");
+        } finally {
+            if (statement != null) {
+                statement.close();
+            }
+            if (connection != null) {
+                connection.close();
+            }
+        }
+
+    }

Review Comment:
   Please use try-with-resource to close the resource.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org